You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:25 UTC

[incubator-wayang] 04/32: [WAYANG-#8] Seed creation of Platforms/python

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 2894e5429d673fafd7186dcfbdeaae6b2b016090
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 11:24:32 2022 +0200

    [WAYANG-#8] Seed creation of Platforms/python
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywayang/dataquanta.py                  |  6 +--
 python/src/pywayang/operator/base.py               | 34 ++++++++++--
 python/src/pywayang/operator/source.py             |  4 +-
 python/src/pywayang/operator/unary.py              | 16 ++----
 python/src/pywayang/platforms/__init__.py          |  0
 python/src/pywayang/platforms/python/__init__.py   |  0
 python/src/pywayang/platforms/python/channels.py   | 60 ++++++++++++++++++++++
 .../pywayang/platforms/python/compiler/__init__.py |  0
 .../platforms/python/execution/__init__.py         |  0
 python/src/pywayang/platforms/python/mappings.py   | 35 +++++++++++++
 .../platforms/python/operators/PyFilterOperator.py | 43 ++++++++++++++++
 .../python/operators/PythonExecutionOperator.py    |  7 +++
 .../platforms/python/operators/__init__.py         |  7 +++
 .../pywayang/platforms/python/platform/__init__.py |  0
 .../pywayang/platforms/python/plugin/__init__.py   |  0
 python/src/pywayang/test.py                        | 50 +++++++++++++++---
 16 files changed, 236 insertions(+), 26 deletions(-)

diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
index 5f740941..3ccc2839 100644
--- a/python/src/pywayang/dataquanta.py
+++ b/python/src/pywayang/dataquanta.py
@@ -1,5 +1,5 @@
 from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
-from pywayang.operator.base import (BaseOperator)
+from pywayang.operator.base import (WyOperator)
 from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
 
 
@@ -7,9 +7,9 @@ class DataQuanta(GenericTco):
     """
     Represents an intermediate result/data flow edge in a [[WayangPlan]].
     """
-    previous : BaseOperator = None
+    previous : WyOperator = None
 
-    def __init__(self, operator: BaseOperator):
+    def __init__(self, operator: WyOperator):
         self.operator = operator
 
 
diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py
index ad2deed5..b7834d17 100644
--- a/python/src/pywayang/operator/base.py
+++ b/python/src/pywayang/operator/base.py
@@ -1,11 +1,13 @@
-from typing import (TypeVar, Optional, List)
+from typing import (TypeVar, Optional, List, Set)
+from pywayang.platforms.python.channels import ChannelDescriptor
 
-
-class BaseOperator:
+class WyOperator:
 
     inputSlot : List[TypeVar]
+    inputChannel : ChannelDescriptor
     inputs : int
     outputSlot : List[TypeVar]
+    OutputChannel: ChannelDescriptor
     outputs: int
 
     def __init__(self,
@@ -21,6 +23,32 @@ class BaseOperator:
         self.outputSlot = output
         self.outputs = output_lenght
 
+    def validateInputs(self, vec):  # check that vec has exactly self.inputs entries
+        if len(vec) != self.inputs:
+            raise Exception(  # generic Exception carrying the actual and the expected count
+                "the inputs channel contains {} elements and need to have {}".format(
+                    len(vec),
+                    self.inputs
+                )
+            )
+    def validateOutputs(self, vec):  # check that vec has exactly self.outputs entries; returns None when it does
+        if len(vec) != self.outputs:
+            raise Exception(  # generic Exception carrying the actual and the expected count
+                "the output channel contains {} elements and need to have {}".format(
+                    len(vec),
+                    self.outputs  # expected count is the output arity, the same value the check above uses
+                )
+            )
+    def validateChannels(self, input, output):  # validate both sides; the first mismatch raises
+        self.validateInputs(input)
+        self.validateOutputs(output)
+
+    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:  # stub: returns None here; PyFilterOperator overrides it
+        pass
+
+    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:  # stub: returns None here; PyFilterOperator overrides it
+        pass
+
     def __str__(self):
         return "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n".format(
             str(self.name),
diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py
index 34a16644..97aff75a 100644
--- a/python/src/pywayang/operator/source.py
+++ b/python/src/pywayang/operator/source.py
@@ -1,6 +1,6 @@
-from pywayang.operator.base import BaseOperator
+from pywayang.operator.base import WyOperator
 
-class SourceUnaryOperator(BaseOperator):
+class SourceUnaryOperator(WyOperator):
 
     def __init__(self, name:str):
         super().__init__(name, None, str, 0, 1)
diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py
index 24e5df2f..559effa5 100644
--- a/python/src/pywayang/operator/unary.py
+++ b/python/src/pywayang/operator/unary.py
@@ -1,4 +1,4 @@
-from pywayang.operator.base import BaseOperator
+from pywayang.operator.base import WyOperator
 from pywayang.types import (
                                 GenericTco,
                                 GenericUco,
@@ -12,7 +12,7 @@ from pywayang.types import (
 from itertools import chain
 
 
-class UnaryToUnaryOperator(BaseOperator):
+class UnaryToUnaryOperator(WyOperator):
 
     def __init__(self, name:str, input:GenericTco, output:GenericUco):
         super().__init__(name, input, output, 1, 1)
@@ -30,16 +30,10 @@ class FilterOperator(UnaryToUnaryOperator):
     predicate: Predicate
 
     def __init__(self, predicate: Predicate):
-        type = getTypePredicate(predicate)
+        type = getTypePredicate(predicate) if predicate else None
         super().__init__("FilterOperator", type, type)
         self.predicate = predicate
 
-    def getWrapper(self):
-        udf = self.predicate
-        def func(iterator):
-            return filter(udf, iterator)
-        return func
-
     def __str__(self):
         return super().__str__()
 
@@ -51,7 +45,7 @@ class MapOperator(UnaryToUnaryOperator):
     function: Function
 
     def __init__(self, function: Function):
-        types = getTypeFunction(function)
+        types = getTypeFunction(function) if function else (None, None)
         super().__init__("MapOperator", types[0], types[1])
         self.function = function
 
@@ -73,7 +67,7 @@ class FlatmapOperator(UnaryToUnaryOperator):
     fmfunction: FlatmapFunction
 
     def __init__(self, fmfunction: FlatmapFunction):
-        types = getTypeFlatmapFunction(fmfunction)
+        types = getTypeFlatmapFunction(fmfunction) if fmfunction else (None, None)
         super().__init__("FlatmapOperator", types[0], types[1])
         self.fmfunction = fmfunction
 
diff --git a/python/src/pywayang/platforms/__init__.py b/python/src/pywayang/platforms/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/platforms/python/__init__.py b/python/src/pywayang/platforms/python/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py
new file mode 100644
index 00000000..f611a258
--- /dev/null
+++ b/python/src/pywayang/platforms/python/channels.py
@@ -0,0 +1,60 @@
+from typing import ( Iterable, Callable )
+
+class Channel:  # base class of PyIteratorChannel and PyCallableChannel
+
+    def __init__(self):  # no state is set up here
+        pass
+
+    def getchannel(self) -> 'Channel':  # returns this same instance
+        return self
+
+    def gettype(self):  # returns the concrete class of this instance
+        return type(self)
+
+class ChannelDescriptor:  # pairs a channel class with two boolean flags
+
+    def __init__(self, channelType: type, isReusable: bool, isSuitableForBreakpoint: bool):
+        self.channelType = channelType  # the class that create_instance() calls
+        self.isReusable = isReusable
+        self.isSuitableForBreakpoint = isSuitableForBreakpoint
+
+    def create_instance(self) -> Channel:  # calls channelType with no arguments and returns the result
+        return self.channelType()
+
+
+class PyIteratorChannel(Channel):  # channel that holds an iterable
+
+    iterable : Iterable  # annotation only; no value is assigned until accept_iterable() is called
+
+    def __init__(self):
+        Channel.__init__(self)
+
+    def provide_iterable(self) -> Iterable:  # returns the stored iterable
+        return self.iterable
+
+    def accept_iterable(self, iterable) -> 'PyIteratorChannel':  # stores iterable and returns self
+        self.iterable = iterable
+        return self
+
+class PyCallableChannel(Channel):  # channel that holds a callable
+
+    udf : Callable  # annotation only; no value is assigned until accept_callable() is called
+
+    def __init__(self):
+        Channel.__init__(self)
+
+    def provide_callable(self) -> Callable:  # returns the stored callable
+        return self.udf
+
+    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':  # stores udf and returns self
+        self.udf = udf
+        return self
+
+    @staticmethod
+    def concatenate(function_a: Callable, function_b: Callable):  # returns a one-argument function composing the two
+        def executable(iterable):
+            return function_a(function_b(iterable))  # function_b runs first, function_a receives its result
+        return executable
+
+PyIteratorChannelDescriptor = ChannelDescriptor(PyIteratorChannel, False, False)  # shared descriptor; both flags False
+PyCallableChannelDescriptor = ChannelDescriptor(PyCallableChannel, False, False)  # shared descriptor; both flags False
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/compiler/__init__.py b/python/src/pywayang/platforms/python/compiler/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/platforms/python/execution/__init__.py b/python/src/pywayang/platforms/python/execution/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
new file mode 100644
index 00000000..977ccada
--- /dev/null
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -0,0 +1,35 @@
+from typing import Dict
+
+from pywayang.operator.base import WyOperator
+from pywayang.platforms.python.operators import *
+
+class Mapping:  # registry from an operator name to the class registered under that name
+    mappings: Dict[str, type]  # operator.name -> class of the operator passed to add_mapping()
+
+    def __init__(self):
+        self.mappings = {}
+
+    def add_mapping(self, operator: PythonExecutionOperator):  # registers type(operator) under operator.name
+        self.mappings[operator.name] = type(operator)
+
+    def get_instanceof(self, operator: WyOperator):  # returns a new instance of the mapped class, built from operator
+        template = self.mappings.get(operator.name)  # .get() so an unmapped name reaches the check below, not a bare KeyError
+        if template is None:
+            raise Exception(  # raised when no class is registered under operator.name
+                "the operator {} does not have valid mapping".format(
+                    operator.name
+                )
+            )
+        return template(operator)  # the mapped class is called with the original operator as its only argument
+
+
+    def __str__(self):  # string form of the underlying dict
+        return str(self.mappings)
+
+    def __repr__(self):  # same text as __str__
+        return self.__str__()
+
+OperatorMappings = Mapping()  # module-level registry, created when this module is imported
+
+OperatorMappings.add_mapping(PyFilterOperator())  # registers the PyFilterOperator class under the name of a default-constructed instance
+
diff --git a/python/src/pywayang/platforms/python/operators/PyFilterOperator.py b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py
new file mode 100644
index 00000000..f1d7dcb4
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyFilterOperator.py
@@ -0,0 +1,43 @@
+from pywayang.operator.unary import FilterOperator
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (Channel, ChannelDescriptor, PyIteratorChannel,
+                                                PyIteratorChannelDescriptor, PyCallableChannelDescriptor,
+                                                PyCallableChannel)
+from typing import Set
+
+class PyFilterOperator(FilterOperator, PythonExecutionOperator):  # filter operator that works on the Python channels
+
+    def __init__(self, origin: FilterOperator = None):  # origin is optional; mappings.py builds one with no arguments
+        predicate = None if origin is None else origin.predicate  # copy the predicate from origin when one is given
+        super().__init__(predicate)
+        pass
+
+    def execute(self, inputs: Channel, outputs: Channel):  # NOTE(review): both are len()-checked and indexed with [0], so presumably sequences of channels - confirm against callers
+        self.validateChannels(inputs, outputs)  # raises when a length differs from the declared arity
+        udf = self.predicate
+        if isinstance(inputs[0], PyIteratorChannel) :
+            py_in_iter_channel: PyIteratorChannel = inputs[0]
+            py_out_iter_channel: PyIteratorChannel = outputs[0]
+            py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))  # output receives a lazy filter object over the input iterable
+        elif isinstance(inputs[0], PyCallableChannel) :
+            py_in_call_channel: PyCallableChannel = inputs[0]
+            py_out_call_channel: PyCallableChannel = outputs[0]
+
+            def func(iterator):  # wraps the predicate as a function from iterable to filtered iterable
+                return filter(udf, iterator)
+
+            py_out_call_channel.accept_callable(
+                PyCallableChannel.concatenate(
+                    func,  # applied second, to the result of the input channel's callable
+                    py_in_call_channel.provide_callable()  # applied first
+                )
+            )
+        else:  # inputs[0] is neither of the two channel classes handled above
+            raise Exception("Channel Type does not supported")
+
+
+    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:  # the two module-level descriptors from channels.py
+        return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
+
+    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:  # same set as for the input side
+        return {PyIteratorChannelDescriptor, PyCallableChannelDescriptor}
diff --git a/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py
new file mode 100644
index 00000000..4a79616c
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PythonExecutionOperator.py
@@ -0,0 +1,7 @@
+from pywayang.operator.base import WyOperator
+from pywayang.platforms.python.channels import Channel
+
+class PythonExecutionOperator(WyOperator):  # WyOperator subclass that adds an execute() hook
+
+    def execute(self, inputs: Channel, output: Channel):  # stub: does nothing here; PyFilterOperator overrides it
+        pass
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
new file mode 100644
index 00000000..208a2fc0
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -0,0 +1,7 @@
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
+
+__all__ = [  # must be the lowercase name with string entries for "from ... import *" to honour it
+    "PythonExecutionOperator",
+    "PyFilterOperator"
+]
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/platform/__init__.py b/python/src/pywayang/platforms/python/platform/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/platforms/python/plugin/__init__.py b/python/src/pywayang/platforms/python/plugin/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py
index 66ddab0a..884acfc5 100644
--- a/python/src/pywayang/test.py
+++ b/python/src/pywayang/test.py
@@ -2,11 +2,17 @@ from typing import Iterable
 
 from pywayang.platform import Platform
 from pywayang.context import WayangContext
+from pywayang.platforms.python.channels import Channel
 from pywayang.plugin import java, spark
 from pywayang.operator.unary import *
 
 p = Platform("nana")
-print(p)
+print("LALA "+str(p))
+pt = type(p)
+print(pt)
+p2 = pt("chao")
+print(p2)
+print(type(p2))
 
 
 print(str(WayangContext().register(java, spark)))
@@ -31,8 +37,8 @@ fileop = WayangContext()\
             .textFile("here")\
 
 filterop: FilterOperator = fileop.filter(pre).getOperator()
-fop_pre = filterop.getWrapper()
-fop_pre_res = fop_pre(["la", "lala"])
+#fop_pre = filterop.getWrapper()
+#fop_pre_res = fop_pre(["la", "lala"])
 #for i in fop_pre_res:
 #    print(i)
 
@@ -55,7 +61,37 @@ def concatenate(function_a, function_b):
         return function_b(function_a(iterable))
     return executable
 
-res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
-res_pro = res(["la", "lala"])
-for i in res_pro:
-    print(i)
\ No newline at end of file
+#res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+#res_pro = res(["la", "lala"])
+#for i in res_pro:
+#    print(i)
+
+from pywayang.platforms.python.mappings import OperatorMappings
+from pywayang.platforms.python.operators import *
+
+print(OperatorMappings)
+
+pyF = PyFilterOperator()
+print(pyF)
+print(pyF.getInputChannelDescriptors())
+print(type(pyF.getInputChannelDescriptors().pop().create_instance()))
+
+qq : Channel = pyF.getInputChannelDescriptors().pop().create_instance()
+print(qq)
+print(type(qq))
+print("ads")
+
+
+def pre_lala(a:str):
+    print("executed")
+    return len(a) > 3
+
+ou1 = filter(pre_lala, ["la", "lala"])
+print(ou1)
+
+for i in ou1:
+    print(i)
+
+pyFM = OperatorMappings.get_instanceof(filterop)
+print(pyFM)
+print(type(pyFM))
\ No newline at end of file