You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:47 UTC

[incubator-wayang] 06/15: [WAYANG-31] split DataQuantaBuilder class into several classes

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 5a4815c3ae7cecc0df880f8844613431027d1072
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Tue May 11 14:27:30 2021 -0400

    [WAYANG-31] split DataQuantaBuilder class into several classes
---
 .../org/apache/wayang/api/DataQuantaBuilder.scala  | 1268 +-------------------
 .../org/apache/wayang/api/JavaPlanBuilder.scala    |    2 +-
 .../wayang/api/RecordDataQuantaBuilder.scala       |    1 +
 .../dataquantabuilder/BasicDataQuantaBuilder.scala |  152 +++
 .../CartesianDataQuantaBuilder.scala               |   43 +
 .../CoGroupDataQuantaBuilder.scala                 |  142 +++
 .../dataquantabuilder/CountDataQuantaBuilder.scala |   40 +
 .../CustomOperatorDataQuantaBuilder.scala          |   52 +
 .../DistinctDataQuantaBuilder.scala                |   39 +
 .../DoWhileDataQuantaBuilder.scala                 |  126 ++
 .../dataquantabuilder/FakeDataQuantaBuilder.scala  |   44 +
 .../FilterDataQuantaBuilder.scala                  |  102 ++
 .../FlatMapDataQuantaBuilder.scala                 |   91 ++
 .../GlobalGroupDataQuantaBuilder.scala             |   34 +
 .../GlobalReduceDataQuantaBuilder.scala            |   68 ++
 .../GroupByDataQuantaBuilder.scala                 |   76 ++
 .../IntersectDataQuantaBuilder.scala               |   39 +
 .../dataquantabuilder/JoinDataQuantaBuilder.scala  |  153 +++
 .../dataquantabuilder/KeyedDataQuantaBuilder.scala |   50 +
 .../LoadCollectionDataQuantaBuilder.scala          |   49 +
 .../dataquantabuilder/MapDataQuantaBuilder.scala   |   69 ++
 .../MapPartitionsDataQuantaBuilder.scala           |   93 ++
 .../ProjectionDataQuantaBuilder.scala              |   37 +
 .../ReduceByDataQuantaBuilder.scala                |   94 ++
 .../RepeatDataQuantaBuilder.scala                  |   51 +
 .../SampleDataQuantaBuilder.scala                  |   92 ++
 .../dataquantabuilder/SortDataQuantaBuilder.scala  |   97 ++
 .../UnarySourceDataQuantaBuilder.scala             |   37 +
 .../dataquantabuilder/UnionDataQuantaBuilder.scala |   39 +
 .../ZipWithIdDataQuantaBuilder.scala               |   41 +
 .../wayang/api/graph/EdgeDataQuantaBuilder.scala   |    3 +-
 .../java/org/apache/wayang/api/JavaApiTest.java    |    2 +
 .../java/org/apache/wayang/tests/RegressionIT.java |    4 +-
 33 files changed, 1960 insertions(+), 1270 deletions(-)

diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index a590795..c6e5dcb 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -21,8 +21,8 @@ package org.apache.wayang.api
 
 import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
 import java.util.{Collection => JavaCollection}
-
 import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.dataquantabuilder.{BasicDataQuantaBuilder, CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuan [...]
 import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
 import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
 import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
@@ -31,7 +31,7 @@ import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunctio
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
 import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
-import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan, UnarySource}
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan}
 import org.apache.wayang.core.platform.Platform
 import org.apache.wayang.core.types.DataSetType
 import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
@@ -438,1267 +438,3 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
   protected[api] def dataQuanta(): DataQuanta[Out]
 
 }
