You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/05/18 04:50:56 UTC

[incubator-wayang] 08/08: [WAYANG-28] seed version of the Hackit API

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit a272d73203333bd4b141e923e525bc6df6c635e1
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Tue May 18 00:50:30 2021 -0400

    [WAYANG-28] seed version of the Hackit API
---
 .../apache/wayang/api/dataquanta/DataQuanta.scala  |   2 +-
 .../wayang-hackit/wayang-hackit-api/pom.xml        |   5 +
 .../plugin/hackit/api/DataQuantaHackit.scala       | 226 +++++++++++++--------
 .../apache/wayang/plugin/hackit/api/Hackit.scala   |  78 +++++++
 .../plugin/hackit/api/ApiExtensionTest.scala       |  42 +++-
 5 files changed, 260 insertions(+), 93 deletions(-)

diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 664079b..4f9eb43 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -49,7 +49,7 @@ import scala.reflect._
   * @param ev$1        the data type of the elements in this instance
   * @param planBuilder keeps track of the [[WayangPlan]] being build
   */
-abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) {
+abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, var outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) {
 
   Validate.isTrue(operator.getNumOutputs > outputIndex)
 
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index ae95199..85a37b7 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -50,6 +50,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-hackit-core</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-api-scala-java_2.11</artifactId>
             <version>0.6.0-SNAPSHOT</version>
         </dependency>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
index 3f76c7b..f1505e0 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
@@ -29,10 +29,17 @@ import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescript
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
 import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan}
+import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger
+import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit
+import org.apache.wayang.plugin.hackit.core.tags.HackitTag
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple
 
-import java.lang
+
+
+import java.{lang, util}
 import java.lang.{Iterable => JavaIterable}
 import java.util.function.IntUnaryOperator
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 /**
@@ -42,10 +49,35 @@ import scala.reflect.ClassTag
  * @param ev$1        the data type of the elements in this instance
  * @param planBuilder keeps track of the [[WayangPlan]] being build
  */
-class DataQuantaHackit[Out: ClassTag]
+class DataQuantaHackit[Out : ClassTag]
                             (override val operator: ElementaryOperator, outputIndex: Int = 0)
                             (implicit override val  planBuilder: PlanBuilder)
