You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:50 UTC
[incubator-wayang] 09/15: [WAYANG-31] small correction to be extensible
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 99a666566d7a2882e0ce1333d7c86e5ac9ccabe7
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Fri May 14 08:54:56 2021 -0400
[WAYANG-31] small correction to be extensible
---
wayang-api/wayang-api-scala-java/pom.xml | 25 ++++++++++++++--
.../apache/wayang/api/dataquanta/DataQuanta.scala | 2 +-
.../wayang/api/dataquanta/DataQuantaCreator.scala | 33 ++++++++++++++++++++++
.../wayang/api/dataquanta/DataQuantaDefault.scala | 6 ++--
.../wayang/api/dataquanta/DataQuantaFactory.scala | 21 +++++++++++++-
5 files changed, 80 insertions(+), 7 deletions(-)
diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml
index 1470c30..d385dae 100644
--- a/wayang-api/wayang-api-scala-java/pom.xml
+++ b/wayang-api/wayang-api-scala-java/pom.xml
@@ -91,7 +91,12 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <scope>provided</scope>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -103,6 +108,11 @@
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.12</version>
+ </dependency>
</dependencies>
<build>
@@ -114,8 +124,19 @@
<useSystemClassLoader>true</useSystemClassLoader>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
-
</project>
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 6e464e0..664079b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -122,7 +122,7 @@ abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outpu
* @param operator the [[Operator]] to connect to
* @param inputIndex the input index of the [[Operator]]s [[InputSlot]]
*/
- private[api] def connectTo(operator: Operator, inputIndex: Int): Unit =
+ def connectTo(operator: Operator, inputIndex: Int): Unit =
this.operator.connectTo(outputIndex, operator, inputIndex)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala
new file mode 100644
index 0000000..6388cc9
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.api.PlanBuilder
+import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, OutputSlot}
+
+import scala.reflect.ClassTag
+
+abstract class DataQuantaCreator {
+
+ def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T]
+
+ def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
index 1f79ae7..aa9c7d0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala
@@ -19,7 +19,7 @@
package org.apache.wayang.api.dataquanta
-import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate}
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
import org.apache.wayang.basic.function.ProjectionDescriptor
import org.apache.wayang.basic.operators._
@@ -31,7 +31,7 @@ import org.apache.wayang.core.plan.wayangplan._
import java.lang
import java.lang.{Iterable => JavaIterable}
-import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
+import java.util.function.{IntUnaryOperator}
import scala.reflect._
/**
@@ -396,7 +396,7 @@ class DataQuantaDefault[Out: ClassTag]
}
}
-object DataQuantaDefault {
+object DataQuantaDefault extends DataQuantaCreator {
def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = {
new DataQuantaDefault[T](operator, outputIndex)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
index aad0e42..ef8fe0d 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala
@@ -21,7 +21,11 @@ package org.apache.wayang.api.dataquanta
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.core.plan.wayangplan.ElementaryOperator
+import org.reflections.Reflections
+import org.reflections.scanners.{FieldAnnotationsScanner, MethodParameterScanner, ResourcesScanner, SubTypesScanner}
+import org.reflections.util.{ClasspathHelper, ConfigurationBuilder}
+import scala.collection.immutable.List
import scala.reflect.ClassTag
/**
@@ -32,6 +36,21 @@ import scala.reflect.ClassTag
object DataQuantaFactory {
/**
+ * template is the instance of [[DataQuantaCreator]] that will be use in the creation of [[DataQuanta]] instance
+ */
+ var template: DataQuantaCreator = DataQuantaDefault
+
+ /**
+ * set the [[DataQuantaCreator]]
+ *
+ * @param dataQuantaCreator it will be use as creator when the [[DataQuantaFactory.build()]] is called
+ */
+ def setTemplate(dataQuantaCreator: DataQuantaCreator) = {
+ this.template = dataQuantaCreator
+
+ }
+
+ /**
* Given the configuration loaded the [[DataQuantaFactory.build()]] the right extender, if not configuration is
* provided the [[DataQuantaFactory]] will create a [[DataQuantaDefault]] instance
*
@@ -43,7 +62,7 @@ object DataQuantaFactory {
*/
def build[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T] = {
//TODO validate if the correct way
- DataQuantaDefault.wrap[T](operator, outputIndex)
+ this.template.wrap[T](operator, outputIndex)
}
}