-
-/**
-  * Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
-  * Java API for Wayang that compensates for lacking default and named arguments.
-  */
-abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
-  extends Logging with DataQuantaBuilder[This, Out] {
-
-  /**
-    * Lazy-initialized. The [[DataQuanta]] product of this builder.
-    */
-  private var result: DataQuanta[Out] = _
-
-  /**
-    * A name for the [[DataQuanta]] to be built.
-    */
-  private var name: String = _
-
-  /**
-    * An [[Experiment]] for the [[DataQuanta]] to be built.
-    */
-  private var experiment: Experiment = _
-
-  /**
-    * Broadcasts for the [[DataQuanta]] to be built.
-    */
-  private val broadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()
-
-  /**
-    * [[CardinalityEstimator]] for the [[DataQuanta]] to be built.
-    */
-  private var cardinalityEstimator: CardinalityEstimator = _
-
-  /**
-    * Target [[Platform]]s for the [[DataQuanta]] to be built.
-    */
-  private val targetPlatforms: ListBuffer[Platform] = ListBuffer()
-
-  /**
-    * Paths of UDF JAR files for the [[DataQuanta]] to be built.
-    */
-  private val udfJars: ListBuffer[String] = ListBuffer()
-
-  /**
-    * The type of the [[DataQuanta]] to be built.
-    */
-  protected[api] val outputTypeTrap = getOutputTypeTrap
-
-  /**
-    * Retrieve an intialization value for [[outputTypeTrap]].
-    *
-    * @return the [[TypeTrap]]
-    */
-  protected def getOutputTypeTrap = new TypeTrap
-
-  override protected[api] implicit def javaPlanBuilder = _javaPlanBuilder
-
-  override def withName(name: String): This = {
-    this.name = name
-    this.asInstanceOf[This]
-  }
-
-  override def withExperiment(experiment: Experiment): This = {
-    this.experiment = experiment
-    this.asInstanceOf[This]
-  }
-
-  override def withOutputType(outputType: DataSetType[Out]): This = {
-    this.outputTypeTrap.dataSetType = outputType
-    this.asInstanceOf[This]
-  }
-
-  override def withOutputClass(cls: Class[Out]): This = this.withOutputType(DataSetType.createDefault(cls))
-
-  override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
-    this.broadcasts += Tuple2(broadcastName, sender)
-    this.asInstanceOf[This]
-  }
-
-  override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
-    this.cardinalityEstimator = cardinalityEstimator
-    this.asInstanceOf[This]
-  }
-
-  override def withTargetPlatform(platform: Platform): This = {
-    this.targetPlatforms += platform
-    this.asInstanceOf[This]
-  }
-
-  def withUdfJarOf(cls: Class[_]): This = this.withUdfJar(ReflectionUtils.getDeclaringJar(cls))
-
-  override def withUdfJar(path: String): This = {
-    this.udfJars += path
-    this.asInstanceOf[This]
-  }
-
-  override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
-
-  override protected[api] def dataQuanta(): DataQuanta[Out] = {
-    if (this.result == null) {
-      this.result = this.build
-      if (this.name != null) this.result.withName(this.name)
-      if (this.cardinalityEstimator != null) this.result.withCardinalityEstimator(this.cardinalityEstimator)
-      if (this.experiment != null) this.result.withExperiment(experiment)
-      this.result.withUdfJars(this.udfJars: _*)
-      this.result.withTargetPlatforms(this.targetPlatforms: _*)
-      this.broadcasts.foreach {
-        case (broadcastName, senderBuilder) => this.result.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
-      }
-    }
-    this.result
-  }
-
-  /**
-    * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
-    *
-    * @return the created and partially configured [[DataQuanta]]
-    */
-  protected def build: DataQuanta[Out]
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.core.plan.wayangplan.UnarySource]]s.
-  *
-  * @param source          the [[UnarySource]]
-  * @param javaPlanBuilder the [[JavaPlanBuilder]]
-  */
-class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
-                                                                          (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[This, Out] {
-
-  override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.load(source)(this.classTag)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CollectionSource]]s.
-  *
-  * @param collection      the [[JavaCollection]] to be loaded
-  * @param javaPlanBuilder the [[JavaPlanBuilder]]
-  */
-class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {
-
-  // Try to infer the type class from the collection.
-  locally {
-    if (!collection.isEmpty) {
-      val any = WayangCollections.getAny(collection)
-      if (any != null) {
-        this.outputTypeTrap.dataSetType = DataSetType.createDefault(any.getClass)
-      }
-    }
-  }
-
-  override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.loadCollection(collection)(this.classTag)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[MapOperator]]
-  */
-class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
-                                    udf: SerializableFunction[In, Out])
-                                   (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  // Try to infer the type classes from the udf.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-    parameters.get("Output") match {
-      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-  }
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  override protected def build = inputDataQuanta.dataQuanta().mapJava(udf, this.udfLoadProfileEstimator)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
-  * [[org.apache.wayang.basic.function.ProjectionDescriptor]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param fieldNames      field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
-  */
-class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
-                                          (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {
-
-  override protected def build = inputDataQuanta.dataQuanta().project(fieldNames.toSeq)
-
-}
-
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[MapOperator]]
-  */
-class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
-                                (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {
-
-  // Reuse the input TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  /** Selectivity of the filter predicate. */
-  private var selectivity: ProbabilisticDoubleInterval = _
-
-  /** SQL UDF implementing the filter predicate. */
-  private var sqlUdf: String = _
-
-  // Try to infer the type classes from the udf.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-  }
-
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  /**
-    * Add a SQL implementation of the UDF.
-    *
-    * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
-    * @return this instance
-    */
-  def withSqlUdf(sqlUdf: String) = {
-    this.sqlUdf = sqlUdf
-    this
-  }
-
-  /**
-    * Specify the selectivity of the UDF.
-    *
-    * @param lowerEstimate the lower bound of the expected selectivity
-    * @param upperEstimate the upper bound of the expected selectivity
-    * @param confidence    the probability of the actual selectivity being within these bounds
-    * @return this instance
-    */
-  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
-    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
-    this
-  }
-
-  override protected def build = inputDataQuanta.dataQuanta().filterJava(
-    udf, this.sqlUdf, this.selectivity, this.udfLoadProfileEstimator
-  )
-
-}
-
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param keyUdf             UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
-  */
-class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
-                                    keyUdf: SerializableFunction[T, Key])
-                                   (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {
-
-  // Reuse the input TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  /** [[ClassTag]] or surrogate of [[Key]] */
-  implicit var keyTag: ClassTag[Key] = _
-
-  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
-  private var keyUdfCpuEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
-  private var keyUdfRamEstimator: LoadEstimator = _
-
-
-  // Try to infer the type classes from the UDFs.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[T] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf)
-    }
-
-    this.keyTag = parameters.get("Output") match {
-      case cls: Class[Key] => ClassTag(cls)
-      case _ =>
-        logger.warn("Could not infer types from {}.", keyUdf)
-        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-    }
-  }
-
-
-  /**
-    * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
-    *
-    * @param udfCpuEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
-    this.keyUdfCpuEstimator = udfCpuEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the RAM load of first the key extraction UDF. Currently effectless.
-    *
-    * @param udfRamEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
-    this.keyUdfRamEstimator = udfRamEstimator
-    this
-  }
-
-  override protected def build =
-    inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag)
-
-}
-
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
-  */
-class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
-                                        udf: SerializableFunction[In, java.lang.Iterable[Out]])
-                                       (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {
-
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  /** Selectivity of the filter predicate. */
-  private var selectivity: ProbabilisticDoubleInterval = _
-
-  // Try to infer the type classes from the udf.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-    val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
-    originalClass match {
-      case cls: Class[Out] => {
-        this.outputTypeTrap.dataSetType= DataSetType.createDefault(cls)
-      }
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-  }
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  /**
-    * Specify the selectivity of the UDF.
-    *
-    * @param lowerEstimate the lower bound of the expected selectivity
-    * @param upperEstimate the upper bound of the expected selectivity
-    * @param confidence    the probability of the actual selectivity being within these bounds
-    * @return this instance
-    */
-  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
-    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
-    this
-  }
-
-  override protected def build = inputDataQuanta.dataQuanta().flatMapJava(
-    udf, this.selectivity, this.udfLoadProfileEstimator
-  )
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
-  */
-class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
-                                              udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
-                                             (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  /** Selectivity of the filter predicate. */
-  private var selectivity: ProbabilisticDoubleInterval = _
-
-  // Try to infer the type classes from the udf.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In] => {
-        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      }
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-    val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
-    originalClass match {
-      case cls: Class[Out] => {
-        this.outputTypeTrap.dataSetType= DataSetType.createDefault(cls)
-      }
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-  }
-
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  /**
-    * Specify the selectivity of the UDF.
-    *
-    * @param lowerEstimate the lower bound of the expected selectivity
-    * @param upperEstimate the upper bound of the expected selectivity
-    * @param confidence    the probability of the actual selectivity being within these bounds
-    * @return this instance
-    */
-  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
-    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
-    this
-  }
-
-  override protected def build = inputDataQuanta.dataQuanta().mapPartitionsJava(
-    udf, this.selectivity, this.udfLoadProfileEstimator
-  )
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
-  *
-  * @param inputDataQuanta    [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
-  */
-class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
-                                (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
-
-  /**
-    * Size of the dataset to be sampled.
-    */
-  private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE
-
-  /**
-    * Sampling method to use.
-    */
-  private var sampleMethod = SampleOperator.Methods.ANY
-
-  /**
-    * Seed to use.
-    */
-  private var seed: Option[Long] = None
-
-  // Reuse the input TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  /**
-    * Set the size of the dataset that should be sampled.
-    *
-    * @param datasetSize the size of the dataset
-    * @return this instance
-    */
-  def withDatasetSize(datasetSize: Long) = {
-    this.datasetSize = datasetSize
-    this
-  }
-
-  /**
-    * Set the sample method to be used.
-    *
-    * @param sampleMethod the sample method
-    * @return this instance
-    */
-  def withSampleMethod(sampleMethod: SampleOperator.Methods) = {
-    this.sampleMethod = sampleMethod
-    this
-  }
-
-  /**
-    * Set the sample method to be used.
-    *
-    * @param seed
-    * @return this instance
-    */
-  def withSeed(seed: Long) = {
-    this.seed = Some(seed)
-    this
-  }
-
-  override protected def build =
-    inputDataQuanta.dataQuanta().sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
-  * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
-  */
-class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
-                                        keyUdf: SerializableFunction[T, Key],
-                                        udf: SerializableBinaryOperator[T])
-                                       (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {
-
-  // Reuse the input TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  implicit var keyTag: ClassTag[Key] = _
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  // TODO: Add these estimators.
-  //  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
-  //  private var keyUdfCpuEstimator: LoadEstimator = _
-  //
-  //  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
-  //  private var keyUdfRamEstimator: LoadEstimator = _
-
-  // Try to infer the type classes from the UDFs.
-  locally {
-    var parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
-    parameters.get("Type") match {
-      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-
-    parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf)
-    }
-    this.keyTag = parameters.get("Output") match {
-      case cls: Class[Key] => ClassTag(cls)
-      case _ =>
-        logger.warn("Could not infer types from {}.", keyUdf)
-        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-    }
-  }
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  override protected def build =
-    inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
-  */
-class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
-                                      (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {
-
-  implicit var keyTag: ClassTag[Key] = _
-
-
-  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[keyUdf]]. */
-  private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _
-
-  // Try to infer the type classes from the UDFs.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createGrouped(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf)
-    }
-
-    this.keyTag = parameters.get("Output") match {
-      case cls: Class[Key] => ClassTag(cls)
-      case _ =>
-        logger.warn("Could not infer types from {}.", keyUdf)
-        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-    }
-  }
-
-  /**
-    * Set a [[LoadProfileEstimator]] for the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
-    * @return this instance
-    */
-  def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
-    this.keyUdfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  override protected def build =
-    inputDataQuanta.dataQuanta().groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  */
-class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])(implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {
-
-  override protected def build = inputDataQuanta.dataQuanta().group()
-
-}
-
-
-/**
-  * Builder that folds the input [[DataQuanta]] with a binary UDF through a
-  * [[org.apache.wayang.basic.operators.GlobalReduceOperator]].
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param udf             UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
-  */
-class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
-                                       udf: SerializableBinaryOperator[T])
-                                      (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {
-
-  /** Input and output share a single [[TypeTrap]], which forces both sides onto the same type. */
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  /** Optional [[LoadProfileEstimator]] for the [[udf]]; left `null` unless [[withUdfLoad]] is called. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  // Best-effort: derive the element class from the generic signature of the UDF.
-  locally {
-    val udfTypeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
-    udfTypeParameters.get("Type") match {
-      case elementClass: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
-      case _ => logger.warn("Could not infer types from {}.", udf)
-    }
-  }
-
-  /**
-    * Register a [[LoadProfileEstimator]] describing the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]] to use
-    * @return this builder
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): GlobalReduceDataQuantaBuilder[T] = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  override protected def build: DataQuanta[T] = {
-    val input = inputDataQuanta.dataQuanta()
-    input.reduceJava(udf, this.udfLoadProfileEstimator)
-  }
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
-  *
-  * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
-  * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
-  */
-class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
-                                inputDataQuanta1: DataQuantaBuilder[_, T])
-                               (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {
-
-  // Reuse the first input's TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
-
-  override protected def build = inputDataQuanta0.dataQuanta().union(inputDataQuanta1.dataQuanta())
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
-  *
-  * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
-  * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
-  */
-class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
-                                    inputDataQuanta1: DataQuantaBuilder[_, T])
-                                   (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {
-
-  // Reuse the first input's TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
-
-  override protected def build = inputDataQuanta0.dataQuanta().intersect(inputDataQuanta1.dataQuanta())
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
-  *
-  * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
-  * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
-  * @param keyUdf0          first key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
-  * @param keyUdf1          second key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
-  */
-class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
-                                           inputDataQuanta1: DataQuantaBuilder[_, In1],
-                                           keyUdf0: SerializableFunction[In0, Key],
-                                           keyUdf1: SerializableFunction[In1, Key])
-                                          (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], RT2[In0, In1]] {
-
-  /** [[ClassTag]] or surrogate of [[Key]] */
-  implicit var keyTag: ClassTag[Key] = _
-
-  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
-  private var keyUdf0CpuEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
-  private var keyUdf0RamEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
-  private var keyUdf1CpuEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
-  private var keyUdf1RamEstimator: LoadEstimator = _
-
-  // Try to infer the type classes from the UDFs.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
-    }
-
-    this.keyTag = parameters.get("Output") match {
-      case cls: Class[Key] => ClassTag(cls)
-      case _ =>
-        logger.warn("Could not infer types from {}.", keyUdf0)
-        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-    }
-  }
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
-    }
-
-    // Both UDFs produce the same Key type: only overwrite the tag when keyUdf1 actually reveals the key class,
-    // so that a class successfully inferred from keyUdf0 is not replaced by the fallback surrogate.
-    parameters.get("Output") match {
-      case cls: Class[Key] => this.keyTag = ClassTag(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
-    }
-  }
-  // Since we are currently not looking at type parameters, we can statically determine the output type.
-  locally {
-    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
-    *
-    * @param udfCpuEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
-    this.keyUdf0CpuEstimator = udfCpuEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
-    *
-    * @param udfRamEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
-    this.keyUdf0RamEstimator = udfRamEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
-    *
-    * @param udfCpuEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
-    this.keyUdf1CpuEstimator = udfCpuEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
-    *
-    * @param udfRamEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
-    this.keyUdf1RamEstimator = udfRamEstimator
-    this
-  }
-
-  /**
-    * Assemble the joined elements to new elements.
-    *
-    * @param udf produces a joined element from two joinable elements
-    * @return a new [[DataQuantaBuilder]] representing the assembled join product
-    */
-  def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
-    this.map(new SerializableFunction[RT2[In0, In1], NewOut] {
-      override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
-    })
-
-  override protected def build =
-    inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
-  *
-  * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
-  * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
-  * @param keyUdf0          first key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
-  * @param keyUdf1          second key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
-  */
-class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
-                                              inputDataQuanta1: DataQuantaBuilder[_, In1],
-                                              keyUdf0: SerializableFunction[In0, Key],
-                                              keyUdf1: SerializableFunction[In1, Key])
-                                             (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], RT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {
-
-  /** [[ClassTag]] or surrogate of [[Key]] */
-  implicit var keyTag: ClassTag[Key] = _
-
-  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
-  private var keyUdf0CpuEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
-  private var keyUdf0RamEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
-  private var keyUdf1CpuEstimator: LoadEstimator = _
-
-  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
-  private var keyUdf1RamEstimator: LoadEstimator = _
-
-  // Try to infer the type classes from the UDFs.
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
-    }
-
-    this.keyTag = parameters.get("Output") match {
-      case cls: Class[Key] => ClassTag(cls)
-      case _ =>
-        logger.warn("Could not infer types from {}.", keyUdf0)
-        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-    }
-  }
-  locally {
-    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
-    parameters.get("Input") match {
-      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
-    }
-
-    // Both UDFs produce the same Key type: only overwrite the tag when keyUdf1 actually reveals the key class,
-    // so that a class successfully inferred from keyUdf0 is not replaced by the fallback surrogate.
-    parameters.get("Output") match {
-      case cls: Class[Key] => this.keyTag = ClassTag(cls)
-      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
-    }
-  }
-  // Since we are currently not looking at type parameters, we can statically determine the output type.
-  locally {
-    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
-    *
-    * @param udfCpuEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
-    this.keyUdf0CpuEstimator = udfCpuEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
-    *
-    * @param udfRamEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
-    this.keyUdf0RamEstimator = udfRamEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
-    *
-    * @param udfCpuEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
-    this.keyUdf1CpuEstimator = udfCpuEstimator
-    this
-  }
-
-  /**
-    * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
-    *
-    * @param udfRamEstimator the [[LoadEstimator]]
-    * @return this instance
-    */
-  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
-    this.keyUdf1RamEstimator = udfRamEstimator
-    this
-  }
-
-  override protected def build =
-    inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
-
-}
-
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
-  *
-  * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
-  * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
-  */
-class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
-                                           inputDataQuanta1: DataQuantaBuilder[_, In1])
-                                          (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], RT2[In0, In1]] {
-
-  // Since we are currently not looking at type parameters, we can statically determine the output type.
-  locally {
-    this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
-  }
-
-  // The ClassTag of the second input is handed over explicitly to build the product.
-  override protected def build =
-    inputDataQuanta0.dataQuanta().cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)
-
-}
-
-/**
-  * Builder that attaches a `java.lang.Long` ID to every input element through a
-  * [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  */
-class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
-                                   (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], RT2[java.lang.Long, T]] {
-
-  // The output is always an RT2 and its type parameters are not tracked, so the DataSetType is fixed up front.
-  this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
-
-  override protected def build: DataQuanta[RT2[java.lang.Long, T]] = {
-    val input = inputDataQuanta.dataQuanta()
-    input.zipWithId
-  }
-
-}
-
-/**
-  * Builder that removes duplicates from the input [[DataQuanta]] through a
-  * [[org.apache.wayang.basic.operators.DistinctOperator]].
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  */
-class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
-                                  (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {
-
-  /** Shares the input's [[TypeTrap]] so that input and output are bound to the same type. */
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  override protected def build: DataQuanta[T] = {
-    val input = inputDataQuanta.dataQuanta()
-    input.distinct
-  }
-
-}
-
-/**
-  * Builder that counts the input [[DataQuanta]] through a [[org.apache.wayang.basic.operators.CountOperator]].
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  */
-class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
-                               (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {
-
-  // The output element type is always java.lang.Long, so it can be fixed without any inference.
-  this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]
-
-  override protected def build: DataQuanta[java.lang.Long] = {
-    val input = inputDataQuanta.dataQuanta()
-    input.count
-  }
-
-}
-
-
-/**
-  * Generic [[DataQuantaBuilder]] for an arbitrary [[org.apache.wayang.core.plan.wayangplan.Operator]]. It addresses a
-  * single [[OutputSlot]] of that operator and offers no operator-specific convenience methods.
-  *
-  * @param operator        the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
-  * @param outputIndex     index of the [[OutputSlot]] addressed by the new instance
-  * @param buildCache      a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
-  * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
-  * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
-  */
-class CustomOperatorDataQuantaBuilder[T](operator: Operator,
-                                         outputIndex: Int,
-                                         buildCache: DataQuantaBuilderCache,
-                                         inputDataQuanta: DataQuantaBuilder[_, _]*)
-                                        (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {
-
-  override protected def build: DataQuanta[T] = {
-    // Builders for different OutputSlots of one operator share the cache, so the operator is added only once.
-    val alreadyBuilt = buildCache.hasCached
-    if (!alreadyBuilt) {
-      buildCache.cache(
-        javaPlanBuilder.planBuilder.customOperator(operator, inputDataQuanta.map(_.dataQuanta()): _*)
-      )
-    }
-    buildCache(outputIndex)
-  }
-
-}
-
-/**
-  * Builder for [[org.apache.wayang.basic.operators.DoWhileOperator]]s: the loop body is assembled by
-  * [[bodyBuilder]] from [[DataQuantaBuilder]]s, and [[conditionUdf]] is the looping condition.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param conditionUdf    UDF for the looping condition
-  * @param bodyBuilder     builds the loop body
-  */
-class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
-                                           conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
-                                           bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
-                                          (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {
-
-  // TODO: Get the ClassTag right.
-  implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-
-  /** Shares the input's [[TypeTrap]] so that loop input and loop output are bound to the same type. */
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  // TODO: We could improve by combining the TypeTraps in the body loop.
-
-  /** Optional [[LoadProfileEstimator]] for the UDF; left `null` unless [[withUdfLoad]] is called. */
-  private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
-  /** Expected number of iterations handed to the operator (defaults to 20). */
-  private var numExpectedIterations: Int = 20
-
-  /**
-    * Register a [[LoadProfileEstimator]] describing the load of the UDF.
-    *
-    * @param udfLoadProfileEstimator the [[LoadProfileEstimator]] to use
-    * @return this builder
-    */
-  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator): DoWhileDataQuantaBuilder[T, ConvOut] = {
-    this.udfLoadProfileEstimator = udfLoadProfileEstimator
-    this
-  }
-
-  /**
-    * Declare the [[DataSetType]] of the condition [[DataQuanta]]. This is optional: sometimes it can be inferred.
-    *
-    * @param outputType the [[DataSetType]] of the condition [[DataQuanta]]
-    * @return this builder
-    */
-  def withConditionType(outputType: DataSetType[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
-    val conditionClass = outputType.getDataUnitType.getTypeClass
-    this.convOutClassTag = ClassTag(conditionClass)
-    this
-  }
-
-  /**
-    * Declare the [[Class]] of the condition [[DataQuanta]]. This is optional: sometimes it can be inferred.
-    *
-    * @param cls the [[Class]] of the condition [[DataQuanta]]
-    * @return this builder
-    */
-  def withConditionClass(cls: Class[ConvOut]): DoWhileDataQuantaBuilder[T, ConvOut] = {
-    this.convOutClassTag = ClassTag(cls)
-    this
-  }
-
-  /**
-    * Declare how many iterations the built [[org.apache.wayang.basic.operators.DoWhileOperator]] is expected to run.
-    *
-    * @param numExpectedIterations the expected number of iterations
-    * @return this builder
-    */
-  def withExpectedNumberOfIterations(numExpectedIterations: Int): DoWhileDataQuantaBuilder[T, ConvOut] = {
-    this.numExpectedIterations = numExpectedIterations
-    this
-  }
-
-  override protected def build: DataQuanta[T] = {
-    val input = inputDataQuanta.dataQuanta()
-    input.doWhileJava[ConvOut](
-      conditionUdf, dataQuantaBodyBuilder, this.numExpectedIterations, this.udfLoadProfileEstimator
-    )(this.convOutClassTag)
-  }
-
-  /**
-    * Adapt [[bodyBuilder]], which works on [[DataQuantaBuilder]]s, to a loop body that works on [[DataQuanta]].
-    *
-    * @return the [[DataQuanta]]-based loop body builder
-    */
-  private def dataQuantaBodyBuilder: JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] =
-    new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
-      override def apply(loopStart: DataQuanta[T]): WayangTuple[DataQuanta[T], DataQuanta[ConvOut]] = {
-        val builtBody = bodyBuilder.apply(new FakeDataQuantaBuilder(loopStart))
-        val iterationResult = builtBody.field0.dataQuanta()
-        val convergenceResult = builtBody.field1.dataQuanta()
-        new WayangTuple(iterationResult, convergenceResult)
-      }
-    }
-
-}
-
-/**
-  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.RepeatOperator]]s, i.e., loops that
-  * run their body a fixed number of times.
-  *
-  * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
-  * @param numRepetitions  number of repetitions of the loop
-  * @param bodyBuilder     builds the loop body
-  */
-class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
-                                 numRepetitions: Int,
-                                 bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
-                                (implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {
-
-  // Reuse the input TypeTrap to enforce type equality between input and output.
-  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
-  // TODO: We could improve by combining the TypeTraps in the body loop.
-
-  // The loop start is wrapped into a FakeDataQuantaBuilder so that the Java-facing bodyBuilder can work on builders.
-  override protected def build =
-    inputDataQuanta.dataQuanta().repeat(numRepetitions, startDataQuanta => {
-      val loopStartBuilder = new FakeDataQuantaBuilder(startDataQuanta)
-      bodyBuilder(loopStartBuilder).dataQuanta()
-    })
-
-}
-
-/**
-  * Wraps [[DataQuanta]] and exposes them as [[DataQuantaBuilder]], i.e., this is an adapter.
-  *
-  * Note: [[dataQuanta()]] is overridden to return the wrapped instance as is, so the configuration step of the
-  * base implementation (names, broadcasts, target platforms, ...) is bypassed for this builder.
-  *
-  * @param _dataQuanta the wrapped [[DataQuanta]]
-  */
-class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder)
-  extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] {
-
-  // Derive the ClassTag from the wrapped DataQuanta's output slot rather than from the TypeTrap.
-  override implicit def classTag = ClassTag(_dataQuanta.output.getType.getDataUnitType.getTypeClass)
-
-  override def dataQuanta() = _dataQuanta
-
-  /**
-    * Provide the wrapped [[DataQuanta]]. No further configuration is applied to it.
-    *
-    * @return the wrapped [[DataQuanta]], unchanged
-    */
-  override protected def build: DataQuanta[T] = _dataQuanta
-}
-
-/**
-  * This is not an actual [[DataQuantaBuilder]] but rather decorates such a [[DataQuantaBuilder]] with a key.
-  *
-  * @param dataQuantaBuilder the decorated [[DataQuantaBuilder]]
-  * @param keyExtractor      extracts the key from the elements of the decorated [[DataQuantaBuilder]]
-  */
-class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out],
-                                       private val keyExtractor: SerializableFunction[Out, Key])
-                                      (implicit javaPlanBuilder: JavaPlanBuilder) {
-
-  /**
-    * Joins this instance with the given one via their keys.
-    *
-    * @param that the instance to join with
-    * @return a [[DataQuantaBuilder]] representing the join product
-    */
-  def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
-    dataQuantaBuilder.join(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
-
-  /**
-    * Co-groups this instance with the given one via their keys.
-    *
-    * @param that the instance to co-group with
-    * @return a [[DataQuantaBuilder]] representing the co-group product
-    */
-  def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
-    dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
-
-}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index dc9f6be..2c88f58 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -19,9 +19,9 @@
 package org.apache.wayang.api
 
 import java.util.{Collection => JavaCollection}
-
 import de.hpi.isg.profiledb.store.model.Experiment
 import org.apache.commons.lang3.Validate
+import org.apache.wayang.api.dataquantabuilder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
 import org.apache.wayang.api.util.DataQuantaBuilderCache
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.operators.{TableSource, TextFileSource}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
index 5af271f..5dc045b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
@@ -18,6 +18,7 @@
 
 package org.apache.wayang.api
 
+import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
 import org.apache.wayang.api.util.DataQuantaBuilderDecorator
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.function.ProjectionDescriptor
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
new file mode 100644
index 0000000..9aad1a1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
@@ -0,0 +1,152 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.{Logging, ReflectionUtils}
+
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+/**
+ * Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
+ * Java API for Wayang that compensates for lacking default and named arguments.
+ */
+abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
+  extends Logging with DataQuantaBuilder[This, Out] {
+
+  /**
+   * Lazy-initialized. The [[DataQuanta]] product of this builder.
+   */
+  private var result: DataQuanta[Out] = _
+
+  /**
+   * A name for the [[DataQuanta]] to be built.
+   */
+  private var name: String = _
+
+  /**
+   * An [[Experiment]] for the [[DataQuanta]] to be built.
+   */
+  private var experiment: Experiment = _
+
+  /**
+   * Broadcasts for the [[DataQuanta]] to be built.
+   */
+  private val broadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()
+
+  /**
+   * [[CardinalityEstimator]] for the [[DataQuanta]] to be built.
+   */
+  private var cardinalityEstimator: CardinalityEstimator = _
+
+  /**
+   * Target [[Platform]]s for the [[DataQuanta]] to be built.
+   */
+  private val targetPlatforms: ListBuffer[Platform] = ListBuffer()
+
+  /**
+   * Paths of UDF JAR files for the [[DataQuanta]] to be built.
+   */
+  private val udfJars: ListBuffer[String] = ListBuffer()
+
+  /**
+   * The type of the [[DataQuanta]] to be built.
+   */
+  protected[api] val outputTypeTrap = getOutputTypeTrap
+
+  /**
+   * Retrieve an initialization value for [[outputTypeTrap]]. Subclasses may override it to share a [[TypeTrap]].
+   *
+   * @return the [[TypeTrap]]
+   */
+  protected def getOutputTypeTrap = new TypeTrap
+
+  override protected[api] implicit def javaPlanBuilder = _javaPlanBuilder
+
+  override def withName(name: String): This = {
+    this.name = name
+    this.asInstanceOf[This]
+  }
+
+  override def withExperiment(experiment: Experiment): This = {
+    this.experiment = experiment
+    this.asInstanceOf[This]
+  }
+
+  override def withOutputType(outputType: DataSetType[Out]): This = {
+    this.outputTypeTrap.dataSetType = outputType
+    this.asInstanceOf[This]
+  }
+
+  override def withOutputClass(cls: Class[Out]): This = this.withOutputType(DataSetType.createDefault(cls))
+
+  override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
+    this.broadcasts += Tuple2(broadcastName, sender)
+    this.asInstanceOf[This]
+  }
+
+  override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
+    this.cardinalityEstimator = cardinalityEstimator
+    this.asInstanceOf[This]
+  }
+
+  override def withTargetPlatform(platform: Platform): This = {
+    this.targetPlatforms += platform
+    this.asInstanceOf[This]
+  }
+
+  def withUdfJarOf(cls: Class[_]): This = this.withUdfJar(ReflectionUtils.getDeclaringJar(cls))
+
+  override def withUdfJar(path: String): This = {
+    this.udfJars += path
+    this.asInstanceOf[This]
+  }
+
+  override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
+
+  override protected[api] def dataQuanta(): DataQuanta[Out] = {
+    if (this.result == null) {
+      this.result = this.build
+      if (this.name != null) this.result.withName(this.name)
+      if (this.cardinalityEstimator != null) this.result.withCardinalityEstimator(this.cardinalityEstimator)
+      if (this.experiment != null) this.result.withExperiment(this.experiment)
+      this.result.withUdfJars(this.udfJars: _*)
+      this.result.withTargetPlatforms(this.targetPlatforms: _*)
+      this.broadcasts.foreach {
+        case (broadcastName, senderBuilder) => this.result.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
+      }
+    }
+    this.result
+  }
+
+  /**
+   * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
+   *
+   * @return the created and partially configured [[DataQuanta]]
+   */
+  protected def build: DataQuanta[Out]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
new file mode 100644
index 0000000..40d4fae
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
@@ -0,0 +1,43 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ */
+class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
+                                           inputDataQuanta1: DataQuantaBuilder[_, In1])
+                                          (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], WT2[In0, In1]] {
+
+  // Since we are currently not looking at type parameters, we can statically determine the output type:
+  // every output element is a Wayang Tuple2, whatever In0 and In1 are.
+  locally {
+    this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+  }
+
+  /** Combines every element of the first input with every element of the second input. */
+  override protected def build =
+    inputDataQuanta0.dataQuanta().cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
new file mode 100644
index 0000000..aa1d88a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
@@ -0,0 +1,142 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ * @param keyUdf0          key extraction UDF for the first input of the [[org.apache.wayang.basic.operators.CoGroupOperator]]
+ * @param keyUdf1          key extraction UDF for the second input of the [[org.apache.wayang.basic.operators.CoGroupOperator]]
+ */
+class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
+                                              inputDataQuanta1: DataQuantaBuilder[_, In1],
+                                              keyUdf0: SerializableFunction[In0, Key],
+                                              keyUdf1: SerializableFunction[In1, Key])
+                                             (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], WT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {
+
+  /** [[ClassTag]] or surrogate of [[Key]] */
+  implicit var keyTag: ClassTag[Key] = _
+
+  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
+  private var keyUdf0CpuEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
+  private var keyUdf0RamEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
+  private var keyUdf1CpuEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
+  private var keyUdf1RamEstimator: LoadEstimator = _
+
+  // Try to infer the type classes from the first key UDF. This always assigns keyTag, falling back to a
+  // surrogate ClassTag when the key type cannot be determined.
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
+    parameters.get("Input") match {
+      case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf0)
+    }
+
+    this.keyTag = parameters.get("Output") match {
+      case cls: Class[Key] => ClassTag(cls)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf0)
+        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+    }
+  }
+  // Try to infer the type classes from the second key UDF. keyTag is only overwritten when the key type
+  // could actually be inferred here, so a failure on keyUdf1 does not discard a key type found via keyUdf0.
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
+    parameters.get("Input") match {
+      case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
+    }
+
+    parameters.get("Output") match {
+      case cls: Class[Key] => this.keyTag = ClassTag(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf1)
+    }
+  }
+  // Since we are currently not looking at type parameters, we can statically determine the output type.
+  locally {
+    this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
+   *
+   * @param udfCpuEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator): CoGroupDataQuantaBuilder[In0, In1, Key] = {
+    this.keyUdf0CpuEstimator = udfCpuEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
+   *
+   * @param udfRamEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator): CoGroupDataQuantaBuilder[In0, In1, Key] = {
+    this.keyUdf0RamEstimator = udfRamEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
+   *
+   * @param udfCpuEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator): CoGroupDataQuantaBuilder[In0, In1, Key] = {
+    this.keyUdf1CpuEstimator = udfCpuEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
+   *
+   * @param udfRamEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator): CoGroupDataQuantaBuilder[In0, In1, Key] = {
+    this.keyUdf1RamEstimator = udfRamEstimator
+    this
+  }
+
+  override protected def build =
+    inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
new file mode 100644
index 0000000..d8f7d18
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
@@ -0,0 +1,40 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+
+/**
+ * Builds the [[DataQuanta]] of an [[org.apache.wayang.basic.operators.CountOperator]], whose output is a
+ * `java.lang.Long` count of its input.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+                               (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {
+
+  // The output is always a java.lang.Long, so its type is fixed as soon as the builder is constructed.
+  this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.count
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
new file mode 100644
index 0000000..5fca840
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.DataQuantaBuilderCache
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
+
+/**
+ * Generic [[DataQuantaBuilder]] for an arbitrary [[org.apache.wayang.core.plan.wayangplan.Operator]]. It addresses a
+ * single output of that operator and offers no operator-specific convenience methods.
+ *
+ * @param operator        the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
+ * @param outputIndex     index of the [[OutputSlot]] addressed by the new instance
+ * @param buildCache      a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
+ * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
+ * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
+ */
+class CustomOperatorDataQuantaBuilder[T](operator: Operator,
+                                         outputIndex: Int,
+                                         buildCache: DataQuantaBuilderCache,
+                                         inputDataQuanta: DataQuantaBuilder[_, _]*)
+                                        (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {
+
+  override protected def build = {
+    // Builders for different OutputSlots of the same operator share buildCache, so the operator is
+    // added to the plan only by whichever of them builds first.
+    val alreadyBuilt = buildCache.hasCached
+    if (!alreadyBuilt) {
+      val planBuilder = javaPlanBuilder.planBuilder
+      val inputs = inputDataQuanta.map(_.dataQuanta())
+      buildCache.cache(planBuilder.customOperator(operator, inputs: _*))
+    }
+    buildCache(outputIndex)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
new file mode 100644
index 0000000..9e8e044
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * Builds the [[DataQuanta]] of an [[org.apache.wayang.basic.operators.DistinctOperator]]; input and output share
+ * the element type `T`.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+                                  (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {
+
+  // Input and output types are equal, hence the input's TypeTrap is shared rather than creating a new one.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.distinct
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
new file mode 100644
index 0000000..34de7be
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
@@ -0,0 +1,126 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.{Tuple => WayangTuple}
+
+import java.util.function.{Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
+import scala.reflect.ClassTag
+
+/**
+ * Builds the [[DataQuanta]] of an [[org.apache.wayang.basic.operators.DoWhileOperator]], whose loop body is itself
+ * described with [[DataQuantaBuilder]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param conditionUdf    UDF for the looping condition; a predicate over a `java.util.Collection` of condition elements
+ * @param bodyBuilder     builds the loop body: maps the loop start to a tuple of (loop end, condition) builders
+ */
+class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
+                                           conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
+                                           bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
+                                          (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {
+
+  // TODO: Get the ClassTag right.
+  /** [[ClassTag]] of the condition elements; starts out as the surrogate class of `DataSetType.none`. */
+  implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+
+  // The loop yields elements of the same type as its input, so the input's TypeTrap is shared.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  // TODO: We could improve by combining the TypeTraps in the body loop.
+
+  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the UDF; `null` unless set via [[withUdfLoad]]. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  /** Number of expected iterations passed on to `doWhileJava`; 20 unless set otherwise. */
+  private var numExpectedIterations = 20
+
+  /** Runs `update` and then hands back this builder, so that the configuration methods can be chained. */
+  private def applyAndReturnThis(update: => Unit): DoWhileDataQuantaBuilder[T, ConvOut] = {
+    update
+    this
+  }
+
+  /**
+   * Set a [[LoadProfileEstimator]] for the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) =
+    applyAndReturnThis { this.udfLoadProfileEstimator = udfLoadProfileEstimator }
+
+  /**
+   * Explicitly set the [[DataSetType]] of the condition [[DataQuanta]]. This is not always required, as the
+   * type can be inferred in some situations.
+   *
+   * @param outputType the condition's [[DataSetType]]
+   * @return this instance
+   */
+  def withConditionType(outputType: DataSetType[ConvOut]) =
+    applyAndReturnThis { this.convOutClassTag = ClassTag(outputType.getDataUnitType.getTypeClass) }
+
+  /**
+   * Explicitly set the [[Class]] of the condition [[DataQuanta]]. This is not always required, as the
+   * class can be inferred in some situations.
+   *
+   * @param cls the condition's [[Class]]
+   * @return this instance
+   */
+  def withConditionClass(cls: Class[ConvOut]) =
+    applyAndReturnThis { this.convOutClassTag = ClassTag(cls) }
+
+  /**
+   * Set how many iterations the built [[org.apache.wayang.basic.operators.DoWhileOperator]] is expected to run.
+   *
+   * @param numExpectedIterations the expected number of iterations
+   * @return this instance
+   */
+  def withExpectedNumberOfIterations(numExpectedIterations: Int) =
+    applyAndReturnThis { this.numExpectedIterations = numExpectedIterations }
+
+  override protected def build = {
+    val loopInput = inputDataQuanta.dataQuanta()
+    loopInput.doWhileJava[ConvOut](
+      conditionUdf, dataQuantaBodyBuilder, this.numExpectedIterations, this.udfLoadProfileEstimator
+    )(this.convOutClassTag)
+  }
+
+  /**
+   * Adapts [[bodyBuilder]] to work on [[DataQuanta]]: the loop start is wrapped into a [[FakeDataQuantaBuilder]],
+   * and the builders returned by [[bodyBuilder]] are unwrapped into their [[DataQuanta]].
+   *
+   * @return the loop body builder
+   */
+  private def dataQuantaBodyBuilder =
+    new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
+      override def apply(start: DataQuanta[T]) = {
+        val endBuilders = bodyBuilder.apply(new FakeDataQuantaBuilder(start))
+        new WayangTuple(endBuilders.field0.dataQuanta(), endBuilders.field1.dataQuanta())
+      }
+    }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
new file mode 100644
index 0000000..6b52c40
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
@@ -0,0 +1,44 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+import scala.reflect.ClassTag
+
+/**
+ * Adapter presenting existing [[DataQuanta]] through the [[DataQuantaBuilder]] interface.
+ *
+ * Because [[dataQuanta()]] is overridden to return the wrapped instance as is, configuration that the base
+ * class only applies in its own `dataQuanta()` (e.g., target platforms or UDF JARs) is not applied here.
+ *
+ * @param _dataQuanta the wrapped [[DataQuanta]]
+ */
+class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] {
+
+  /** [[ClassTag]] taken from the data unit type of the wrapped [[DataQuanta]]' output. */
+  override implicit def classTag: ClassTag[T] = {
+    val dataUnitClass = _dataQuanta.output.getType.getDataUnitType.getTypeClass
+    ClassTag(dataUnitClass)
+  }
+
+  /** Returns the wrapped [[DataQuanta]] directly, bypassing [[build]] and any configuration. */
+  override def dataQuanta(): DataQuanta[T] = _dataQuanta
+
+  /**
+   * Yields the wrapped [[DataQuanta]]. This class's [[dataQuanta()]] does not call this method.
+   *
+   * @return the wrapped [[DataQuanta]]
+   */
+  override protected def build: DataQuanta[T] = _dataQuanta
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
new file mode 100644
index 0000000..3d33c80
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
@@ -0,0 +1,102 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.MapOperator
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FilterOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf             predicate for the [[org.apache.wayang.basic.operators.FilterOperator]]
+ */
+class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
+                                (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {
+
+  // Reuse the input TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  /** Selectivity of the filter predicate. */
+  private var selectivity: ProbabilisticDoubleInterval = _
+
+  /** SQL UDF implementing the filter predicate. */
+  private var sqlUdf: String = _
+
+  // Try to infer the type classes from the udf. The udf is a SerializablePredicate, so its type parameters
+  // must be resolved against that interface; resolving them against SerializableFunction (which the udf does
+  // not implement) can never succeed.
+  // NOTE(review): assumes SerializablePredicate names its type parameter "Input" like SerializableFunction does
+  // -- TODO confirm against FunctionDescriptor.
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializablePredicate[_]])
+    parameters.get("Input") match {
+      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+  }
+
+  /**
+   * Set a [[LoadProfileEstimator]] for the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  /**
+   * Add a SQL implementation of the UDF.
+   *
+   * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
+   * @return this instance
+   */
+  def withSqlUdf(sqlUdf: String) = {
+    this.sqlUdf = sqlUdf
+    this
+  }
+
+  /**
+   * Specify the selectivity of the UDF.
+   *
+   * @param lowerEstimate the lower bound of the expected selectivity
+   * @param upperEstimate the upper bound of the expected selectivity
+   * @param confidence    the probability of the actual selectivity being within these bounds
+   * @return this instance
+   */
+  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+    this
+  }
+
+  override protected def build = inputDataQuanta.dataQuanta().filterJava(
+    udf, this.sqlUdf, this.selectivity, this.udfLoadProfileEstimator
+  )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
new file mode 100644
index 0000000..83b4383
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
@@ -0,0 +1,91 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf             UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]; maps each input element
+ *                        to a `java.lang.Iterable` of output elements
+ */
+class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+                                        udf: SerializableFunction[In, java.lang.Iterable[Out]])
+                                       (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {
+
+
+  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  /** Selectivity of the flat-map [[udf]]; `null` unless set via [[withSelectivity]]. */
+  private var selectivity: ProbabilisticDoubleInterval = _
+
+  // Try to infer the type classes from the udf: the input class from its "Input" type parameter and the
+  // output class via ReflectionUtils.getWrapperClass on its "Output" type parameter (position 0, presumably
+  // the element type of the returned java.lang.Iterable).
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+    parameters.get("Input") match {
+      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+    ReflectionUtils.getWrapperClass(parameters.get("Output"), 0) match {
+      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+  }
+
+  /**
+   * Set a [[LoadProfileEstimator]] for the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  /**
+   * Specify the selectivity of the UDF.
+   *
+   * @param lowerEstimate the lower bound of the expected selectivity
+   * @param upperEstimate the upper bound of the expected selectivity
+   * @param confidence    the probability of the actual selectivity being within these bounds
+   * @return this instance
+   */
+  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+    this
+  }
+
+  override protected def build = inputDataQuanta.dataQuanta().flatMapJava(
+    udf, this.selectivity, this.udfLoadProfileEstimator
+  )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
new file mode 100644
index 0000000..abc1d9a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
@@ -0,0 +1,34 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] that yields a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]
+ * by calling `group()` on the input [[DataQuanta]].
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @tparam T type of the input elements; the output elements are `java.lang.Iterable[T]`s
+ */
+class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+                                     (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.group()
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
new file mode 100644
index 0000000..c3efc95
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
@@ -0,0 +1,68 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableBinaryOperator
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] that yields a [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
+ * by calling `reduceJava` with `udf` on the input [[DataQuanta]].
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf             binary reduction UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
+ * @tparam T type of both the input and the output elements
+ */
+class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
+                                       udf: SerializableBinaryOperator[T])
+                                      (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {
+
+  // Input and output share a single TypeTrap, which forces both sides onto the same type.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  /** Optional [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[udf]]; `null` until configured. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  // Best-effort: read the element class off the UDF's generic signature, warn if it is not available.
+  locally {
+    val typeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
+    typeParameters.get("Type") match {
+      case elementClass: Class[T] =>
+        this.outputTypeTrap.dataSetType = DataSetType.createDefault(elementClass)
+      case _ =>
+        logger.warn("Could not infer types from {}.", udf)
+    }
+  }
+
+  /**
+   * Attach a [[LoadProfileEstimator]] describing the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.reduceJava(udf, this.udfLoadProfileEstimator)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
new file mode 100644
index 0000000..b61c71e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
@@ -0,0 +1,76 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] that yields a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
+ * by calling `groupByKeyJava` with `keyUdf` on the input [[DataQuanta]].
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
+ * @tparam Key type of the grouping key
+ * @tparam T   type of the input elements; the output elements are `java.lang.Iterable[T]`s
+ */
+class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
+                                      (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {
+
+  /** [[ClassTag]] of [[Key]], or a surrogate when it cannot be inferred; used implicitly by `build`. */
+  implicit var keyTag: ClassTag[Key] = _
+
+  /** Optional [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[keyUdf]]; `null` until configured. */
+  private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _
+
+  // Best-effort inference of the element and key classes from the key UDF's generic signature.
+  locally {
+    val typeParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+
+    // The UDF's input class is the element class inside every group.
+    typeParameters.get("Input") match {
+      case elementClass: Class[T] =>
+        this.outputTypeTrap.dataSetType = DataSetType.createGrouped(elementClass)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf)
+    }
+
+    // The UDF's output class is the key class; fall back to the class of DataSetType.none otherwise.
+    val inferredKeyTag: ClassTag[Key] = typeParameters.get("Output") match {
+      case keyClass: Class[Key] => ClassTag(keyClass)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf)
+        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+    }
+    this.keyTag = inferredKeyTag
+  }
+
+  /**
+   * Attach a [[LoadProfileEstimator]] describing the load of the key UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.keyUdfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
new file mode 100644
index 0000000..6db8ffe
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ * @tparam T type of the elements of both inputs and of the output
+ */
+class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
+                                    inputDataQuanta1: DataQuantaBuilder[_, T])
+                                   (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {
+
+  // Reuse the first input's TypeTrap to enforce type equality between that input and the output.
+  override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
+
+  override protected def build = inputDataQuanta0.dataQuanta().intersect(inputDataQuanta1.dataQuanta())
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
new file mode 100644
index 0000000..ab49ffb
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
@@ -0,0 +1,153 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableFunction}
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ * @param keyUdf0          key extraction UDF for the first input of the [[org.apache.wayang.basic.operators.JoinOperator]]
+ * @param keyUdf1          key extraction UDF for the second input of the [[org.apache.wayang.basic.operators.JoinOperator]]
+ * @tparam In0 type of the first input's elements
+ * @tparam In1 type of the second input's elements
+ * @tparam Key type of the join key produced by both key UDFs
+ */
+class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
+                                           inputDataQuanta1: DataQuantaBuilder[_, In1],
+                                           keyUdf0: SerializableFunction[In0, Key],
+                                           keyUdf1: SerializableFunction[In1, Key])
+                                          (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], WT2[In0, In1]] {
+
+  /** [[ClassTag]] or surrogate of [[Key]] */
+  implicit var keyTag: ClassTag[Key] = _
+
+  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
+  private var keyUdf0CpuEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
+  private var keyUdf0RamEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
+  private var keyUdf1CpuEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
+  private var keyUdf1RamEstimator: LoadEstimator = _
+
+  // Try to infer the type classes from the UDFs. Both key UDFs declare the same Key type, so a key class
+  // inferred from either of them suffices; only when neither reveals it do we fall back to a surrogate.
+  // (A failed inference on one UDF must not discard a successful inference on the other.)
+  locally {
+    val keyClass0 = inferTypesFromKeyUdf(keyUdf0, inputDataQuanta0)
+    val keyClass1 = inferTypesFromKeyUdf(keyUdf1, inputDataQuanta1)
+    this.keyTag = keyClass1.orElse(keyClass0) match {
+      case Some(cls) => ClassTag[Key](cls)
+      case None => ClassTag[Key](DataSetType.none.getDataUnitType.getTypeClass)
+    }
+  }
+  // Since we are currently not looking at type parameters, we can statically determine the output type.
+  locally {
+    this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+  }
+
+  /**
+   * Inspect the generic signature of a key extraction UDF. If its input class can be determined, it is set as
+   * the data set type of `input`'s output type trap. A warning naming `keyUdf` is logged for each class
+   * (input, key) that cannot be determined.
+   *
+   * @param keyUdf the key extraction UDF to inspect
+   * @param input  the [[DataQuantaBuilder]] whose elements `keyUdf` consumes
+   * @return the key class declared by `keyUdf`, if it could be inferred
+   */
+  private def inferTypesFromKeyUdf[In](keyUdf: SerializableFunction[In, Key],
+                                       input: DataQuantaBuilder[_, In]): Option[Class[Key]] = {
+    val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+    parameters.get("Input") match {
+      case cls: Class[In] => input.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf)
+    }
+    parameters.get("Output") match {
+      case cls: Class[Key] => Some(cls)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf)
+        None
+    }
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
+   *
+   * @param udfCpuEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+    this.keyUdf0CpuEstimator = udfCpuEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
+   *
+   * @param udfRamEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+    this.keyUdf0RamEstimator = udfRamEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
+   *
+   * @param udfCpuEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+    this.keyUdf1CpuEstimator = udfCpuEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
+   *
+   * @param udfRamEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+    this.keyUdf1RamEstimator = udfRamEstimator
+    this
+  }
+
+  /**
+   * Assemble the joined elements to new elements.
+   *
+   * @param udf produces a joined element from two joinable elements
+   * @return a new [[DataQuantaBuilder]] representing the assembled join product
+   */
+  def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
+    this.map(new SerializableFunction[WT2[In0, In1], NewOut] {
+      override def apply(joinTuple: WT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
+    })
+
+  override protected def build =
+    inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
new file mode 100644
index 0000000..9ddc101
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
@@ -0,0 +1,50 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+
+/**
+ * This is not an actual [[DataQuantaBuilder]] but rather decorates such a [[DataQuantaBuilder]] with a key.
+ *
+ * @param dataQuantaBuilder the decorated [[DataQuantaBuilder]]
+ * @param keyExtractor      UDF that extracts the key from each element of `dataQuantaBuilder`
+ * @tparam Out type of the elements of the decorated [[DataQuantaBuilder]]
+ * @tparam Key type of the extracted key
+ */
+class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out],
+                                       private val keyExtractor: SerializableFunction[Out, Key])
+                                      (implicit javaPlanBuilder: JavaPlanBuilder) {
+
+  /**
+   * Joins this instance with the given one via their keys.
+   *
+   * @param that the instance to join with
+   * @return a [[DataQuantaBuilder]] representing the join product
+   */
+  def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
+    dataQuantaBuilder.join(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
+
+  /**
+   * Co-groups this instance with the given one via their keys.
+   *
+   * @param that the instance to co-group with
+   * @return a [[DataQuantaBuilder]] representing the co-group product
+   */
+  def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
+    dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
new file mode 100644
index 0000000..748416e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.WayangCollections
+
+import java.util.{Collection => JavaCollection}
+
+/**
+ * [[DataQuantaBuilder]] that yields a [[org.apache.wayang.basic.operators.CollectionSource]] reading the
+ * elements of a [[JavaCollection]].
+ *
+ * @param collection      the [[JavaCollection]] to be loaded
+ * @param javaPlanBuilder the [[JavaPlanBuilder]]
+ * @tparam Out type of the collection's elements
+ */
+class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {
+
+  // Best-effort: use the runtime class of some non-null element of a non-empty collection as the output type.
+  locally {
+    if (!collection.isEmpty) {
+      Option(WayangCollections.getAny(collection)).foreach { sample =>
+        this.outputTypeTrap.dataSetType = DataSetType.createDefault(sample.getClass)
+      }
+    }
+  }
+
+  override protected def build: DataQuanta[Out] = {
+    val planBuilder = javaPlanBuilder.planBuilder
+    planBuilder.loadCollection(collection)(this.classTag)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
new file mode 100644
index 0000000..deb15a9
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
@@ -0,0 +1,69 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.MapOperator
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] that yields a [[MapOperator]] by calling `mapJava` with `udf` on the input
+ * [[DataQuanta]].
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf             UDF for the [[MapOperator]]
+ * @tparam In  type of the input elements
+ * @tparam Out type of the output elements
+ */
+class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+                                    udf: SerializableFunction[In, Out])
+                                   (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {
+
+  /** Optional [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[udf]]; `null` until configured. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  // Best-effort: derive the input and output classes from the UDF's generic signature,
+  // logging a warning for each side that cannot be resolved.
+  locally {
+    val typeParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+
+    typeParameters.get("Input") match {
+      case inputClass: Class[In] =>
+        inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(inputClass)
+      case _ =>
+        logger.warn("Could not infer types from {}.", udf)
+    }
+
+    typeParameters.get("Output") match {
+      case outputClass: Class[Out] =>
+        this.outputTypeTrap.dataSetType = DataSetType.createDefault(outputClass)
+      case _ =>
+        logger.warn("Could not infer types from {}.", udf)
+    }
+  }
+
+  /**
+   * Attach a [[LoadProfileEstimator]] describing the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  override protected def build = {
+    val input = inputDataQuanta.dataQuanta()
+    input.mapJava(udf, this.udfLoadProfileEstimator)
+  }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
new file mode 100644
index 0000000..e686565
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
@@ -0,0 +1,93 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf             UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
+ * @tparam In  type of the input elements
+ * @tparam Out type of the output elements
+ */
+class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+                                              udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
+                                             (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {
+
+  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; `null` until configured. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  /** Selectivity of the [[udf]]; `null` until configured. */
+  private var selectivity: ProbabilisticDoubleInterval = _
+
+  // Try to infer the type classes from the udf. Both its input and its output are java.lang.Iterables,
+  // so the element classes are the first type arguments of those Iterable types; matching the raw
+  // parameter types against Class[In]/Class[Out] would yield the Iterable type instead of the element type.
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+    ReflectionUtils.getWrapperClass(parameters.get("Input"), 0) match {
+      case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+    ReflectionUtils.getWrapperClass(parameters.get("Output"), 0) match {
+      case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+  }
+
+  /**
+   * Set a [[LoadProfileEstimator]] for the load of the UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  /**
+   * Specify the selectivity of the UDF.
+   *
+   * @param lowerEstimate the lower bound of the expected selectivity
+   * @param upperEstimate the upper bound of the expected selectivity
+   * @param confidence    the probability of the actual selectivity being within these bounds
+   * @return this instance
+   */
+  def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+    this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+    this
+  }
+
+  override protected def build = inputDataQuanta.dataQuanta().mapPartitionsJava(
+    udf, this.selectivity, this.udfLoadProfileEstimator
+  )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
new file mode 100644
index 0000000..28ede1e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
@@ -0,0 +1,37 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * Builder for a field projection, realized as a [[org.apache.wayang.basic.operators.MapOperator]]
+ * that carries a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
+ *
+ * @param inputDataQuanta builder that yields the [[DataQuanta]] to be projected
+ * @param fieldNames      names of the fields handed to the projection
+ */
+class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
+                                          (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {
+
+  override protected def build: DataQuanta[Out] =
+    inputDataQuanta.dataQuanta().project(fieldNames.toSeq)
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
new file mode 100644
index 0000000..ca14576
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
@@ -0,0 +1,94 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction}
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ * @param udf             reduce UDF (a binary operator on T) for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ */
+class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
+                                        keyUdf: SerializableFunction[T, Key],
+                                        udf: SerializableBinaryOperator[T])
+                                       (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {
+
+  // Reuse the input TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  implicit var keyTag: ClassTag[Key] = _ // ClassTag (or surrogate) of Key, inferred from keyUdf below
+
+  /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
+  private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+  // TODO: Add these estimators.
+  //  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
+  //  private var keyUdfCpuEstimator: LoadEstimator = _
+  //
+  //  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
+  //  private var keyUdfRamEstimator: LoadEstimator = _
+
+  // Try to infer the type classes from the UDFs.
+  locally {
+    val udfParameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
+    udfParameters.get("Type") match {
+      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", udf)
+    }
+    // The key UDF also consumes T, so its input type may refine the same output TypeTrap.
+    val keyUdfParameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+    keyUdfParameters.get("Input") match {
+      case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf)
+    }
+    this.keyTag = keyUdfParameters.get("Output") match {
+      case cls: Class[Key] => ClassTag(cls)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf)
+        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+    }
+  }
+
+  /**
+   * Set a [[LoadProfileEstimator]] for the load of the reduce UDF.
+   *
+   * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]; forwarded to `reduceByKeyJava` by `build`
+   * @return this instance
+   */
+  def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+    this.udfLoadProfileEstimator = udfLoadProfileEstimator
+    this
+  }
+
+  override protected def build =
+    inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
new file mode 100644
index 0000000..f7170f7
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
@@ -0,0 +1,51 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+import java.util.function.{Function => JavaFunction}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.RepeatOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param numRepetitions  number of repetitions of the loop
+ * @param bodyBuilder     builds the loop body, starting from a builder that wraps the loop's start [[DataQuanta]]
+ */
+class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
+                                 numRepetitions: Int,
+                                 bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
+                                (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {
+
+  // Reuse the input TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  // TODO: We could improve by combining the TypeTraps in the body loop.
+
+  override protected def build =
+    inputDataQuanta.dataQuanta().repeat(numRepetitions, startDataQuanta => {
+      val loopStartBuilder = new FakeDataQuantaBuilder(startDataQuanta)
+      bodyBuilder(loopStartBuilder).dataQuanta()
+    })
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
new file mode 100644
index 0000000..0642fe6
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
@@ -0,0 +1,92 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.SampleOperator
+
+import java.util.function.IntUnaryOperator
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
+ *
+ * @param inputDataQuanta    [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
+ */
+class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
+                                (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
+
+  /**
+   * Size of the dataset to be sampled; `SampleOperator.UNKNOWN_DATASET_SIZE` until set via `withDatasetSize`.
+   */
+  private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE
+
+  /**
+   * Sampling method to use; `SampleOperator.Methods.ANY` until set via `withSampleMethod`.
+   */
+  private var sampleMethod = SampleOperator.Methods.ANY
+
+  /**
+   * Seed for the sampling; `None` until set via `withSeed`.
+   */
+  private var seed: Option[Long] = None
+
+  // Reuse the input TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  /**
+   * Set the size of the dataset that should be sampled.
+   *
+   * @param datasetSize the size of the dataset
+   * @return this instance
+   */
+  def withDatasetSize(datasetSize: Long) = {
+    this.datasetSize = datasetSize
+    this
+  }
+
+  /**
+   * Set the sample method to be used.
+   *
+   * @param sampleMethod the sample method
+   * @return this instance
+   */
+  def withSampleMethod(sampleMethod: SampleOperator.Methods) = {
+    this.sampleMethod = sampleMethod
+    this
+  }
+
+  /**
+   * Set the seed to be used for the sampling.
+   *
+   * @param seed the seed
+   * @return this instance
+   */
+  def withSeed(seed: Long) = {
+    this.seed = Some(seed)
+    this
+  }
+
+  override protected def build =
+    inputDataQuanta.dataQuanta().sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
new file mode 100644
index 0000000..d1af25a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
@@ -0,0 +1,97 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf          key extraction UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
+ */
+class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
+                                    keyUdf: SerializableFunction[T, Key])
+                                   (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {
+
+  // Reuse the input TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+  /** [[ClassTag]] or surrogate of [[Key]] */
+  implicit var keyTag: ClassTag[Key] = _
+
+  /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. Not yet used by `build`. */
+  private var keyUdfCpuEstimator: LoadEstimator = _
+
+  /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. Not yet used by `build`. */
+  private var keyUdfRamEstimator: LoadEstimator = _
+
+  // Try to infer the type classes from the UDFs.
+  locally {
+    val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+    // The key UDF consumes T, so its input type refines the input (and thus the output) TypeTrap.
+    parameters.get("Input") match {
+      case cls: Class[T] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+      case _ => logger.warn("Could not infer types from {}.", keyUdf)
+    }
+
+    // The key UDF's output type yields the ClassTag handed to sortJava; fall back to a surrogate otherwise.
+    this.keyTag = parameters.get("Output") match {
+      case cls: Class[Key] => ClassTag(cls)
+      case _ =>
+        logger.warn("Could not infer types from {}.", keyUdf)
+        ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+    }
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the CPU load of the key extraction UDF. Currently has no effect.
+   *
+   * @param udfCpuEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+    this.keyUdfCpuEstimator = udfCpuEstimator
+    this
+  }
+
+  /**
+   * Set a [[LoadEstimator]] for the RAM load of the key extraction UDF. Currently has no effect.
+   *
+   * @param udfRamEstimator the [[LoadEstimator]]
+   * @return this instance
+   */
+  def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+    this.keyUdfRamEstimator = udfRamEstimator
+    this
+  }
+
+  override protected def build =
+    inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
new file mode 100644
index 0000000..d97b08b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
@@ -0,0 +1,37 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.plan.wayangplan.UnarySource
+
+/**
+ * [[DataQuantaBuilder]] that feeds the output of a [[org.apache.wayang.core.plan.wayangplan.UnarySource]] into the plan.
+ *
+ * @param source          the [[UnarySource]] whose output is loaded
+ * @param javaPlanBuilder the [[JavaPlanBuilder]] whose underlying plan builder performs the load
+ */
+class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
+                                                                          (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[This, Out] {
+
+  override protected def build: DataQuanta[Out] =
+    javaPlanBuilder.planBuilder.load(source)(this.classTag)
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
new file mode 100644
index 0000000..04fc6ac
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ */
+class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
+                                inputDataQuanta1: DataQuantaBuilder[_, T])
+                               (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {
+
+  // Reuse the first input's TypeTrap to enforce type equality between input and output.
+  override def getOutputTypeTrap: org.apache.wayang.api.util.TypeTrap = inputDataQuanta0.outputTypeTrap
+
+  override protected def build = inputDataQuanta0.dataQuanta().union(inputDataQuanta1.dataQuanta())
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
new file mode 100644
index 0000000..ded640c
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
@@ -0,0 +1,41 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+
+/**
+ * Builder that pairs every input element with a `java.lang.Long` ID by means of a
+ * [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
+ *
+ * @param inputDataQuanta builder that yields the [[DataQuanta]] to be zipped with IDs
+ */
+class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+                                   (implicit javaPlanBuilder: JavaPlanBuilder)
+  extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], WT2[java.lang.Long, T]] {
+
+  // Type parameters are not inspected, so the (erased) Tuple2 output type is fixed at construction.
+  this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+
+  override protected def build: DataQuanta[WT2[java.lang.Long, T]] =
+    inputDataQuanta.dataQuanta().zipWithId
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
index 17f5e32..1eaec53 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
@@ -18,8 +18,9 @@
 
 package org.apache.wayang.api.graph
 
+import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
 import org.apache.wayang.api.util.DataQuantaBuilderDecorator
-import org.apache.wayang.api.{BasicDataQuantaBuilder, DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
 import org.apache.wayang.basic.operators.PageRankOperator
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 
diff --git a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
index 2d4bb6c..71996a0 100644
--- a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.wayang.api;
 
+import org.apache.wayang.api.dataquantabuilder.GlobalReduceDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
index 59b48cd..3b55a51 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
@@ -21,8 +21,8 @@ package org.apache.wayang.tests;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.wayang.api.JavaPlanBuilder;
-import org.apache.wayang.api.LoadCollectionDataQuantaBuilder;
-import org.apache.wayang.api.MapDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.MapDataQuantaBuilder;
 import org.apache.wayang.core.api.WayangContext;
 import org.apache.wayang.core.util.WayangArrays;
 import org.apache.wayang.java.Java;