You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:24 UTC

[incubator-wayang] 03/32: [WAYANG-#8] Structure the Python file inside of a module

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit fc94a5fd660ac89906ab82f723c696240e4aec69
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 4 20:03:12 2022 +0200

    [WAYANG-#8] Structure the Python file inside of a module
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywayang/context.py                 | 35 ++++++++++
 python/src/pywayang/dataquanta.py              | 32 +++++++++
 python/{ => src/pywayang/operator}/__init__.py |  0
 python/src/pywayang/operator/base.py           | 37 +++++++++++
 python/src/pywayang/operator/source.py         | 28 ++++++++
 python/src/pywayang/operator/unary.py          | 90 ++++++++++++++++++++++++++
 python/src/pywayang/orchestrator/operator.py   |  8 ++-
 python/src/pywayang/platform.py                | 24 +++++++
 python/src/pywayang/plugin.py                  | 27 ++++++++
 python/src/pywayang/test.py                    | 61 +++++++++++++++++
 python/src/pywayang/types.py                   | 57 ++++++++++++++++
 11 files changed, 397 insertions(+), 2 deletions(-)

diff --git a/python/src/pywayang/context.py b/python/src/pywayang/context.py
new file mode 100644
index 00000000..95942ef7
--- /dev/null
+++ b/python/src/pywayang/context.py
@@ -0,0 +1,35 @@
+from pywayang.plugin import Plugin
+from pywayang.dataquanta import DataQuanta
+from pywayang.operator.source import TextFileSource
+
+class WayangContext:
+  """
+  This is the entry point for users to work with Wayang.
+  """
+  def __init__(self):
+    self.plugins = set()  # every registered :class:`Plugin`
+
+  def register(self, *p: Plugin):
+    """add one or more :class:`Plugin` to the context and return ``self``"""
+    # ``p`` is the varargs tuple: update() stores each plugin on its own,
+    # whereas add(p) would store the tuple and unregister() could not match it
+    self.plugins.update(p)
+    return self
+
+  def unregister(self, p: Plugin):
+    """remove a :class:`Plugin` from the context and return ``self``"""
+    # set.remove raises KeyError when ``p`` is not currently registered
+    self.plugins.remove(p)
+    return self
+
+  def textFile(self, file_path: str) -> DataQuanta[str]:
+    """wrap a :class:`TextFileSource` for ``file_path`` in a :class:`DataQuanta`"""
+    return DataQuanta(TextFileSource(file_path))
+
+  def __str__(self):
+    """list the registered plugins"""
+    return "Plugins: {} \n".format(str(self.plugins))
+
+  def __repr__(self):
+    return self.__str__()
+
diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
new file mode 100644
index 00000000..5f740941
--- /dev/null
+++ b/python/src/pywayang/dataquanta.py
@@ -0,0 +1,32 @@
+from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
+from pywayang.operator.base import (BaseOperator)
+from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
+
+
+class DataQuanta(GenericTco):
+    """Represents an intermediate result/data flow edge in a [[WayangPlan]]."""
+    previous : BaseOperator = None
+
+    def __init__(self, operator: BaseOperator):
+        self.operator = operator
+
+    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
+        """New DataQuanta over ``FilterOperator(p)``; self.operator is not passed on."""
+        return DataQuanta(operator=FilterOperator(p))
+
+    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
+        """New DataQuanta over ``MapOperator(f)``; self.operator is not passed on."""
+        return DataQuanta(operator=MapOperator(f))
+
+    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
+        """New DataQuanta over ``FlatmapOperator(f)``; self.operator is not passed on."""
+        return DataQuanta(operator=FlatmapOperator(f))
+
+    def getOperator(self):
+        return self.operator
+
+    def __str__(self):
+        return str(self.operator)
+
+    def __repr__(self):
+        return self.__str__()
diff --git a/python/__init__.py b/python/src/pywayang/operator/__init__.py
similarity index 100%
rename from python/__init__.py
rename to python/src/pywayang/operator/__init__.py
diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py
new file mode 100644
index 00000000..ad2deed5
--- /dev/null
+++ b/python/src/pywayang/operator/base.py
@@ -0,0 +1,37 @@
+from typing import (TypeVar, Optional, List)
+
+
+class BaseOperator:
+    """Base class of the operators: a name plus input/output slot data."""
+
+    inputSlot : List[TypeVar]
+    inputs : int
+    outputSlot : List[TypeVar]
+    outputs: int
+
+    def __init__(self,
+                 name: str,
+                 input: Optional[TypeVar] = None,
+                 output: Optional[TypeVar] = None,
+                 input_lenght: Optional[int] = 1,
+                 output_lenght: Optional[int] = 1
+    ):
+        """Store the name, the slot types and the slot counts as given."""
+        self.name = name
+        self.inputSlot, self.inputs = input, input_lenght
+        self.outputSlot, self.outputs = output, output_lenght
+
+    def __str__(self):
+        """Render name, inputs and outputs as a tab-indented listing."""
+        # every field goes through str() before it is formatted
+        template = "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n"
+        fields = (self.name, self.inputs, self.inputSlot,
+                  self.outputs, self.outputSlot)
+        return template.format(*map(str, fields))
+
+    def __repr__(self):
+        """Same text as ``__str__``."""
+        return self.__str__()
+
+
+
diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py
new file mode 100644
index 00000000..34a16644
--- /dev/null
+++ b/python/src/pywayang/operator/source.py
@@ -0,0 +1,28 @@
+from pywayang.operator.base import BaseOperator
+
+class SourceUnaryOperator(BaseOperator):
+    """Operator declared with no input (count 0) and one ``str`` output."""
+    def __init__(self, name:str):
+        super().__init__(name, input=None, output=str, input_lenght=0, output_lenght=1)
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class TextFileSource(SourceUnaryOperator):
+    """Source operator named 'TextFileSource' that remembers a file path."""
+    path: str
+
+    def __init__(self, path: str):
+        super().__init__(name='TextFileSource')
+        self.path = path  # only stored here; nothing is opened or read
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py
new file mode 100644
index 00000000..24e5df2f
--- /dev/null
+++ b/python/src/pywayang/operator/unary.py
@@ -0,0 +1,90 @@
+from pywayang.operator.base import BaseOperator
+from pywayang.types import (
+                                GenericTco,
+                                GenericUco,
+                                Predicate,
+                                getTypePredicate,
+                                Function,
+                                getTypeFunction,
+                                FlatmapFunction,
+                                getTypeFlatmapFunction
+                            )
+from itertools import chain
+
+
+class UnaryToUnaryOperator(BaseOperator):
+    """Operator declared with exactly one input and one output."""
+    def __init__(self, name:str, input:GenericTco, output:GenericUco):
+        super().__init__(name, input=input, output=output, input_lenght=1, output_lenght=1)
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class FilterOperator(UnaryToUnaryOperator):
+    """Unary operator that keeps the elements accepted by a predicate."""
+    predicate: Predicate
+
+    def __init__(self, predicate: Predicate):
+        slot_type = getTypePredicate(predicate)  # same type on both slots
+        super().__init__("FilterOperator", slot_type, slot_type)
+        self.predicate = predicate
+
+    def getWrapper(self):  # gives back: iterator -> filter(predicate, iterator)
+        keep = self.predicate  # read once, when the wrapper is created
+        def func(iterator):
+            return filter(keep, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+class MapOperator(UnaryToUnaryOperator):
+    """Unary operator that applies a function to every element."""
+    function: Function
+
+    def __init__(self, function: Function):
+        in_type, out_type = getTypeFunction(function)
+        super().__init__("MapOperator", in_type, out_type)
+        self.function = function
+
+    def getWrapper(self):  # gives back: iterator -> map(function, iterator)
+        mapper = self.function  # read once, when the wrapper is created
+        def func(iterator):
+            return map(mapper, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+class FlatmapOperator(UnaryToUnaryOperator):
+    """Unary operator that maps each element to an iterable and flattens."""
+    fmfunction: FlatmapFunction
+
+    def __init__(self, fmfunction: FlatmapFunction):
+        in_type, out_type = getTypeFlatmapFunction(fmfunction)
+        super().__init__("FlatmapOperator", in_type, out_type)
+        self.fmfunction = fmfunction
+
+    def getWrapper(self):  # gives back: iterator -> flattened map results
+        expand = self.fmfunction  # read once, when the wrapper is created
+        def func(iterator):
+            return chain.from_iterable(map(expand, iterator))
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/orchestrator/operator.py b/python/src/pywayang/orchestrator/operator.py
index 1307b5f2..87087642 100644
--- a/python/src/pywayang/orchestrator/operator.py
+++ b/python/src/pywayang/orchestrator/operator.py
@@ -30,8 +30,12 @@ pickle_protocol = pickle.HIGHEST_PROTOCOL
 class Operator:
 
     def __init__(
-            self, operator_type=None, udf=None, previous=None,
-            iterator=None, python_exec=False
+        self,
+        operator_type=None,
+        udf=None,
+        previous=None,
+        iterator=None,
+        python_exec=False
     ):
 
         # Operator ID
diff --git a/python/src/pywayang/platform.py b/python/src/pywayang/platform.py
new file mode 100644
index 00000000..caece352
--- /dev/null
+++ b/python/src/pywayang/platform.py
@@ -0,0 +1,24 @@
+
+class Platform:
+    """
+    A platform describes an execution engine that is used to execute the
+    wayang plan.
+
+    Parameters
+    ----------
+    name: str
+      platform name, used as its identifier
+    """
+    name : str
+    #configuration : dict[str, str]
+
+    def __init__(self, name):
+        self.name = name
+
+    def __str__(self):
+        template = "name: {}"
+        return template.format(self.name)
+
+    def __repr__(self):
+        return self.__str__()
+
diff --git a/python/src/pywayang/plugin.py b/python/src/pywayang/plugin.py
new file mode 100644
index 00000000..82d75431
--- /dev/null
+++ b/python/src/pywayang/plugin.py
@@ -0,0 +1,27 @@
+from pywayang.platform import Platform
+
+class Plugin:
+    """
+    A plugin contributes the following components to a :class:`Context`:
+    - mappings
+    - channels
+    - configurations
+    In turn, it may require several :class:`Platform`s for its operation.
+    """
+    platforms = []  # class-level default; __init__ rebinds it per instance
+
+    def __init__(self, *platform:Platform):
+        self.platforms = [*platform]
+
+    def __str__(self):
+        template = "Platforms: {}"
+        return template.format(str(self.platforms))
+
+    def __repr__(self):
+        return self.__str__()
+
+
+# ready-made module-level plugins, each wrapping a single Platform
+java = Plugin(Platform('java'))
+spark = Plugin(Platform('spark'))
+flink = Plugin(Platform('flink'))
diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py
new file mode 100644
index 00000000..66ddab0a
--- /dev/null
+++ b/python/src/pywayang/test.py
@@ -0,0 +1,61 @@
+from typing import Iterable
+
+from pywayang.platform import Platform
+from pywayang.context import WayangContext
+from pywayang.plugin import java, spark
+from pywayang.operator.unary import *
+
+p = Platform("nana")
+print(p)
+
+
+# register() takes varargs, so several plugins can be passed at once
+print(str(WayangContext().register(java, spark)))
+
+from pywayang.types import Predicate, getTypePredicate
+
+predicate : Predicate = lambda x: x % 2 == 0
+getTypePredicate(predicate)  # smoke call only, the result is discarded
+
+def pre(a:str):
+    return len(a) > 3
+
+def func(s:str) -> int:
+    return len(s)
+
+def fmfunc(i:int) -> Iterable[str]:
+    # generator function: it hands back an iterable of str, not one str
+    for x in range(i):
+        yield str(x)
+
+# the chain ends without a trailing backslash, so the statement stops here
+fileop = WayangContext()\
+            .register(java)\
+            .textFile("here")
+
+filterop: FilterOperator = fileop.filter(pre).getOperator()
+fop_pre = filterop.getWrapper()
+fop_pre_res = fop_pre(["la", "lala"])
+# filter() is lazy: nothing is evaluated until the result is iterated
+
+mapop: MapOperator = fileop.map(func).getOperator()
+mop_func = mapop.getWrapper()
+mop_func_res = mop_func(["la", "lala"])
+# map() is lazy as well, so func has not been called yet
+
+fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
+fmop_func = fmop.getWrapper()
+fmop_func_res = fmop_func([2, 3])
+# chain.from_iterable() is lazy too; fmfunc only runs on iteration
+
+def concatenate(function_a, function_b):
+    # compose two wrappers: function_a runs first, function_b on its output
+    def executable(iterable):
+        return function_b(function_a(iterable))
+    return executable
+
+# filter -> map -> flatmap chained over the same input
+res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+res_pro = res(["la", "lala"])
+for i in res_pro:
+    print(i)
\ No newline at end of file
diff --git a/python/src/pywayang/types.py b/python/src/pywayang/types.py
new file mode 100644
index 00000000..8be502e0
--- /dev/null
+++ b/python/src/pywayang/types.py
@@ -0,0 +1,57 @@
+from typing import ( Generic, TypeVar, Callable, Hashable, Iterable)
+from inspect import signature
+
+T = TypeVar("T")   # Type
+I = TypeVar("I")   # Input Type number 1
+I2 = TypeVar("I2") # Input Type number 2
+O = TypeVar("O")   # Output Type
+
+IterableT = Iterable[T] # Iterable of type 'T'
+IterableO = Iterable[O] # Iterable of type 'O'
+
+T_co = TypeVar("T_co", covariant=True)
+U_co = TypeVar("U_co", covariant=True)
+K = TypeVar("K", bound=Hashable)
+
+# parametrised ``Generic`` forms, usable as a base class or an annotation
+GenericTco = Generic[T_co]
+GenericUco = Generic[U_co]
+
+Predicate = Callable[[T], bool]
+Function = Callable[[I], O]
+BiFunction = Callable[[I, I2], O]
+
+FlatmapFunction = Callable[[T], IterableO]
+
+
+def getTypePredicate(callable: Predicate) -> Generic :
+    """Return the annotation of the single parameter of ``callable``."""
+    params = signature(callable).parameters
+    if len(params) != 1:
+        raise Exception("the parameters for the Predicate are distinct than one, {}".format(str(params)))
+    (only,) = params.values()  # exactly one entry is guaranteed above
+    return only.annotation
+
+def getTypeFunction(callable: Function) -> (Generic, Generic) :
+    """Return ``(parameter annotation, return annotation)`` of ``callable``."""
+    sig = signature(callable)
+    if len(sig.parameters) != 1:
+        raise Exception("the parameters for the Function are distinct than one, {}".format(str(sig.parameters)))
+    (param,) = sig.parameters.values()  # exactly one entry is guaranteed above
+    return (param.annotation, sig.return_annotation)
+
+def getTypeBiFunction(callable: BiFunction) -> (Generic, Generic, Generic) :
+    """Return both parameter annotations followed by the return annotation."""
+    sig = signature(callable)
+    if len(sig.parameters) != 2:
+        raise Exception("the parameters for the BiFunction are distinct than two, {}".format(str(sig.parameters)))
+    first, second = sig.parameters.values()  # declaration order
+    return (first.annotation, second.annotation, sig.return_annotation)
+
+def getTypeFlatmapFunction(callable: FlatmapFunction) -> (Generic, Generic) :
+    """Return ``(parameter annotation, return annotation)`` of ``callable``."""
+    sig = signature(callable)
+    if len(sig.parameters) != 1:
+        raise Exception("the parameters for the FlatmapFunction are distinct than one, {}".format(str(sig.parameters)))
+    (param,) = sig.parameters.values()  # exactly one entry is guaranteed above
+    return (param.annotation, sig.return_annotation)
\ No newline at end of file