-                    extends DataQuanta[Out](operator, outputIndex) {
+                    extends DataQuanta[Out](operator, outputIndex)
+{
+
+  var tagger : HackitTagger = null;
+  //var sniffer : HackitSniffer = null;
+
+  /**
+   * add a [[HackitTag]] on the [[HackitTagger]] to enable the process
+   *
+   * @param tag [[HackitTag]] to be added
+   * @return the self instance
+   */
+  //TODO add the version of collection
+  def addTag(tag: HackitTag): DataQuantaHackit[Out] = {
+//    tagger.addPreTag(tag)
+    println("here")
+    this
+  }
+
+  def underHackit[Key: ClassTag](): DataQuantaHackit[HackitTuple[Key, Out]] = {
+//    HackItRDD<K, T> ktHackItRDD = new HackItRDD<K, T>(
+//      rdd.map((Function<T, HackItTuple<K, T>>) HackItTuple::new)
+//    );
+
+null
+  }
 
   /**
    * Feed this instance into a [[MapOperator]].
@@ -54,16 +86,21 @@ class DataQuantaHackit[Out: ClassTag]
    * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
    * @return a new instance representing the [[MapOperator]]'s output
    */
-  override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
+   override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
                                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
-    val lala = new SerializableFunction[Out, NewOut] {
-      override def apply(t: Out): NewOut = {
+
+    val wrapper_internal = new FunctionWrapperHackit[Object, Out, NewOut](udf)
+    this.tagger = wrapper_internal
+
+    val wrapper = new SerializableFunction[HackitTuple[Object, Out], HackitTuple[Object, NewOut]](){
+      override def apply(t: HackitTuple[Object, Out]): HackitTuple[Object, NewOut] = {
         println(t)
-        udf.apply(t)
+        wrapper_internal.apply(t)
       }
     }
+
     val mapOperator = new MapOperator(new TransformationDescriptor(
-      lala, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
+      wrapper, basicDataUnitType[HackitTuple[Object, Out]], basicDataUnitType[HackitTuple[Object, NewOut]], udfLoad
     ))
     this.connectTo(mapOperator, 0)
     DataQuantaHackit.wrap[NewOut](mapOperator)
@@ -114,11 +151,12 @@ class DataQuantaHackit[Out: ClassTag]
                           sqlUdf: String = null,
                           selectivity: ProbabilisticDoubleInterval = null,
                           udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
-    val filterOperator = new FilterOperator(new PredicateDescriptor(
-      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
-    ).withSqlImplementation(sqlUdf))
-    this.connectTo(filterOperator, 0)
-    DataQuantaHackit.wrap[Out](filterOperator)
+//    val filterOperator = new FilterOperator(new PredicateDescriptor(
+//      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
+//    ).withSqlImplementation(sqlUdf))
+//    this.connectTo(filterOperator, 0)
+//    DataQuantaHackit.wrap[Out](filterOperator)
+    null
   }
 
   /**
@@ -132,11 +170,12 @@ class DataQuantaHackit[Out: ClassTag]
   override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
                                              selectivity: ProbabilisticDoubleInterval = null,
                                              udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
-    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
-      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
-    ))
-    this.connectTo(flatMapOperator, 0)
-    DataQuantaHackit.wrap[NewOut](flatMapOperator)
+//    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
+//      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
+//    ))
+//    this.connectTo(flatMapOperator, 0)
+//    DataQuantaHackit.wrap[NewOut](flatMapOperator)
+    null
   }
 
 
@@ -199,12 +238,13 @@ class DataQuantaHackit[Out: ClassTag]
                                               udf: SerializableBinaryOperator[Out],
                                               udfLoad: LoadProfileEstimator = null)
   : DataQuantaHackit[Out] = {
-    val reduceByOperator = new ReduceByOperator(
-      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
-      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
-    )
-    this.connectTo(reduceByOperator, 0)
-    DataQuantaHackit.wrap[Out](reduceByOperator)
+//    val reduceByOperator = new ReduceByOperator(
+//      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+//      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+//    )
+//    this.connectTo(reduceByOperator, 0)
+//    DataQuantaHackit.wrap[Out](reduceByOperator)
+    null
   }
 
   /**
@@ -216,13 +256,14 @@ class DataQuantaHackit[Out: ClassTag]
    */
   override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
                                              keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = {
-    val groupByOperator = new MaterializedGroupByOperator(
-      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
-      dataSetType[Out],
-      groupedDataSetType[Out]
-    )
-    this.connectTo(groupByOperator, 0)
-    DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+//    val groupByOperator = new MaterializedGroupByOperator(
+//      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
+//      dataSetType[Out],
+//      groupedDataSetType[Out]
+//    )
+//    this.connectTo(groupByOperator, 0)
+//    DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+    null
   }
 
   /**
@@ -234,11 +275,12 @@ class DataQuantaHackit[Out: ClassTag]
    */
   override def reduceJava(udf: SerializableBinaryOperator[Out],
                           udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
-    val globalReduceOperator = new GlobalReduceOperator(
-      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
-    )
-    this.connectTo(globalReduceOperator, 0)
-    DataQuantaHackit.wrap[Out](globalReduceOperator)
+//    val globalReduceOperator = new GlobalReduceOperator(
+//      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+//    )
+//    this.connectTo(globalReduceOperator, 0)
+//    DataQuantaHackit.wrap[Out](globalReduceOperator)
+    null
   }
 
   /**
@@ -250,14 +292,15 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[JoinOperator]]'s output
    */
   override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
-    val joinOperator = new JoinOperator(
-      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
-      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
-    )
-    this.connectTo(joinOperator, 0)
-    that.connectTo(joinOperator, 1)
-    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+//    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+//    val joinOperator = new JoinOperator(
+//      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+//      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+//    )
+//    this.connectTo(joinOperator, 0)
+//    that.connectTo(joinOperator, 1)
+//    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+    null
   }
 
   /**
@@ -269,14 +312,15 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[CoGroupOperator]]'s output
    */
   override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
-    val coGroupOperator = new CoGroupOperator(
-      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
-      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
-    )
-    this.connectTo(coGroupOperator, 0)
-    that.connectTo(coGroupOperator, 1)
-    DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
+//    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+//    val coGroupOperator = new CoGroupOperator(
+//      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+//      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+//    )
+//    this.connectTo(coGroupOperator, 0)
+//    that.connectTo(coGroupOperator, 1)
+//    DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
+    null
   }
 
 
@@ -286,13 +330,12 @@ class DataQuantaHackit[Out: ClassTag]
    * @param keyUdf UDF to extract key from data quanta in this instance
    * @return a new instance representing the [[SortOperator]]'s output
    */
-  override def sortJava[Key: ClassTag]
-  (keyUdf: SerializableFunction[Out, Key])
-  : DataQuantaHackit[Out] = {
-    val sortOperator = new SortOperator(new TransformationDescriptor(
-      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
-    this.connectTo(sortOperator, 0)
-    DataQuantaHackit.wrap[Out](sortOperator)
+  override def sortJava[Key: ClassTag] (keyUdf: SerializableFunction[Out, Key]) : DataQuantaHackit[Out] = {
+//    val sortOperator = new SortOperator(new TransformationDescriptor(
+//      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
+//    this.connectTo(sortOperator, 0)
+//    DataQuantaHackit.wrap[Out](sortOperator)
+     null
   }
 
 
@@ -302,9 +345,10 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
    */
   override def group(): DataQuantaHackit[JavaIterable[Out]] = {
-    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
-    this.connectTo(groupOperator, 0)
-    DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+//    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
+//    this.connectTo(groupOperator, 0)
+//    DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+    null
   }
 
   /**
@@ -314,11 +358,12 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[UnionAllOperator]]'s output
    */
   override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
-    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
-    this.connectTo(unionAllOperator, 0)
-    that.connectTo(unionAllOperator, 1)
-    DataQuantaHackit.wrap[Out](unionAllOperator)
+//    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+//    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
+//    this.connectTo(unionAllOperator, 0)
+//    that.connectTo(unionAllOperator, 1)
+//    DataQuantaHackit.wrap[Out](unionAllOperator)
+    null
   }
 
   /**
@@ -328,11 +373,12 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[IntersectOperator]]'s output
    */
   override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] =  {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
-    val intersectOperator = new IntersectOperator(dataSetType[Out])
-    this.connectTo(intersectOperator, 0)
-    that.connectTo(intersectOperator, 1)
-    DataQuantaHackit.wrap[Out](intersectOperator)
+//    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+//    val intersectOperator = new IntersectOperator(dataSetType[Out])
+//    this.connectTo(intersectOperator, 0)
+//    that.connectTo(intersectOperator, 1)
+//    DataQuantaHackit.wrap[Out](intersectOperator)
+    null
   }
 
   /**
@@ -342,11 +388,12 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[CartesianOperator]]'s output
    */
   override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
-    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
-    val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
-    this.connectTo(cartesianOperator, 0)
-    that.connectTo(cartesianOperator, 1)
-    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+//    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+//    val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
+//    this.connectTo(cartesianOperator, 0)
+//    that.connectTo(cartesianOperator, 1)
+//    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+    null
   }
 
   /**
@@ -355,9 +402,10 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[ZipWithIdOperator]]'s output
    */
   override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = {
-    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
-    this.connectTo(zipWithIdOperator, 0)
-    DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+//    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
+//    this.connectTo(zipWithIdOperator, 0)
+//    DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+    null
   }
 
   /**
@@ -366,9 +414,10 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[DistinctOperator]]'s output
    */
   override def distinct: DataQuantaHackit[Out] = {
-    val distinctOperator = new DistinctOperator(dataSetType[Out])
-    this.connectTo(distinctOperator, 0)
-    DataQuantaHackit.wrap[Out](distinctOperator)
+//    val distinctOperator = new DistinctOperator(dataSetType[Out])
+//    this.connectTo(distinctOperator, 0)
+//    DataQuantaHackit.wrap[Out](distinctOperator)
+    null
   }
 
   /**
@@ -377,9 +426,10 @@ class DataQuantaHackit[Out: ClassTag]
    * @return a new instance representing the [[CountOperator]]'s output
    */
   override def count: DataQuantaHackit[lang.Long] = {
-    val countOperator = new CountOperator(dataSetType[Out])
-    this.connectTo(countOperator, 0)
-    DataQuantaHackit.wrap[lang.Long](countOperator)
+//    val countOperator = new CountOperator(dataSetType[Out])
+//    this.connectTo(countOperator, 0)
+//    DataQuantaHackit.wrap[lang.Long](countOperator)
+    null
   }
 }
 
