You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:47 UTC
[incubator-wayang] 06/15: [WAYANG-31] split DataQuantaBuilder class
on several class
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 5a4815c3ae7cecc0df880f8844613431027d1072
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Tue May 11 14:27:30 2021 -0400
[WAYANG-31] split DataQuantaBuilder class on several class
---
.../org/apache/wayang/api/DataQuantaBuilder.scala | 1268 +-------------------
.../org/apache/wayang/api/JavaPlanBuilder.scala | 2 +-
.../wayang/api/RecordDataQuantaBuilder.scala | 1 +
.../dataquantabuilder/BasicDataQuantaBuilder.scala | 152 +++
.../CartesianDataQuantaBuilder.scala | 43 +
.../CoGroupDataQuantaBuilder.scala | 142 +++
.../dataquantabuilder/CountDataQuantaBuilder.scala | 40 +
.../CustomOperatorDataQuantaBuilder.scala | 52 +
.../DistinctDataQuantaBuilder.scala | 39 +
.../DoWhileDataQuantaBuilder.scala | 126 ++
.../dataquantabuilder/FakeDataQuantaBuilder.scala | 44 +
.../FilterDataQuantaBuilder.scala | 102 ++
.../FlatMapDataQuantaBuilder.scala | 91 ++
.../GlobalGroupDataQuantaBuilder.scala | 34 +
.../GlobalReduceDataQuantaBuilder.scala | 68 ++
.../GroupByDataQuantaBuilder.scala | 76 ++
.../IntersectDataQuantaBuilder.scala | 39 +
.../dataquantabuilder/JoinDataQuantaBuilder.scala | 153 +++
.../dataquantabuilder/KeyedDataQuantaBuilder.scala | 50 +
.../LoadCollectionDataQuantaBuilder.scala | 49 +
.../dataquantabuilder/MapDataQuantaBuilder.scala | 69 ++
.../MapPartitionsDataQuantaBuilder.scala | 93 ++
.../ProjectionDataQuantaBuilder.scala | 37 +
.../ReduceByDataQuantaBuilder.scala | 94 ++
.../RepeatDataQuantaBuilder.scala | 51 +
.../SampleDataQuantaBuilder.scala | 92 ++
.../dataquantabuilder/SortDataQuantaBuilder.scala | 97 ++
.../UnarySourceDataQuantaBuilder.scala | 37 +
.../dataquantabuilder/UnionDataQuantaBuilder.scala | 39 +
.../ZipWithIdDataQuantaBuilder.scala | 41 +
.../wayang/api/graph/EdgeDataQuantaBuilder.scala | 3 +-
.../java/org/apache/wayang/api/JavaApiTest.java | 2 +
.../java/org/apache/wayang/tests/RegressionIT.java | 4 +-
33 files changed, 1960 insertions(+), 1270 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index a590795..c6e5dcb 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -21,8 +21,8 @@ package org.apache.wayang.api
import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
import java.util.{Collection => JavaCollection}
-
import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.dataquantabuilder.{BasicDataQuantaBuilder, CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuan [...]
import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
@@ -31,7 +31,7 @@ import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunctio
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
-import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan, UnarySource}
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan}
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
@@ -438,1267 +438,3 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
protected[api] def dataQuanta(): DataQuanta[Out]
}
-
-/**
- * Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
- * Java API for Wayang that compensates for lacking default and named arguments.
- */
-abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
- extends Logging with DataQuantaBuilder[This, Out] {
-
- /**
- * Lazy-initialized. The [[DataQuanta]] product of this builder.
- */
- private var result: DataQuanta[Out] = _
-
- /**
- * A name for the [[DataQuanta]] to be built.
- */
- private var name: String = _
-
- /**
- * An [[Experiment]] for the [[DataQuanta]] to be built.
- */
- private var experiment: Experiment = _
-
- /**
- * Broadcasts for the [[DataQuanta]] to be built.
- */
- private val broadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()
-
- /**
- * [[CardinalityEstimator]] for the [[DataQuanta]] to be built.
- */
- private var cardinalityEstimator: CardinalityEstimator = _
-
- /**
- * Target [[Platform]]s for the [[DataQuanta]] to be built.
- */
- private val targetPlatforms: ListBuffer[Platform] = ListBuffer()
-
- /**
- * Paths of UDF JAR files for the [[DataQuanta]] to be built.
- */
- private val udfJars: ListBuffer[String] = ListBuffer()
-
- /**
- * The type of the [[DataQuanta]] to be built.
- */
- protected[api] val outputTypeTrap = getOutputTypeTrap
-
- /**
- * Retrieve an initialization value for [[outputTypeTrap]].
- *
- * @return the [[TypeTrap]]
- */
- protected def getOutputTypeTrap = new TypeTrap
-
- override protected[api] implicit def javaPlanBuilder = _javaPlanBuilder
-
- override def withName(name: String): This = {
- this.name = name
- this.asInstanceOf[This]
- }
-
- override def withExperiment(experiment: Experiment): This = {
- this.experiment = experiment
- this.asInstanceOf[This]
- }
-
- override def withOutputType(outputType: DataSetType[Out]): This = {
- this.outputTypeTrap.dataSetType = outputType
- this.asInstanceOf[This]
- }
-
- override def withOutputClass(cls: Class[Out]): This = this.withOutputType(DataSetType.createDefault(cls))
-
- override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
- this.broadcasts += Tuple2(broadcastName, sender)
- this.asInstanceOf[This]
- }
-
- override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
- this.cardinalityEstimator = cardinalityEstimator
- this.asInstanceOf[This]
- }
-
- override def withTargetPlatform(platform: Platform): This = {
- this.targetPlatforms += platform
- this.asInstanceOf[This]
- }
-
- def withUdfJarOf(cls: Class[_]): This = this.withUdfJar(ReflectionUtils.getDeclaringJar(cls))
-
- override def withUdfJar(path: String): This = {
- this.udfJars += path
- this.asInstanceOf[This]
- }
-
- override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
-
- override protected[api] def dataQuanta(): DataQuanta[Out] = {
- if (this.result == null) {
- this.result = this.build
- if (this.name != null) this.result.withName(this.name)
- if (this.cardinalityEstimator != null) this.result.withCardinalityEstimator(this.cardinalityEstimator)
- if (this.experiment != null) this.result.withExperiment(experiment)
- this.result.withUdfJars(this.udfJars: _*)
- this.result.withTargetPlatforms(this.targetPlatforms: _*)
- this.broadcasts.foreach {
- case (broadcastName, senderBuilder) => this.result.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
- }
- }
- this.result
- }
-
- /**
- * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
- *
- * @return the created and partially configured [[DataQuanta]]
- */
- protected def build: DataQuanta[Out]
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.core.plan.wayangplan.UnarySource]]s.
- *
- * @param source the [[UnarySource]]
- * @param javaPlanBuilder the [[JavaPlanBuilder]]
- */
-class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[This, Out] {
-
- override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.load(source)(this.classTag)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CollectionSource]]s.
- *
- * @param collection the [[JavaCollection]] to be loaded
- * @param javaPlanBuilder the [[JavaPlanBuilder]]
- */
-class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {
-
- // Try to infer the type class from the collection.
- locally {
- if (!collection.isEmpty) {
- val any = WayangCollections.getAny(collection)
- if (any != null) {
- this.outputTypeTrap.dataSetType = DataSetType.createDefault(any.getClass)
- }
- }
- }
-
- override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.loadCollection(collection)(this.classTag)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf UDF for the [[MapOperator]]
- */
-class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
- udf: SerializableFunction[In, Out])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- // Try to infer the type classes from the udf.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- parameters.get("Output") match {
- case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- }
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- override protected def build = inputDataQuanta.dataQuanta().mapJava(udf, this.udfLoadProfileEstimator)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
- * [[org.apache.wayang.basic.function.ProjectionDescriptor]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
- */
-class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {
-
- override protected def build = inputDataQuanta.dataQuanta().project(fieldNames.toSeq)
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FilterOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf predicate UDF for the [[org.apache.wayang.basic.operators.FilterOperator]]
- */
-class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- /** Selectivity of the filter predicate. */
- private var selectivity: ProbabilisticDoubleInterval = _
-
- /** SQL UDF implementing the filter predicate. */
- private var sqlUdf: String = _
-
- // Try to infer the type classes from the udf.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- }
-
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- /**
- * Add a SQL implementation of the UDF.
- *
- * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
- * @return this instance
- */
- def withSqlUdf(sqlUdf: String) = {
- this.sqlUdf = sqlUdf
- this
- }
-
- /**
- * Specify the selectivity of the UDF.
- *
- * @param lowerEstimate the lower bound of the expected selectivity
- * @param upperEstimate the upper bound of the expected selectivity
- * @param confidence the probability of the actual selectivity being within these bounds
- * @return this instance
- */
- def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
- this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
- this
- }
-
- override protected def build = inputDataQuanta.dataQuanta().filterJava(
- udf, this.sqlUdf, this.selectivity, this.udfLoadProfileEstimator
- )
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param keyUdf UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
- */
-class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
- keyUdf: SerializableFunction[T, Key])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- /** [[ClassTag]] or surrogate of [[Key]] */
- implicit var keyTag: ClassTag[Key] = _
-
- /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
- private var keyUdfCpuEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
- private var keyUdfRamEstimator: LoadEstimator = _
-
-
- // Try to infer the type classes from the UDFs.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[T] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
-
-
- /**
- * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
- *
- * @param udfCpuEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
- this.keyUdfCpuEstimator = udfCpuEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
- *
- * @param udfRamEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
- this.keyUdfRamEstimator = udfRamEstimator
- this
- }
-
- override protected def build =
- inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag)
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
- */
-class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
- udf: SerializableFunction[In, java.lang.Iterable[Out]])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {
-
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- /** Selectivity of the [[udf]]. */
- private var selectivity: ProbabilisticDoubleInterval = _
-
- // Try to infer the type classes from the udf.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
- originalClass match {
- case cls: Class[Out] => {
- this.outputTypeTrap.dataSetType= DataSetType.createDefault(cls)
- }
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- }
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- /**
- * Specify the selectivity of the UDF.
- *
- * @param lowerEstimate the lower bound of the expected selectivity
- * @param upperEstimate the upper bound of the expected selectivity
- * @param confidence the probability of the actual selectivity being within these bounds
- * @return this instance
- */
- def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
- this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
- this
- }
-
- override protected def build = inputDataQuanta.dataQuanta().flatMapJava(
- udf, this.selectivity, this.udfLoadProfileEstimator
- )
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
- */
-class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
- udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- /** Selectivity of the [[udf]]. */
- private var selectivity: ProbabilisticDoubleInterval = _
-
- // Try to infer the type classes from the udf.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In] => {
- inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- }
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
- originalClass match {
- case cls: Class[Out] => {
- this.outputTypeTrap.dataSetType= DataSetType.createDefault(cls)
- }
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- }
-
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- /**
- * Specify the selectivity of the UDF.
- *
- * @param lowerEstimate the lower bound of the expected selectivity
- * @param upperEstimate the upper bound of the expected selectivity
- * @param confidence the probability of the actual selectivity being within these bounds
- * @return this instance
- */
- def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
- this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
- this
- }
-
- override protected def build = inputDataQuanta.dataQuanta().mapPartitionsJava(
- udf, this.selectivity, this.udfLoadProfileEstimator
- )
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
- */
-class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
-
- /**
- * Size of the dataset to be sampled.
- */
- private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE
-
- /**
- * Sampling method to use.
- */
- private var sampleMethod = SampleOperator.Methods.ANY
-
- /**
- * Seed to use.
- */
- private var seed: Option[Long] = None
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- /**
- * Set the size of the dataset that should be sampled.
- *
- * @param datasetSize the size of the dataset
- * @return this instance
- */
- def withDatasetSize(datasetSize: Long) = {
- this.datasetSize = datasetSize
- this
- }
-
- /**
- * Set the sample method to be used.
- *
- * @param sampleMethod the sample method
- * @return this instance
- */
- def withSampleMethod(sampleMethod: SampleOperator.Methods) = {
- this.sampleMethod = sampleMethod
- this
- }
-
- /**
- * Set the seed to be used for sampling.
- *
- * @param seed the seed
- * @return this instance
- */
- def withSeed(seed: Long) = {
- this.seed = Some(seed)
- this
- }
-
- override protected def build =
- inputDataQuanta.dataQuanta().sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
- * @param keyUdf key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
- */
-class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
- keyUdf: SerializableFunction[T, Key],
- udf: SerializableBinaryOperator[T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- implicit var keyTag: ClassTag[Key] = _
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- // TODO: Add these estimators.
- // /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
- // private var keyUdfCpuEstimator: LoadEstimator = _
- //
- // /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
- // private var keyUdfRamEstimator: LoadEstimator = _
-
- // Try to infer the type classes from the UDFs.
- locally {
- var parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
- parameters.get("Type") match {
- case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
-
- parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf)
- }
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- override protected def build =
- inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param keyUdf key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
- */
-class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {
-
- implicit var keyTag: ClassTag[Key] = _
-
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[keyUdf]]. */
- private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _
-
- // Try to infer the type classes from the UDFs.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createGrouped(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.keyUdfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- override protected def build =
- inputDataQuanta.dataQuanta().groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- */
-class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])(implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {
-
- override protected def build = inputDataQuanta.dataQuanta().group()
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalReduceOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param udf UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
- */
-class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
- udf: SerializableBinaryOperator[T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- // Try to infer the type classes from the udf.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
- parameters.get("Type") match {
- case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", udf)
- }
- }
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- override protected def build = inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
- *
- * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
- */
-class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
- inputDataQuanta1: DataQuantaBuilder[_, T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {
-
- override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
-
- override protected def build = inputDataQuanta0.dataQuanta().union(inputDataQuanta1.dataQuanta())
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
- *
- * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
- */
-class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
- inputDataQuanta1: DataQuantaBuilder[_, T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {
-
- override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
-
- override protected def build = inputDataQuanta0.dataQuanta().intersect(inputDataQuanta1.dataQuanta())
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
- *
- * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
- * @param keyUdf0 first key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
- * @param keyUdf1 second key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
- */
-class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
- inputDataQuanta1: DataQuantaBuilder[_, In1],
- keyUdf0: SerializableFunction[In0, Key],
- keyUdf1: SerializableFunction[In1, Key])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], RT2[In0, In1]] {
-
- /** [[ClassTag]] or surrogate of [[Key]] */
- implicit var keyTag: ClassTag[Key] = _
-
- /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
- private var keyUdf0CpuEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
- private var keyUdf0RamEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
- private var keyUdf1CpuEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
- private var keyUdf1RamEstimator: LoadEstimator = _
-
- // Try to infer the type classes from the UDFs.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf0)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf0)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf0)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf0)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
- // Since we are currently not looking at type parameters, we can statically determine the output type.
- locally {
- this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
- }
-
- /**
- * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
- *
- * @param udfCpuEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
- this.keyUdf0CpuEstimator = udfCpuEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the RAM load of first the key extraction UDF. Currently effectless.
- *
- * @param udfRamEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
- this.keyUdf0RamEstimator = udfRamEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
- *
- * @param udfCpuEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
- this.keyUdf1CpuEstimator = udfCpuEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
- *
- * @param udfRamEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
- this.keyUdf1RamEstimator = udfRamEstimator
- this
- }
-
- /**
- * Assemble the joined elements to new elements.
- *
- * @param udf produces a joined element from two joinable elements
- * @return a new [[DataQuantaBuilder]] representing the assembled join product
- */
- def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
- this.map(new SerializableFunction[RT2[In0, In1], NewOut] {
- override def apply(joinTuple: RT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
- })
-
- override protected def build =
- inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
- *
- * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param inputDataQuanta1 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param keyUdf0 first key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
- * @param keyUdf1 first key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
- */
-class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
- inputDataQuanta1: DataQuantaBuilder[_, In1],
- keyUdf0: SerializableFunction[In0, Key],
- keyUdf1: SerializableFunction[In1, Key])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], RT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {
-
- /** [[ClassTag]] or surrogate of [[Key]] */
- implicit var keyTag: ClassTag[Key] = _
-
- /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
- private var keyUdf0CpuEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
- private var keyUdf0RamEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
- private var keyUdf1CpuEstimator: LoadEstimator = _
-
- /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
- private var keyUdf1RamEstimator: LoadEstimator = _
-
- // Try to infer the type classes from the UDFs.
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf0)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf0)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
- locally {
- val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
- parameters.get("Input") match {
- case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
- case _ => logger.warn("Could not infer types from {}.", keyUdf0)
- }
-
- this.keyTag = parameters.get("Output") match {
- case cls: Class[Key] => ClassTag(cls)
- case _ =>
- logger.warn("Could not infer types from {}.", keyUdf0)
- ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
- }
- }
- // Since we are currently not looking at type parameters, we can statically determine the output type.
- locally {
- this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
- }
-
- /**
- * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
- *
- * @param udfCpuEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
- this.keyUdf0CpuEstimator = udfCpuEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the RAM load of first the key extraction UDF. Currently effectless.
- *
- * @param udfRamEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
- this.keyUdf0RamEstimator = udfRamEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
- *
- * @param udfCpuEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
- this.keyUdf1CpuEstimator = udfCpuEstimator
- this
- }
-
- /**
- * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
- *
- * @param udfRamEstimator the [[LoadEstimator]]
- * @return this instance
- */
- def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
- this.keyUdf1RamEstimator = udfRamEstimator
- this
- }
-
- override protected def build =
- inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
- *
- * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- * @param inputDataQuanta1 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
- */
-class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
- inputDataQuanta1: DataQuantaBuilder[_, In1])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], RT2[In0, In1]] {
-
- // Since we are currently not looking at type parameters, we can statically determine the output type.
- locally {
- this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
- }
-
- override protected def build =
- inputDataQuanta0.dataQuanta().cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ZipWithIdOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- */
-class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], RT2[java.lang.Long, T]] {
-
- // Since we are currently not looking at type parameters, we can statically determine the output type.
- locally {
- this.outputTypeTrap.dataSetType = dataSetType[RT2[_, _]]
- }
-
- override protected def build = inputDataQuanta.dataQuanta().zipWithId
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- */
-class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- override protected def build = inputDataQuanta.dataQuanta().distinct
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- */
-class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {
-
- // We can statically determine the output type.
- locally {
- this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]
- }
-
- override protected def build = inputDataQuanta.dataQuanta().count
-
-}
-
-
-/**
- * [[DataQuantaBuilder]] implementation for any [[org.apache.wayang.core.plan.wayangplan.Operator]]s. Does not offer
- * any convenience methods, though.
- *
- * @param operator the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
- * @param outputIndex index of the [[OutputSlot]] addressed by the new instance
- * @param buildCache a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
- * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
- * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
- */
-class CustomOperatorDataQuantaBuilder[T](operator: Operator,
- outputIndex: Int,
- buildCache: DataQuantaBuilderCache,
- inputDataQuanta: DataQuantaBuilder[_, _]*)
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {
-
- override protected def build = {
- // If the [[operator]] has multiple [[OutputSlot]]s, make sure that we only execute the build once.
- if (!buildCache.hasCached) {
- val dataQuanta = javaPlanBuilder.planBuilder.customOperator(operator, inputDataQuanta.map(_.dataQuanta()): _*)
- buildCache.cache(dataQuanta)
- }
- buildCache(outputIndex)
- }
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DoWhileOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param conditionUdf UDF for the looping condition
- * @param bodyBuilder builds the loop body
- */
-class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
- conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
- bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {
-
- // TODO: Get the ClassTag right.
- implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- // TODO: We could improve by combining the TypeTraps in the body loop.
-
- /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the UDF. */
- private var udfLoadProfileEstimator: LoadProfileEstimator = _
-
- /** Number of expected iterations. */
- private var numExpectedIterations = 20
-
- /**
- * Set a [[LoadProfileEstimator]] for the load of the UDF.
- *
- * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
- * @return this instance
- */
- def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
- this.udfLoadProfileEstimator = udfLoadProfileEstimator
- this
- }
-
- /**
- * Explicitly set the [[DataSetType]] for the condition [[DataQuanta]]. Note that it is not
- * always necessary to set it and that it can be inferred in some situations.
- *
- * @param outputType the output [[DataSetType]]
- * @return this instance
- */
- def withConditionType(outputType: DataSetType[ConvOut]) = {
- this.convOutClassTag = ClassTag(outputType.getDataUnitType.getTypeClass)
- this
- }
-
- /**
- * Explicitly set the [[Class]] for the condition [[DataQuanta]]. Note that it is not
- * always necessary to set it and that it can be inferred in some situations.
- *
- * @param cls the output [[Class]]
- * @return this instance
- */
- def withConditionClass(cls: Class[ConvOut]) = {
- this.convOutClassTag = ClassTag(cls)
- this
- }
-
- /**
- * Set the number of expected iterations for the built [[org.apache.wayang.basic.operators.DoWhileOperator]].
- *
- * @param numExpectedIterations the expected number of iterations
- * @return this instance
- */
- def withExpectedNumberOfIterations(numExpectedIterations: Int) = {
- this.numExpectedIterations = numExpectedIterations
- this
- }
-
- override protected def build =
- inputDataQuanta.dataQuanta().doWhileJava[ConvOut](
- conditionUdf, dataQuantaBodyBuilder, this.numExpectedIterations, this.udfLoadProfileEstimator
- )(this.convOutClassTag)
-
-
- /**
- * Create a loop body builder that is based on [[DataQuanta]].
- *
- * @return the loop body builder
- */
- private def dataQuantaBodyBuilder =
- new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
- override def apply(loopStart: DataQuanta[T]) = {
- val loopStartBuilder = new FakeDataQuantaBuilder(loopStart)
- val loopEndBuilders = bodyBuilder(loopStartBuilder)
- new WayangTuple(loopEndBuilders.field0.dataQuanta(), loopEndBuilders.field1.dataQuanta())
- }
- }
-
-}
-
-/**
- * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DoWhileOperator]]s.
- *
- * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
- * @param numRepetitions number of repetitions of the loop
- * @param bodyBuilder builds the loop body
- */
-class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
- numRepetitions: Int,
- bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
- (implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {
-
- // Reuse the input TypeTrap to enforce type equality between input and output.
- override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
-
- // TODO: We could improve by combining the TypeTraps in the body loop.
-
- override protected def build =
- inputDataQuanta.dataQuanta().repeat(numRepetitions, startDataQuanta => {
- val loopStartbuilder = new FakeDataQuantaBuilder(startDataQuanta)
- bodyBuilder(loopStartbuilder).dataQuanta()
- })
-
-}
-
-/**
- * Wraps [[DataQuanta]] and exposes them as [[DataQuantaBuilder]], i.e., this is an adapter.
- *
- * @param _dataQuanta the wrapped [[DataQuanta]]
- */
-class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder)
- extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] {
-
- override implicit def classTag = ClassTag(_dataQuanta.output.getType.getDataUnitType.getTypeClass)
-
- override def dataQuanta() = _dataQuanta
-
- /**
- * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
- *
- * @return the created and partially configured [[DataQuanta]]
- */
- override protected def build: DataQuanta[T] = _dataQuanta
-}
-
-/**
- * This is not an actual [[DataQuantaBuilder]] but rather decorates such a [[DataQuantaBuilder]] with a key.
- */
-class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out],
- private val keyExtractor: SerializableFunction[Out, Key])
- (implicit javaPlanBuilder: JavaPlanBuilder) {
-
- /**
- * Joins this instance with the given one via their keys.
- *
- * @param that the instance to join with
- * @return a [[DataQuantaBuilder]] representing the join product
- */
- def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
- dataQuantaBuilder.join(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
-
- /**
- * Co-groups this instance with the given one via their keys.
- *
- * @param that the instance to join with
- * @return a [[DataQuantaBuilder]] representing the co-group product
- */
- def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
- dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
-
-}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index dc9f6be..2c88f58 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -19,9 +19,9 @@
package org.apache.wayang.api
import java.util.{Collection => JavaCollection}
-
import de.hpi.isg.profiledb.store.model.Experiment
import org.apache.commons.lang3.Validate
+import org.apache.wayang.api.dataquantabuilder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{TableSource, TextFileSource}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
index 5af271f..5dc045b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
@@ -18,6 +18,7 @@
package org.apache.wayang.api
+import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
import org.apache.wayang.api.util.DataQuantaBuilderDecorator
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.function.ProjectionDescriptor
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
new file mode 100644
index 0000000..9aad1a1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.{Logging, ReflectionUtils}
+
+import scala.collection.mutable.ListBuffer
+import scala.reflect.ClassTag
+
+/**
+ * Abstract base class for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
+ * Java API for Wayang that compensates for lacking default and named arguments.
+ */
+abstract class BasicDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](implicit _javaPlanBuilder: JavaPlanBuilder)
+ extends Logging with DataQuantaBuilder[This, Out] {
+
+ /**
+ * Lazy-initialized. The [[DataQuanta]] product of this builder.
+ */
+ private var result: DataQuanta[Out] = _
+
+ /**
+ * A name for the [[DataQuanta]] to be built.
+ */
+ private var name: String = _
+
+ /**
+ * An [[Experiment]] for the [[DataQuanta]] to be built.
+ */
+ private var experiment: Experiment = _
+
+ /**
+ * Broadcasts for the [[DataQuanta]] to be built.
+ */
+ private val broadcasts: ListBuffer[(String, DataQuantaBuilder[_, _])] = ListBuffer()
+
+ /**
+ * [[CardinalityEstimator]] for the [[DataQuanta]] to be built.
+ */
+ private var cardinalityEstimator: CardinalityEstimator = _
+
+ /**
+ * Target [[Platform]]s for the [[DataQuanta]] to be built.
+ */
+ private val targetPlatforms: ListBuffer[Platform] = ListBuffer()
+
+ /**
+ * Paths of UDF JAR files for the [[DataQuanta]] to be built.
+ */
+ private val udfJars: ListBuffer[String] = ListBuffer()
+
+ /**
+ * The type of the [[DataQuanta]] to be built.
+ */
+ protected[api] val outputTypeTrap = getOutputTypeTrap
+
+ /**
+ * Retrieve an initialization value for [[outputTypeTrap]].
+ *
+ * @return the [[TypeTrap]]
+ */
+ protected def getOutputTypeTrap = new TypeTrap
+
+ override protected[api] implicit def javaPlanBuilder = _javaPlanBuilder
+
+ override def withName(name: String): This = {
+ this.name = name
+ this.asInstanceOf[This]
+ }
+
+ override def withExperiment(experiment: Experiment): This = {
+ this.experiment = experiment
+ this.asInstanceOf[This]
+ }
+
+ override def withOutputType(outputType: DataSetType[Out]): This = {
+ this.outputTypeTrap.dataSetType = outputType
+ this.asInstanceOf[This]
+ }
+
+ override def withOutputClass(cls: Class[Out]): This = this.withOutputType(DataSetType.createDefault(cls))
+
+ override def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This = {
+ this.broadcasts += Tuple2(broadcastName, sender)
+ this.asInstanceOf[This]
+ }
+
+ override def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This = {
+ this.cardinalityEstimator = cardinalityEstimator
+ this.asInstanceOf[This]
+ }
+
+ override def withTargetPlatform(platform: Platform): This = {
+ this.targetPlatforms += platform
+ this.asInstanceOf[This]
+ }
+
+ def withUdfJarOf(cls: Class[_]): This = this.withUdfJar(ReflectionUtils.getDeclaringJar(cls))
+
+ override def withUdfJar(path: String): This = {
+ this.udfJars += path
+ this.asInstanceOf[This]
+ }
+
+ override protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
+
+ override protected[api] def dataQuanta(): DataQuanta[Out] = {
+ if (this.result == null) {
+ this.result = this.build
+ if (this.name != null) this.result.withName(this.name)
+ if (this.cardinalityEstimator != null) this.result.withCardinalityEstimator(this.cardinalityEstimator)
+ if (this.experiment != null) this.result.withExperiment(experiment)
+ this.result.withUdfJars(this.udfJars: _*)
+ this.result.withTargetPlatforms(this.targetPlatforms: _*)
+ this.broadcasts.foreach {
+ case (broadcastName, senderBuilder) => this.result.withBroadcast(senderBuilder.dataQuanta(), broadcastName)
+ }
+ }
+ this.result
+ }
+
+ /**
+ * Create the [[DataQuanta]] built by this instance. Note the configuration being done in [[dataQuanta()]].
+ *
+ * @return the created and partially configured [[DataQuanta]]
+ */
+ protected def build: DataQuanta[Out]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
new file mode 100644
index 0000000..40d4fae
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ */
+class CartesianDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
+ inputDataQuanta1: DataQuantaBuilder[_, In1])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[CartesianDataQuantaBuilder[In0, In1], WT2[In0, In1]] {
+
+ // Since we are currently not looking at type parameters, we can statically determine the output type.
+ locally {
+ this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+ }
+
+ override protected def build =
+ inputDataQuanta0.dataQuanta().cartesian(inputDataQuanta1.dataQuanta())(inputDataQuanta1.classTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
new file mode 100644
index 0000000..aa1d88a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CoGroupOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ * @param keyUdf0 first key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
+ * @param keyUdf1 second key extraction UDF for the [[org.apache.wayang.basic.operators.CoGroupOperator]]
+ */
+class CoGroupDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
+ inputDataQuanta1: DataQuantaBuilder[_, In1],
+ keyUdf0: SerializableFunction[In0, Key],
+ keyUdf1: SerializableFunction[In1, Key])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[CoGroupDataQuantaBuilder[In0, In1, Key], WT2[java.lang.Iterable[In0], java.lang.Iterable[In1]]] {
+
+ /** [[ClassTag]] or surrogate of [[Key]] */
+ implicit var keyTag: ClassTag[Key] = _
+
+ /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
+ private var keyUdf0CpuEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
+ private var keyUdf0RamEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
+ private var keyUdf1CpuEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
+ private var keyUdf1RamEstimator: LoadEstimator = _
+
+ // Try to infer the type classes from keyUdf0; this always initializes keyTag (placeholder tag on failure).
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf0)
+ }
+
+ this.keyTag = parameters.get("Output") match {
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf0)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+ }
+ }
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf1)
+ }
+
+ parameters.get("Output") match {
+ case cls: Class[Key] => this.keyTag = ClassTag(cls)
+ case _ =>
+ // Keep the tag derived from keyUdf0 above rather than overwriting it with the placeholder tag.
+ logger.warn("Could not infer types from {}.", keyUdf1)
+ }
+ }
+ // Since we are currently not looking at type parameters, we can statically determine the output type.
+ locally {
+ this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
+ *
+ * @param udfCpuEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+ this.keyUdf0CpuEstimator = udfCpuEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
+ *
+ * @param udfRamEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+ this.keyUdf0RamEstimator = udfRamEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
+ *
+ * @param udfCpuEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+ this.keyUdf1CpuEstimator = udfCpuEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
+ *
+ * @param udfRamEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+ this.keyUdf1RamEstimator = udfRamEstimator
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta0.dataQuanta().coGroupJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
new file mode 100644
index 0000000..d8f7d18
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class CountDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[CountDataQuantaBuilder[T], java.lang.Long] {
+
+ // The output type parameter is fixed to java.lang.Long, so the data set type can be set statically.
+ locally {
+ this.outputTypeTrap.dataSetType = dataSetType[java.lang.Long]
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().count
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
new file mode 100644
index 0000000..5fca840
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.DataQuantaBuilderCache
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
+
+/**
+ * [[DataQuantaBuilder]] implementation for any [[org.apache.wayang.core.plan.wayangplan.Operator]]s. Does not offer
+ * any convenience methods, though.
+ *
+ * @param operator the custom [[org.apache.wayang.core.plan.wayangplan.Operator]]
+ * @param outputIndex index of the [[OutputSlot]] addressed by the new instance
+ * @param buildCache a [[DataQuantaBuilderCache]] that must be shared across instances addressing the same [[Operator]]
+ * @param inputDataQuanta [[DataQuantaBuilder]]s for the input [[DataQuanta]]
+ * @param javaPlanBuilder the [[JavaPlanBuilder]] used to construct the current [[WayangPlan]]
+ */
+class CustomOperatorDataQuantaBuilder[T](operator: Operator,
+ outputIndex: Int,
+ buildCache: DataQuantaBuilderCache,
+ inputDataQuanta: DataQuantaBuilder[_, _]*)
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[DataQuantaBuilder[_, T], T] {
+
+ override protected def build = {
+ // Only the first builder to get here creates the operator's DataQuanta; later ones read the shared buildCache.
+ if (!buildCache.hasCached) {
+ val dataQuanta = javaPlanBuilder.planBuilder.customOperator(operator, inputDataQuanta.map(_.dataQuanta()): _*)
+ buildCache.cache(dataQuanta)
+ }
+ buildCache(outputIndex)
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
new file mode 100644
index 0000000..9e8e044
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class DistinctDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[DistinctDataQuantaBuilder[T], T] {
+
+ // Share the input's TypeTrap so that input and output are forced to have the same type.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ override protected def build = inputDataQuanta.dataQuanta().distinct
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
new file mode 100644
index 0000000..34de7be
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.{Tuple => WayangTuple}
+
+import java.util.function.{Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DoWhileOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param conditionUdf UDF for the looping condition
+ * @param bodyBuilder builds the loop body
+ */
+class DoWhileDataQuantaBuilder[T, ConvOut](inputDataQuanta: DataQuantaBuilder[_, T],
+ conditionUdf: SerializablePredicate[JavaCollection[ConvOut]],
+ bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], WayangTuple[DataQuantaBuilder[_, T], DataQuantaBuilder[_, ConvOut]]])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[DoWhileDataQuantaBuilder[T, ConvOut], T] {
+
+ // TODO: Get the ClassTag right. A placeholder is used until withConditionType/withConditionClass is called.
+ implicit private var convOutClassTag: ClassTag[ConvOut] = ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+
+ // Share the input's TypeTrap so that input and output are forced to have the same type.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ // TODO: We could improve by combining the TypeTraps in the body loop.
+
+ /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the UDF. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ /** Number of expected iterations; defaults to 20 unless overridden via [[withExpectedNumberOfIterations]]. */
+ private var numExpectedIterations = 20
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ /**
+ * Explicitly set the [[DataSetType]] for the condition [[DataQuanta]]. Note that it is not
+ * always necessary to set it and that it can be inferred in some situations.
+ *
+ * @param outputType the output [[DataSetType]]
+ * @return this instance
+ */
+ def withConditionType(outputType: DataSetType[ConvOut]) = {
+ this.convOutClassTag = ClassTag(outputType.getDataUnitType.getTypeClass)
+ this
+ }
+
+ /**
+ * Explicitly set the [[Class]] for the condition [[DataQuanta]]. Note that it is not
+ * always necessary to set it and that it can be inferred in some situations.
+ *
+ * @param cls the output [[Class]]
+ * @return this instance
+ */
+ def withConditionClass(cls: Class[ConvOut]) = {
+ this.convOutClassTag = ClassTag(cls)
+ this
+ }
+
+ /**
+ * Set the number of expected iterations for the built [[org.apache.wayang.basic.operators.DoWhileOperator]].
+ *
+ * @param numExpectedIterations the expected number of iterations
+ * @return this instance
+ */
+ def withExpectedNumberOfIterations(numExpectedIterations: Int) = {
+ this.numExpectedIterations = numExpectedIterations
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().doWhileJava[ConvOut](
+ conditionUdf, dataQuantaBodyBuilder, this.numExpectedIterations, this.udfLoadProfileEstimator
+ )(this.convOutClassTag)
+
+
+ /**
+ * Adapt the builder-based [[bodyBuilder]] to a loop body function that works on [[DataQuanta]].
+ *
+ * @return the loop body builder
+ */
+ private def dataQuantaBodyBuilder =
+ new JavaFunction[DataQuanta[T], WayangTuple[DataQuanta[T], DataQuanta[ConvOut]]] {
+ override def apply(loopStart: DataQuanta[T]) = {
+ val loopStartBuilder = new FakeDataQuantaBuilder(loopStart)
+ val loopEndBuilders = bodyBuilder(loopStartBuilder)
+ new WayangTuple(loopEndBuilders.field0.dataQuanta(), loopEndBuilders.field1.dataQuanta())
+ }
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
new file mode 100644
index 0000000..6b52c40
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+import scala.reflect.ClassTag
+
+/**
+ * Wraps [[DataQuanta]] and exposes them as [[DataQuantaBuilder]], i.e., this is an adapter.
+ *
+ * @param _dataQuanta the wrapped [[DataQuanta]]
+ */
+class FakeDataQuantaBuilder[T](_dataQuanta: DataQuanta[T])(implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[FakeDataQuantaBuilder[T], T] {
+
+ override implicit def classTag = ClassTag(_dataQuanta.output.getType.getDataUnitType.getTypeClass)
+
+ override def dataQuanta() = _dataQuanta
+
+ /**
+ * Returns the wrapped [[DataQuanta]] as is; this adapter does not build anything itself.
+ *
+ * @return the wrapped [[DataQuanta]]
+ */
+ override protected def build: DataQuanta[T] = _dataQuanta
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
new file mode 100644
index 0000000..3d33c80
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.MapOperator
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FilterOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf UDF (filter predicate) for the [[org.apache.wayang.basic.operators.FilterOperator]]
+ */
+class FilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], udf: SerializablePredicate[T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[FilterDataQuantaBuilder[T], T] {
+
+ // Share the input's TypeTrap so that input and output are forced to have the same type.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ /** Selectivity of the filter predicate. */
+ private var selectivity: ProbabilisticDoubleInterval = _
+
+ /** SQL UDF implementing the filter predicate. */
+ private var sqlUdf: String = _
+
+ // Try to infer the type classes from the udf. NOTE(review): udf is a SerializablePredicate, yet the lookup targets SerializableFunction - confirm.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ }
+
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ /**
+ * Add a SQL implementation of the UDF.
+ *
+ * @param sqlUdf a SQL condition that can be plugged into a `WHERE` clause
+ * @return this instance
+ */
+ def withSqlUdf(sqlUdf: String) = {
+ this.sqlUdf = sqlUdf
+ this
+ }
+
+ /**
+ * Specify the selectivity of the UDF.
+ *
+ * @param lowerEstimate the lower bound of the expected selectivity
+ * @param upperEstimate the upper bound of the expected selectivity
+ * @param confidence the probability of the actual selectivity being within these bounds
+ * @return this instance
+ */
+ def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+ this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+ this
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().filterJava(
+ udf, this.sqlUdf, this.selectivity, this.udfLoadProfileEstimator
+ )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
new file mode 100644
index 0000000..83b4383
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.FlatMapOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
+ */
+class FlatMapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+ udf: SerializableFunction[In, java.lang.Iterable[Out]])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[FlatMapDataQuantaBuilder[In, Out], Out] {
+
+
+ /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ /** Selectivity of the [[udf]]. */
+ private var selectivity: ProbabilisticDoubleInterval = _
+
+ // Try to infer the input type and the element type of the returned Iterable from the udf.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0)
+ originalClass match {
+ case cls: Class[Out] => {
+ this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ }
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ }
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ /**
+ * Specify the selectivity of the UDF.
+ *
+ * @param lowerEstimate the lower bound of the expected selectivity
+ * @param upperEstimate the upper bound of the expected selectivity
+ * @param confidence the probability of the actual selectivity being within these bounds
+ * @return this instance
+ */
+ def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+ this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+ this
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().flatMapJava(
+ udf, this.selectivity, this.udfLoadProfileEstimator
+ )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
new file mode 100644
index 0000000..abc1d9a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
+ * The output type is a [[java.lang.Iterable]] over the input type.
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class GlobalGroupDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])(implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[GlobalGroupDataQuantaBuilder[T], java.lang.Iterable[T]] {
+
+ override protected def build = inputDataQuanta.dataQuanta().group()
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
new file mode 100644
index 0000000..c3efc95
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableBinaryOperator
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalReduceOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf UDF for the [[org.apache.wayang.basic.operators.GlobalReduceOperator]]
+ */
+class GlobalReduceDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
+ udf: SerializableBinaryOperator[T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[GlobalReduceDataQuantaBuilder[T], T] {
+
+ // Reuse the input TypeTrap to enforce type equality between input and output.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ /** [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[udf]]; stays `null` until [[withUdfLoad]] is called. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ // Try to infer the element type from the "Type" parameter of the udf; on failure only a warning is logged.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
+ parameters.get("Type") match {
+ case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls) // T is erased: matches any Class
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ }
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().reduceJava(udf, this.udfLoadProfileEstimator)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
new file mode 100644
index 0000000..b61c71e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf key extraction UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
+ */
+class GroupByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T], keyUdf: SerializableFunction[T, Key])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[GroupByDataQuantaBuilder[Key, T], java.lang.Iterable[T]] {
+ /** [[ClassTag]] or surrogate of [[Key]]; assigned by the type inference block below. */
+ implicit var keyTag: ClassTag[Key] = _
+
+
+ /** [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[keyUdf]]; stays `null` until [[withKeyUdfLoad]] is called. */
+ private var keyUdfLoadProfileEstimator: LoadProfileEstimator = _
+
+ // Infer types from the key UDF: its "Input" yields the grouped output type, its "Output" yields the key tag.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createGrouped(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf)
+ }
+
+ this.keyTag = parameters.get("Output") match {
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass) // surrogate tag when the key class is unknown
+ }
+ }
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the key extraction UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withKeyUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.keyUdfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().groupByKeyJava(keyUdf, this.keyUdfLoadProfileEstimator)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
new file mode 100644
index 0000000..6db8ffe
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ */
+class IntersectDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
+ inputDataQuanta1: DataQuantaBuilder[_, T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[IntersectDataQuantaBuilder[T], T] {
+ // Reuse the TypeTrap of the first input so that the output shares that input's type.
+ override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
+
+ override protected def build = inputDataQuanta0.dataQuanta().intersect(inputDataQuanta1.dataQuanta())
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
new file mode 100644
index 0000000..ab49ffb
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableFunction}
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.JoinOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ * @param keyUdf0 first key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
+ * @param keyUdf1 second key extraction UDF for the [[org.apache.wayang.basic.operators.JoinOperator]]
+ */
+class JoinDataQuantaBuilder[In0, In1, Key](inputDataQuanta0: DataQuantaBuilder[_, In0],
+ inputDataQuanta1: DataQuantaBuilder[_, In1],
+ keyUdf0: SerializableFunction[In0, Key],
+ keyUdf1: SerializableFunction[In1, Key])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[JoinDataQuantaBuilder[In0, In1, Key], WT2[In0, In1]] {
+
+ /** [[ClassTag]] or surrogate of [[Key]] */
+ implicit var keyTag: ClassTag[Key] = _
+
+ /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf0]]. */
+ private var keyUdf0CpuEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf0]]. */
+ private var keyUdf0RamEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf1]]. */
+ private var keyUdf1CpuEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf1]]. */
+ private var keyUdf1RamEstimator: LoadEstimator = _
+
+ // Try to infer the type classes from the UDFs; each input type is written to the respective input's type trap.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf0.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In0] => inputDataQuanta0.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf0)
+ }
+
+ this.keyTag = parameters.get("Output") match {
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf0)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+ }
+ }
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf1.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In1] => inputDataQuanta1.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf1)
+ }
+
+ this.keyTag = parameters.get("Output") match { // NOTE: overwrites the key tag inferred from keyUdf0 above
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf1)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+ }
+ }
+ // Since we are currently not looking at type parameters, we can statically determine the output type.
+ locally {
+ this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the CPU load of the first key extraction UDF. Currently effectless.
+ *
+ * @param udfCpuEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+ this.keyUdf0CpuEstimator = udfCpuEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the RAM load of the first key extraction UDF. Currently effectless.
+ *
+ * @param udfRamEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+ this.keyUdf0RamEstimator = udfRamEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the CPU load of the second key extraction UDF. Currently effectless.
+ *
+ * @param udfCpuEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThatKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+ this.keyUdf1CpuEstimator = udfCpuEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the RAM load of the second key extraction UDF. Currently effectless.
+ *
+ * @param udfRamEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThatKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+ this.keyUdf1RamEstimator = udfRamEstimator
+ this
+ }
+
+ /**
+ * Assemble the joined elements to new elements.
+ *
+ * @param udf produces a joined element from two joinable elements
+ * @return a new [[DataQuantaBuilder]] representing the assembled join product
+ */
+ def assemble[NewOut](udf: SerializableBiFunction[In0, In1, NewOut]) =
+ this.map(new SerializableFunction[WT2[In0, In1], NewOut] {
+ override def apply(joinTuple: WT2[In0, In1]): NewOut = udf.apply(joinTuple.field0, joinTuple.field1)
+ })
+
+ override protected def build =
+ inputDataQuanta0.dataQuanta().joinJava(keyUdf0, inputDataQuanta1.dataQuanta(), keyUdf1)(inputDataQuanta1.classTag, this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
new file mode 100644
index 0000000..9ddc101
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+
+/**
+ * This is not an actual [[DataQuantaBuilder]] but rather decorates such a [[DataQuantaBuilder]] with a key.
+ */
+class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuantaBuilder[_, Out],
+ private val keyExtractor: SerializableFunction[Out, Key])
+ (implicit javaPlanBuilder: JavaPlanBuilder) {
+
+ /**
+ * Joins this instance with the given one via their keys.
+ *
+ * @param that the instance to join with
+ * @return a [[DataQuantaBuilder]] representing the join product
+ */
+ def join[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
+ dataQuantaBuilder.join(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
+
+ /**
+ * Co-groups this instance with the given one via their keys.
+ *
+ * @param that the instance to co-group with
+ * @return a [[DataQuantaBuilder]] representing the co-group product
+ */
+ def coGroup[ThatOut](that: KeyedDataQuantaBuilder[ThatOut, Key]) =
+ dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
new file mode 100644
index 0000000..748416e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.WayangCollections
+
+import java.util.{Collection => JavaCollection}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CollectionSource]]s.
+ *
+ * @param collection the [[JavaCollection]] to be loaded
+ * @param javaPlanBuilder the [[JavaPlanBuilder]]
+ */
+class LoadCollectionDataQuantaBuilder[Out](collection: JavaCollection[Out])(implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[LoadCollectionDataQuantaBuilder[Out], Out] {
+
+ // Try to infer the type class from an element of the collection; an empty collection or a null element sets nothing.
+ locally {
+ if (!collection.isEmpty) {
+ val any = WayangCollections.getAny(collection)
+ if (any != null) {
+ this.outputTypeTrap.dataSetType = DataSetType.createDefault(any.getClass) // runtime class of that one element
+ }
+ }
+ }
+
+ override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.loadCollection(collection)(this.classTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
new file mode 100644
index 0000000..deb15a9
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.MapOperator
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf UDF for the [[MapOperator]]
+ */
+class MapDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+ udf: SerializableFunction[In, Out])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[MapDataQuantaBuilder[In, Out], Out] {
+
+ /** [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[udf]]; stays `null` until [[withUdfLoad]] is called. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ // Infer types from the udf: "Input" is written to the input builder's type trap, "Output" to this builder's.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ parameters.get("Output") match {
+ case cls: Class[Out] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ }
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().mapJava(udf, this.udfLoadProfileEstimator)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
new file mode 100644
index 0000000..e686565
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapPartitionsOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param udf UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
+ */
+class MapPartitionsDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In],
+ udf: SerializableFunction[java.lang.Iterable[In], java.lang.Iterable[Out]])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[MapPartitionsDataQuantaBuilder[In, Out], Out] {
+
+ /** [[LoadProfileEstimator]] for the [[LoadProfile]] of the [[udf]]; stays `null` until [[withUdfLoad]] is called. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ /** Selectivity of the [[udf]]; stays `null` until [[withSelectivity]] is called. */
+ private var selectivity: ProbabilisticDoubleInterval = _
+
+ // Try to infer the type classes from the udf.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[In] => {
+ inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ }
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ val originalClass = ReflectionUtils.getWrapperClass(parameters.get("Output"), 0) // presumably the Iterable's element type - TODO confirm
+ originalClass match {
+ case cls: Class[Out] => {
+ this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ }
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+ }
+
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ /**
+ * Specify the selectivity of the UDF.
+ *
+ * @param lowerEstimate the lower bound of the expected selectivity
+ * @param upperEstimate the upper bound of the expected selectivity
+ * @param confidence the probability of the actual selectivity being within these bounds
+ * @return this instance
+ */
+ def withSelectivity(lowerEstimate: Double, upperEstimate: Double, confidence: Double) = {
+ this.selectivity = new ProbabilisticDoubleInterval(lowerEstimate, upperEstimate, confidence)
+ this
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().mapPartitionsJava(
+ udf, this.selectivity, this.udfLoadProfileEstimator
+ )
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
new file mode 100644
index 0000000..28ede1e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
+ * [[org.apache.wayang.basic.function.ProjectionDescriptor]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
+ */
+class ProjectionDataQuantaBuilder[In, Out](inputDataQuanta: DataQuantaBuilder[_, In], fieldNames: Array[String])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[ProjectionDataQuantaBuilder[In, Out], Out] {
+ /** Builds the result by calling `project` on the input [[DataQuanta]] with the field names as a `Seq`. */
+ override protected def build = inputDataQuanta.dataQuanta().project(fieldNames.toSeq)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
new file mode 100644
index 0000000..ca14576
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction}
+import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ReduceByOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf key extraction UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ * @param udf reduce UDF (binary operator over `T`) for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ */
+class ReduceByDataQuantaBuilder[Key, T](inputDataQuanta: DataQuantaBuilder[_, T],
+ keyUdf: SerializableFunction[T, Key],
+ udf: SerializableBinaryOperator[T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[ReduceByDataQuantaBuilder[Key, T], T] {
+
+ // Reuse the input TypeTrap to enforce type equality between input and output.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ implicit var keyTag: ClassTag[Key] = _
+
+ /** [[LoadProfileEstimator]] to estimate the [[LoadProfile]] of the [[udf]]; unset until [[withUdfLoad]] is called. */
+ private var udfLoadProfileEstimator: LoadProfileEstimator = _
+
+ // TODO: Add these estimators.
+ // /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. */
+ // private var keyUdfCpuEstimator: LoadEstimator = _
+ //
+ // /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. */
+ // private var keyUdfRamEstimator: LoadEstimator = _
+
+ // Try to infer the type classes from the UDFs. The udf's "Type" and the keyUdf's "Input" both describe T, so either may set the TypeTrap; on failure a warning is logged and the key falls back to DataSetType.none.
+ locally {
+ var parameters = ReflectionUtils.getTypeParameters(udf.getClass, classOf[SerializableBinaryOperator[_]])
+ parameters.get("Type") match {
+ case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", udf)
+ }
+
+ parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[T] => this.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf)
+ }
+ this.keyTag = parameters.get("Output") match {
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+ }
+ }
+
+ /**
+ * Set a [[LoadProfileEstimator]] for the load of the UDF; it is handed to `reduceByKeyJava` when this builder is built.
+ *
+ * @param udfLoadProfileEstimator the [[LoadProfileEstimator]]
+ * @return this instance
+ */
+ def withUdfLoad(udfLoadProfileEstimator: LoadProfileEstimator) = {
+ this.udfLoadProfileEstimator = udfLoadProfileEstimator
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().reduceByKeyJava(keyUdf, udf, this.udfLoadProfileEstimator)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
new file mode 100644
index 0000000..f7170f7
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+import java.util.function.{Function => JavaFunction}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.RepeatOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param numRepetitions number of repetitions of the loop
+ * @param bodyBuilder builds the loop body, given a [[DataQuantaBuilder]] that wraps the loop's start [[DataQuanta]]
+ */
+class RepeatDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
+ numRepetitions: Int,
+ bodyBuilder: JavaFunction[DataQuantaBuilder[_, T], DataQuantaBuilder[_, T]])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[RepeatDataQuantaBuilder[T], T] {
+
+ // Reuse the input TypeTrap to enforce type equality between input and output.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ // TODO: We could improve by combining the TypeTraps in the body loop.
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().repeat(numRepetitions, startDataQuanta => {
+ val loopStartbuilder = new FakeDataQuantaBuilder(startDataQuanta)
+ bodyBuilder(loopStartbuilder).dataQuanta()
+ })
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
new file mode 100644
index 0000000..0642fe6
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.basic.operators.SampleOperator
+
+import java.util.function.IntUnaryOperator
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SampleOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
+ */
+class SampleDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T], sampleSizeFunction: IntUnaryOperator)
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[SampleDataQuantaBuilder[T], T] {
+
+ /**
+ * Size of the dataset to be sampled; defaults to [[SampleOperator.UNKNOWN_DATASET_SIZE]].
+ */
+ private var datasetSize = SampleOperator.UNKNOWN_DATASET_SIZE
+
+ /**
+ * Sampling method to use; defaults to [[SampleOperator.Methods.ANY]].
+ */
+ private var sampleMethod = SampleOperator.Methods.ANY
+
+ /**
+ * Seed to use; `None` until one is set via [[withSeed]].
+ */
+ private var seed: Option[Long] = None
+
+ // Reuse the input TypeTrap to enforce type equality between input and output.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ /**
+ * Set the size of the dataset that should be sampled.
+ *
+ * @param datasetSize the size of the dataset
+ * @return this instance
+ */
+ def withDatasetSize(datasetSize: Long) = {
+ this.datasetSize = datasetSize
+ this
+ }
+
+ /**
+ * Set the sample method to be used.
+ *
+ * @param sampleMethod the sample method
+ * @return this instance
+ */
+ def withSampleMethod(sampleMethod: SampleOperator.Methods) = {
+ this.sampleMethod = sampleMethod
+ this
+ }
+
+ /**
+ * Set the seed to be used for sampling.
+ *
+ * @param seed the seed
+ * @return this instance
+ */
+ def withSeed(seed: Long) = {
+ this.seed = Some(seed)
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().sampleDynamicJava(sampleSizeFunction, this.datasetSize, this.seed, this.sampleMethod)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
new file mode 100644
index 0000000..d1af25a
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.util.TypeTrap
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadEstimator
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.ReflectionUtils
+
+import scala.reflect.ClassTag
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.SortOperator]]s.
+ *
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ * @param keyUdf key extraction UDF for the [[org.apache.wayang.basic.operators.SortOperator]]
+ */
+class SortDataQuantaBuilder[T, Key](inputDataQuanta: DataQuantaBuilder[_, T],
+ keyUdf: SerializableFunction[T, Key])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[SortDataQuantaBuilder[T, Key], T] {
+
+ // Reuse the input TypeTrap to enforce type equality between input and output.
+ override def getOutputTypeTrap: TypeTrap = inputDataQuanta.outputTypeTrap
+
+ /** [[ClassTag]] or surrogate of [[Key]]; assigned by the type inference below and passed to `sortJava` on build. */
+ implicit var keyTag: ClassTag[Key] = _
+
+ /** [[LoadEstimator]] to estimate the CPU load of the [[keyUdf]]. Stored only; not used by `build`. */
+ private var keyUdfCpuEstimator: LoadEstimator = _
+
+ /** [[LoadEstimator]] to estimate the RAM load of the [[keyUdf]]. Stored only; not used by `build`. */
+ private var keyUdfRamEstimator: LoadEstimator = _
+
+
+ // Try to infer the type classes from the key UDF; on failure a warning is logged and the key tag falls back to DataSetType.none.
+ locally {
+ val parameters = ReflectionUtils.getTypeParameters(keyUdf.getClass, classOf[SerializableFunction[_, _]])
+ parameters.get("Input") match {
+ case cls: Class[T] => inputDataQuanta.outputTypeTrap.dataSetType = DataSetType.createDefault(cls)
+ case _ => logger.warn("Could not infer types from {}.", keyUdf)
+ }
+
+ this.keyTag = parameters.get("Output") match {
+ case cls: Class[Key] => ClassTag(cls)
+ case _ =>
+ logger.warn("Could not infer types from {}.", keyUdf)
+ ClassTag(DataSetType.none.getDataUnitType.getTypeClass)
+ }
+ }
+
+
+ /**
+ * Set a [[LoadEstimator]] for the CPU load of the key extraction UDF. Currently has no effect.
+ *
+ * @param udfCpuEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfCpuEstimator(udfCpuEstimator: LoadEstimator) = {
+ this.keyUdfCpuEstimator = udfCpuEstimator
+ this
+ }
+
+ /**
+ * Set a [[LoadEstimator]] for the RAM load of the key extraction UDF. Currently has no effect.
+ *
+ * @param udfRamEstimator the [[LoadEstimator]]
+ * @return this instance
+ */
+ def withThisKeyUdfRamEstimator(udfRamEstimator: LoadEstimator) = {
+ this.keyUdfRamEstimator = udfRamEstimator
+ this
+ }
+
+ override protected def build =
+ inputDataQuanta.dataQuanta().sortJava(keyUdf)(this.keyTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
new file mode 100644
index 0000000..d97b08b
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.core.plan.wayangplan.UnarySource
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.core.plan.wayangplan.UnarySource]]s.
+ *
+ * @param source the [[UnarySource]] to load when this builder is built
+ * @param javaPlanBuilder the [[JavaPlanBuilder]] whose `planBuilder` loads the `source`
+ */
+class UnarySourceDataQuantaBuilder[This <: DataQuantaBuilder[_, Out], Out](source: UnarySource[Out])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[This, Out] {
+
+ override protected def build: DataQuanta[Out] = javaPlanBuilder.planBuilder.load(source)(this.classTag)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
new file mode 100644
index 0000000..04fc6ac
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
+ *
+ * @param inputDataQuanta0 [[DataQuantaBuilder]] for the first input [[DataQuanta]]; its TypeTrap is reused for the output
+ * @param inputDataQuanta1 [[DataQuantaBuilder]] for the second input [[DataQuanta]]
+ */
+class UnionDataQuantaBuilder[T](inputDataQuanta0: DataQuantaBuilder[_, T],
+ inputDataQuanta1: DataQuantaBuilder[_, T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[UnionDataQuantaBuilder[T], T] {
+
+ override def getOutputTypeTrap = inputDataQuanta0.outputTypeTrap
+
+ override protected def build = inputDataQuanta0.dataQuanta().union(inputDataQuanta1.dataQuanta())
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
new file mode 100644
index 0000000..ded640c
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquantabuilder
+
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.basic.data.{Tuple2 => WT2}
+
+/**
+ * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.ZipWithIdOperator]]s.
+ * The output data quanta are Wayang `Tuple2[java.lang.Long, T]` instances.
+ * @param inputDataQuanta [[DataQuantaBuilder]] for the input [[DataQuanta]]
+ */
+class ZipWithIdDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T])
+ (implicit javaPlanBuilder: JavaPlanBuilder)
+ extends BasicDataQuantaBuilder[ZipWithIdDataQuantaBuilder[T], WT2[java.lang.Long, T]] {
+
+ // Since we are currently not looking at type parameters, we can statically determine the output type (a raw Tuple2).
+ locally {
+ this.outputTypeTrap.dataSetType = dataSetType[WT2[_, _]]
+ }
+
+ override protected def build = inputDataQuanta.dataQuanta().zipWithId
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
index 17f5e32..1eaec53 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
@@ -18,8 +18,9 @@
package org.apache.wayang.api.graph
+import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
import org.apache.wayang.api.util.DataQuantaBuilderDecorator
-import org.apache.wayang.api.{BasicDataQuantaBuilder, DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
+import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
import org.apache.wayang.basic.operators.PageRankOperator
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
index 2d4bb6c..71996a0 100644
--- a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -18,6 +18,8 @@
package org.apache.wayang.api;
+import org.apache.wayang.api.dataquantabuilder.GlobalReduceDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
index 59b48cd..3b55a51 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
@@ -21,8 +21,8 @@ package org.apache.wayang.tests;
import org.junit.Assert;
import org.junit.Test;
import org.apache.wayang.api.JavaPlanBuilder;
-import org.apache.wayang.api.LoadCollectionDataQuantaBuilder;
-import org.apache.wayang.api.MapDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquantabuilder.MapDataQuantaBuilder;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.java.Java;