You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/05/18 04:50:56 UTC
[incubator-wayang] 08/08: [WAYANG-28] seed version of the Hackit API
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit a272d73203333bd4b141e923e525bc6df6c635e1
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Tue May 18 00:50:30 2021 -0400
[WAYANG-28] seed version of the Hackit API
---
.../apache/wayang/api/dataquanta/DataQuanta.scala | 2 +-
.../wayang-hackit/wayang-hackit-api/pom.xml | 5 +
.../plugin/hackit/api/DataQuantaHackit.scala | 226 +++++++++++++--------
.../apache/wayang/plugin/hackit/api/Hackit.scala | 78 +++++++
.../plugin/hackit/api/ApiExtensionTest.scala | 42 +++-
5 files changed, 260 insertions(+), 93 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 664079b..4f9eb43 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -49,7 +49,7 @@ import scala.reflect._
* @param ev$1 the data type of the elements in this instance
* @param planBuilder keeps track of the [[WayangPlan]] being build
*/
-abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) {
+abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, var outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) {
Validate.isTrue(operator.getNumOutputs > outputIndex)
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index ae95199..85a37b7 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -50,6 +50,11 @@
<dependencies>
<dependency>
<groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-hackit-core</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java_2.11</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
index 3f76c7b..f1505e0 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
@@ -29,10 +29,17 @@ import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescript
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan}
+import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger
+import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit
+import org.apache.wayang.plugin.hackit.core.tags.HackitTag
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple
-import java.lang
+
+
+import java.{lang, util}
import java.lang.{Iterable => JavaIterable}
import java.util.function.IntUnaryOperator
+import scala.language.implicitConversions
import scala.reflect.ClassTag
/**
@@ -42,10 +49,35 @@ import scala.reflect.ClassTag
* @param ev$1 the data type of the elements in this instance
* @param planBuilder keeps track of the [[WayangPlan]] being build
*/
-class DataQuantaHackit[Out: ClassTag]
+class DataQuantaHackit[Out : ClassTag]
(override val operator: ElementaryOperator, outputIndex: Int = 0)
(implicit override val planBuilder: PlanBuilder)
- extends DataQuanta[Out](operator, outputIndex) {
+ extends DataQuanta[Out](operator, outputIndex)
+{
+
+ var tagger : HackitTagger = null;
+ //var sniffer : HackitSniffer = null;
+
+ /**
+ * Intended to add a [[HackitTag]] to the [[HackitTagger]] to enable tagging; in this seed version the tagger call is commented out and the method only prints a marker.
+ *
+ * @param tag [[HackitTag]] to be added
+ * @return this instance
+ */
+ //TODO add the version of collection
+ def addTag(tag: HackitTag): DataQuantaHackit[Out] = {
+// tagger.addPreTag(tag)
+ println("here")
+ this
+ }
+
+ def underHackit[Key: ClassTag](): DataQuantaHackit[HackitTuple[Key, Out]] = {
+// HackItRDD<K, T> ktHackItRDD = new HackItRDD<K, T>(
+// rdd.map((Function<T, HackItTuple<K, T>>) HackItTuple::new)
+// );
+
+null
+ }
/**
* Feed this instance into a [[MapOperator]].
@@ -54,16 +86,21 @@ class DataQuantaHackit[Out: ClassTag]
* @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
* @return a new instance representing the [[MapOperator]]'s output
*/
- override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
+ override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
- val lala = new SerializableFunction[Out, NewOut] {
- override def apply(t: Out): NewOut = {
+
+ val wrapper_internal = new FunctionWrapperHackit[Object, Out, NewOut](udf)
+ this.tagger = wrapper_internal
+
+ val wrapper = new SerializableFunction[HackitTuple[Object, Out], HackitTuple[Object, NewOut]](){
+ override def apply(t: HackitTuple[Object, Out]): HackitTuple[Object, NewOut] = {
println(t)
- udf.apply(t)
+ wrapper_internal.apply(t)
}
}
+
val mapOperator = new MapOperator(new TransformationDescriptor(
- lala, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
+ wrapper, basicDataUnitType[HackitTuple[Object, Out]], basicDataUnitType[HackitTuple[Object, NewOut]], udfLoad
))
this.connectTo(mapOperator, 0)
DataQuantaHackit.wrap[NewOut](mapOperator)
@@ -114,11 +151,12 @@ class DataQuantaHackit[Out: ClassTag]
sqlUdf: String = null,
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
- val filterOperator = new FilterOperator(new PredicateDescriptor(
- udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
- ).withSqlImplementation(sqlUdf))
- this.connectTo(filterOperator, 0)
- DataQuantaHackit.wrap[Out](filterOperator)
+// val filterOperator = new FilterOperator(new PredicateDescriptor(
+// udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
+// ).withSqlImplementation(sqlUdf))
+// this.connectTo(filterOperator, 0)
+// DataQuantaHackit.wrap[Out](filterOperator)
+ null
}
/**
@@ -132,11 +170,12 @@ class DataQuantaHackit[Out: ClassTag]
override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
- val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
- udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
- ))
- this.connectTo(flatMapOperator, 0)
- DataQuantaHackit.wrap[NewOut](flatMapOperator)
+// val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
+// udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
+// ))
+// this.connectTo(flatMapOperator, 0)
+// DataQuantaHackit.wrap[NewOut](flatMapOperator)
+ null
}
@@ -199,12 +238,13 @@ class DataQuantaHackit[Out: ClassTag]
udf: SerializableBinaryOperator[Out],
udfLoad: LoadProfileEstimator = null)
: DataQuantaHackit[Out] = {
- val reduceByOperator = new ReduceByOperator(
- new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
- new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
- )
- this.connectTo(reduceByOperator, 0)
- DataQuantaHackit.wrap[Out](reduceByOperator)
+// val reduceByOperator = new ReduceByOperator(
+// new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+// new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+// )
+// this.connectTo(reduceByOperator, 0)
+// DataQuantaHackit.wrap[Out](reduceByOperator)
+ null
}
/**
@@ -216,13 +256,14 @@ class DataQuantaHackit[Out: ClassTag]
*/
override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = {
- val groupByOperator = new MaterializedGroupByOperator(
- new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
- dataSetType[Out],
- groupedDataSetType[Out]
- )
- this.connectTo(groupByOperator, 0)
- DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+// val groupByOperator = new MaterializedGroupByOperator(
+// new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
+// dataSetType[Out],
+// groupedDataSetType[Out]
+// )
+// this.connectTo(groupByOperator, 0)
+// DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+ null
}
/**
@@ -234,11 +275,12 @@ class DataQuantaHackit[Out: ClassTag]
*/
override def reduceJava(udf: SerializableBinaryOperator[Out],
udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
- val globalReduceOperator = new GlobalReduceOperator(
- new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
- )
- this.connectTo(globalReduceOperator, 0)
- DataQuantaHackit.wrap[Out](globalReduceOperator)
+// val globalReduceOperator = new GlobalReduceOperator(
+// new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+// )
+// this.connectTo(globalReduceOperator, 0)
+// DataQuantaHackit.wrap[Out](globalReduceOperator)
+ null
}
/**
@@ -250,14 +292,15 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[JoinOperator]]'s output
*/
override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
- require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
- val joinOperator = new JoinOperator(
- new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
- new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
- )
- this.connectTo(joinOperator, 0)
- that.connectTo(joinOperator, 1)
- DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+// val joinOperator = new JoinOperator(
+// new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+// new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+// )
+// this.connectTo(joinOperator, 0)
+// that.connectTo(joinOperator, 1)
+// DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+ null
}
/**
@@ -269,14 +312,15 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[CoGroupOperator]]'s output
*/
override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
- require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
- val coGroupOperator = new CoGroupOperator(
- new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
- new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
- )
- this.connectTo(coGroupOperator, 0)
- that.connectTo(coGroupOperator, 1)
- DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
+// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+// val coGroupOperator = new CoGroupOperator(
+// new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+// new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+// )
+// this.connectTo(coGroupOperator, 0)
+// that.connectTo(coGroupOperator, 1)
+// DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
+ null
}
@@ -286,13 +330,12 @@ class DataQuantaHackit[Out: ClassTag]
* @param keyUdf UDF to extract key from data quanta in this instance
* @return a new instance representing the [[SortOperator]]'s output
*/
- override def sortJava[Key: ClassTag]
- (keyUdf: SerializableFunction[Out, Key])
- : DataQuantaHackit[Out] = {
- val sortOperator = new SortOperator(new TransformationDescriptor(
- keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
- this.connectTo(sortOperator, 0)
- DataQuantaHackit.wrap[Out](sortOperator)
+ override def sortJava[Key: ClassTag] (keyUdf: SerializableFunction[Out, Key]) : DataQuantaHackit[Out] = {
+// val sortOperator = new SortOperator(new TransformationDescriptor(
+// keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
+// this.connectTo(sortOperator, 0)
+// DataQuantaHackit.wrap[Out](sortOperator)
+ null
}
@@ -302,9 +345,10 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
*/
override def group(): DataQuantaHackit[JavaIterable[Out]] = {
- val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
- this.connectTo(groupOperator, 0)
- DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+// val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
+// this.connectTo(groupOperator, 0)
+// DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+ null
}
/**
@@ -314,11 +358,12 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[UnionAllOperator]]'s output
*/
override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
- require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
- val unionAllOperator = new UnionAllOperator(dataSetType[Out])
- this.connectTo(unionAllOperator, 0)
- that.connectTo(unionAllOperator, 1)
- DataQuantaHackit.wrap[Out](unionAllOperator)
+// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+// val unionAllOperator = new UnionAllOperator(dataSetType[Out])
+// this.connectTo(unionAllOperator, 0)
+// that.connectTo(unionAllOperator, 1)
+// DataQuantaHackit.wrap[Out](unionAllOperator)
+ null
}
/**
@@ -328,11 +373,12 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[IntersectOperator]]'s output
*/
override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
- require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
- val intersectOperator = new IntersectOperator(dataSetType[Out])
- this.connectTo(intersectOperator, 0)
- that.connectTo(intersectOperator, 1)
- DataQuantaHackit.wrap[Out](intersectOperator)
+// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+// val intersectOperator = new IntersectOperator(dataSetType[Out])
+// this.connectTo(intersectOperator, 0)
+// that.connectTo(intersectOperator, 1)
+// DataQuantaHackit.wrap[Out](intersectOperator)
+ null
}
/**
@@ -342,11 +388,12 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[CartesianOperator]]'s output
*/
override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
- require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
- val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
- this.connectTo(cartesianOperator, 0)
- that.connectTo(cartesianOperator, 1)
- DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+// val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
+// this.connectTo(cartesianOperator, 0)
+// that.connectTo(cartesianOperator, 1)
+// DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+ null
}
/**
@@ -355,9 +402,10 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[ZipWithIdOperator]]'s output
*/
override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = {
- val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
- this.connectTo(zipWithIdOperator, 0)
- DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+// val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
+// this.connectTo(zipWithIdOperator, 0)
+// DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+ null
}
/**
@@ -366,9 +414,10 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[DistinctOperator]]'s output
*/
override def distinct: DataQuantaHackit[Out] = {
- val distinctOperator = new DistinctOperator(dataSetType[Out])
- this.connectTo(distinctOperator, 0)
- DataQuantaHackit.wrap[Out](distinctOperator)
+// val distinctOperator = new DistinctOperator(dataSetType[Out])
+// this.connectTo(distinctOperator, 0)
+// DataQuantaHackit.wrap[Out](distinctOperator)
+ null
}
/**
@@ -377,9 +426,10 @@ class DataQuantaHackit[Out: ClassTag]
* @return a new instance representing the [[CountOperator]]'s output
*/
override def count: DataQuantaHackit[lang.Long] = {
- val countOperator = new CountOperator(dataSetType[Out])
- this.connectTo(countOperator, 0)
- DataQuantaHackit.wrap[lang.Long](countOperator)
+// val countOperator = new CountOperator(dataSetType[Out])
+// this.connectTo(countOperator, 0)
+// DataQuantaHackit.wrap[lang.Long](countOperator)
+ null
}
}
@@ -392,4 +442,8 @@ object DataQuantaHackit extends DataQuantaCreator{
def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] =
new DataQuantaHackit(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
+ implicit def wrapDataQuanta[T:ClassTag](dataQuanta: DataQuanta[T]): DataQuantaHackit[T] = {
+ new DataQuantaHackit[T](dataQuanta.operator, dataQuanta.outputIndex)(ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder);
+ }
+
}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala
new file mode 100644
index 0000000..b9499fb
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.api.dataquanta.DataQuanta
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger
+import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit
+import org.apache.wayang.plugin.hackit.core.tags.HackitTag
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple
+import org.apache.wayang.api.toSerializableFunction
+
+import scala.language.implicitConversions
+import scala.reflect.{ClassTag, classTag}
+
+class Hackit[Key: ClassTag, Type:ClassTag](implicit var planBuilder: PlanBuilder) {
+
+ var dataQuanta: DataQuanta[HackitTuple[Key, Type]] = null
+
+ var tagger : HackitTagger = null;
+
+ def addTag(hackitTag: HackitTag) = {
+ println("you are adding a tag")
+ this
+ }
+
+ def underHackit(dataQuanta: DataQuanta[Type]): Hackit[Key, Type] = {
+ val hackit = new Hackit[Key, Type]()
+ hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, Type]](
+ element => {
+ new HackitTuple[Key, Type](element)
+ }
+ )
+ hackit
+ }
+
+ def toDataQuanta(): DataQuanta[Type] = {
+ this.dataQuanta.map(tuple => tuple.getValue)
+ }
+
+ def map[TypeOut: ClassTag](udf: Type => TypeOut, udfLoad: LoadProfileEstimator = null): Hackit[Key, TypeOut] = {
+ val hackit = new Hackit[Key, TypeOut]()
+ val wrapper = new FunctionWrapperHackit[Key, Type, TypeOut](toSerializableFunction(udf))
+ hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, TypeOut]](element => wrapper.apply(element), udfLoad)
+ hackit
+ }
+
+}
+
+
+object Hackit {
+
+ //TODO: replace the hard-coded java.lang.Object key type with a type parameter
+ implicit def underHackit[T: ClassTag](dataQuanta: DataQuanta[T]): Hackit[java.lang.Object, T] = {
+ return new Hackit[java.lang.Object, T]()(classTag[java.lang.Object], ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder)
+ .underHackit(dataQuanta)
+ }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
index 59a0fd9..42867f3 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
@@ -19,12 +19,17 @@
package org.apache.wayang.plugin.hackit.api
-import org.apache.wayang.api.ApiTest
import org.apache.wayang.api.dataquanta.DataQuantaFactory
-import org.junit.{BeforeClass, Test}
+import org.apache.wayang.api.createPlanBuilder
+import org.apache.wayang.plugin.hackit.api.Hackit.underHackit
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.junit.{Assert, BeforeClass, Test}
import org.junit.jupiter.api.{BeforeAll, BeforeEach}
-class ApiExtensionTest extends ApiTest {
+
+class ApiExtensionTest {
@BeforeEach
def setUp() ={
@@ -32,10 +37,35 @@ class ApiExtensionTest extends ApiTest {
}
@Test
- override def testReadMapCollect(): Unit = {
- DataQuantaFactory.setTemplate(DataQuantaHackit);
+ def testReadMapCollectHackit(): Unit = {
+
+ // Set up WayangContext.
+ val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+
+ // Generate some test data.
+ val inputValues = (for (i <- 1 to 10) yield i).toArray
+
+ // Build and execute a Wayang plan.
+ var outputValues = wayang
+ .loadCollection(inputValues).withName("Load input values")
+ .addTag(null)
+ .map(a => a + 2)//.withName("Add 2")
+ .dataQuanta
+ .collect()
+
+ print(outputValues)
+
+ var lolo = wayang
+ .loadCollection(inputValues).withName("Load input values")
+ .addTag(null)
+ .map(a => a + 2)//.withName("Add 2")
+ .toDataQuanta()
+ .collect()
- super.testReadMapCollect()
+ print(lolo)
+// // Check the outcome.
+// val expectedOutputValues = inputValues.map(_ + 2)
+// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
}
}