@@ -392,4 +442,8 @@ object DataQuantaHackit extends DataQuantaCreator{
   def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] =
     new DataQuantaHackit(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
 
+  implicit def wrapDataQuanta[T:ClassTag](dataQuanta: DataQuanta[T]): DataQuantaHackit[T] = {
+    new DataQuantaHackit[T](dataQuanta.operator, dataQuanta.outputIndex)(ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder);
+  }
+
 }
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala
new file mode 100644
index 0000000..b9499fb
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala
@@ -0,0 +1,78 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.api.dataquanta.DataQuanta
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger
+import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit
+import org.apache.wayang.plugin.hackit.core.tags.HackitTag
+import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple
+import org.apache.wayang.api.toSerializableFunction
+
+import scala.language.implicitConversions
+import scala.reflect.{ClassTag, classTag}
+
+class Hackit[Key: ClassTag, Type:ClassTag](implicit var planBuilder: PlanBuilder) {
+
+  var dataQuanta: DataQuanta[HackitTuple[Key, Type]] = null
+
+  var tagger : HackitTagger = null;
+
+  def addTag(hackitTag: HackitTag) = {
+    println("you are adding a tag")
+    this
+  }
+
+  def underHackit(dataQuanta: DataQuanta[Type]): Hackit[Key, Type] = {
+    val hackit = new Hackit[Key, Type]()
+    hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, Type]](
+      element => {
+        new HackitTuple[Key, Type](element)
+      }
+    )
+    hackit
+  }
+
+  def toDataQuanta(): DataQuanta[Type] = {
+    this.dataQuanta.map(tuple => tuple.getValue)
+  }
+
+  def map[TypeOut: ClassTag](udf: Type => TypeOut, udfLoad: LoadProfileEstimator = null): Hackit[Key, TypeOut] = {
+    val hackit = new Hackit[Key, TypeOut]()
+    val wrapper = new FunctionWrapperHackit[Key, Type, TypeOut](toSerializableFunction(udf))
+    hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, TypeOut]](element => wrapper.apply(element), udfLoad)
+    hackit
+  }
+
+}
+
+
+object Hackit {
+
+  //TODO: replace the object with a parameter
+  implicit def underHackit[T: ClassTag](dataQuanta: DataQuanta[T]): Hackit[java.lang.Object, T] = {
+      return new Hackit[java.lang.Object, T]()(classTag[java.lang.Object], ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder)
+                  .underHackit(dataQuanta)
+  }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
index 59a0fd9..42867f3 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
@@ -19,12 +19,17 @@
 
 package org.apache.wayang.plugin.hackit.api
 
-import org.apache.wayang.api.ApiTest
 import org.apache.wayang.api.dataquanta.DataQuantaFactory
-import org.junit.{BeforeClass, Test}
+import org.apache.wayang.api.createPlanBuilder
+import org.apache.wayang.plugin.hackit.api.Hackit.underHackit
+import org.apache.wayang.core.api.WayangContext
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+import org.junit.{Assert, BeforeClass, Test}
 import org.junit.jupiter.api.{BeforeAll, BeforeEach}
 
-class ApiExtensionTest extends ApiTest {
+
+class ApiExtensionTest {
 
   @BeforeEach
   def setUp() ={
@@ -32,10 +37,35 @@ class ApiExtensionTest extends ApiTest {
   }
 
   @Test
-  override def testReadMapCollect(): Unit = {
-    DataQuantaFactory.setTemplate(DataQuantaHackit);
+  def testReadMapCollectHackit(): Unit = {
+
+    // Set up WayangContext.
+    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+
+    // Generate some test data.
+    val inputValues = (for (i <- 1 to 10) yield i).toArray
+
+    // Build and execute a Wayang plan.
+    var outputValues = wayang
+      .loadCollection(inputValues).withName("Load input values")
+      .addTag(null)
+      .map(a => a + 2)//.withName("Add 2")
+      .dataQuanta
+      .collect()
+
+    print(outputValues)
+
+    var lolo = wayang
+      .loadCollection(inputValues).withName("Load input values")
+      .addTag(null)
+      .map(a => a + 2)//.withName("Add 2")
+      .toDataQuanta()
+      .collect()
 
-    super.testReadMapCollect()
+    print(lolo)
+//    // Check the outcome.
+//    val expectedOutputValues = inputValues.map(_ + 2)
+//    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
   }
 
 }