You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:50 UTC

[incubator-wayang] 09/15: [WAYANG-31] small correction to be extensible

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 99a666566d7a2882e0ce1333d7c86e5ac9ccabe7
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Fri May 14 08:54:56 2021 -0400

    [WAYANG-31] small correction to be extensible
---
 wayang-api/wayang-api-scala-java/pom.xml           | 25 ++++++++++++++--
 .../apache/wayang/api/dataquanta/DataQuanta.scala  |  2 +-
 .../wayang/api/dataquanta/DataQuantaCreator.scala  | 33 ++++++++++++++++++++++
 .../wayang/api/dataquanta/DataQuantaDefault.scala  |  6 ++--
 .../wayang/api/dataquanta/DataQuantaFactory.scala  | 21 +++++++++++++-
 5 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml
index 1470c30..d385dae 100644
--- a/wayang-api/wayang-api-scala-java/pom.xml
+++ b/wayang-api/wayang-api-scala-java/pom.xml
@@ -91,7 +91,12 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <scope>provided</scope>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -103,6 +108,11 @@
             <artifactId>hadoop-hdfs</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>0.9.12</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -114,8 +124,19 @@
                     <useSystemClassLoader>true</useSystemClassLoader>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
-
 </project>
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 6e464e0..664079b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -122,7 +122,7 @@ abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outpu
     * @param operator   the [[Operator]] to connect to
     * @param inputIndex the input index of the [[Operator]]s [[InputSlot]]
     */
-  private[api] def connectTo(operator: Operator, inputIndex: Int): Unit =
+  def connectTo(operator: Operator, inputIndex: Int): Unit =
     this.operator.connectTo(outputIndex, operator, inputIndex)
 
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala
new file mode 100644
index 0000000..6388cc9
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala
@@ -0,0 +1,33 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, OutputSlot}
+
+import scala.reflect.ClassTag
+
+abstract class DataQuantaCreator {
+
+  def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T]
+
+  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
index 1f79ae7..aa9c7d0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
@@ -19,7 +19,7 @@
 
 package org.apache.wayang.api.dataquanta
 
-import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate}
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
 import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
 import org.apache.wayang.basic.function.ProjectionDescriptor
 import org.apache.wayang.basic.operators._
@@ -31,7 +31,7 @@ import org.apache.wayang.core.plan.wayangplan._
 
 import java.lang
 import java.lang.{Iterable => JavaIterable}
-import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
+import java.util.function.{IntUnaryOperator}
 import scala.reflect._
 
 /**
@@ -396,7 +396,7 @@ class DataQuantaDefault[Out: ClassTag]
   }
 }
 
-object DataQuantaDefault {
+object DataQuantaDefault extends DataQuantaCreator {
 
   def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = {
     new DataQuantaDefault[T](operator, outputIndex)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
index aad0e42..ef8fe0d 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
@@ -21,7 +21,11 @@ package org.apache.wayang.api.dataquanta
 
 import org.apache.wayang.api.PlanBuilder
 import org.apache.wayang.core.plan.wayangplan.ElementaryOperator
+import org.reflections.Reflections
+import org.reflections.scanners.{FieldAnnotationsScanner, MethodParameterScanner, ResourcesScanner, SubTypesScanner}
+import org.reflections.util.{ClasspathHelper, ConfigurationBuilder}
 
+import scala.collection.immutable.List
 import scala.reflect.ClassTag
 
 /**
@@ -32,6 +36,21 @@ import scala.reflect.ClassTag
 object DataQuantaFactory {
 
   /**
+   * template is the instance of [[DataQuantaCreator]] that will be used in the creation of [[DataQuanta]] instances
+   */
+  var template: DataQuantaCreator = DataQuantaDefault
+
+  /**
+   * Replaces the [[DataQuantaCreator]] that [[DataQuantaFactory.build()]] delegates to.
+   *
+   * @param dataQuantaCreator it will be used as the creator when [[DataQuantaFactory.build()]] is called
+   */
+  def setTemplate(dataQuantaCreator: DataQuantaCreator): Unit = {
+    this.template = dataQuantaCreator
+
+  }
+
+  /**
    * Given the configuration loaded the [[DataQuantaFactory.build()]] the right extender, if not configuration is
    * provided the [[DataQuantaFactory]] will create a [[DataQuantaDefault]] instance
    *
@@ -43,7 +62,7 @@ object DataQuantaFactory {
    */
   def build[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T] = {
     //TODO validate if the correct way
-    DataQuantaDefault.wrap[T](operator, outputIndex)
+    this.template.wrap[T](operator, outputIndex)
   }
 
 }