You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:24 UTC
[incubator-wayang] 03/32: [WAYANG-#8] Structure the Python file inside of a module
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit fc94a5fd660ac89906ab82f723c696240e4aec69
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 4 20:03:12 2022 +0200
[WAYANG-#8] Structure the Python file inside of a module
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywayang/context.py | 35 ++++++++++
python/src/pywayang/dataquanta.py | 32 +++++++++
python/{ => src/pywayang/operator}/__init__.py | 0
python/src/pywayang/operator/base.py | 37 +++++++++++
python/src/pywayang/operator/source.py | 28 ++++++++
python/src/pywayang/operator/unary.py | 90 ++++++++++++++++++++++++++
python/src/pywayang/orchestrator/operator.py | 8 ++-
python/src/pywayang/platform.py | 24 +++++++
python/src/pywayang/plugin.py | 27 ++++++++
python/src/pywayang/test.py | 61 +++++++++++++++++
python/src/pywayang/types.py | 57 ++++++++++++++++
11 files changed, 397 insertions(+), 2 deletions(-)
diff --git a/python/src/pywayang/context.py b/python/src/pywayang/context.py
new file mode 100644
index 00000000..95942ef7
--- /dev/null
+++ b/python/src/pywayang/context.py
@@ -0,0 +1,35 @@
+from pywayang.plugin import Plugin
+from pywayang.dataquanta import DataQuanta
+from pywayang.operator.source import TextFileSource
+
+class WayangContext:
+    """
+    This is the entry point for users to work with Wayang.
+
+    The registered :class:`Plugin` objects are kept in the set ``self.plugins``.
+    """
+
+    def __init__(self):
+        self.plugins = set()
+
+    def register(self, *p: Plugin):
+        """
+        add one or more :class:`Plugin` to the :class:`Context`
+
+        Returns the context itself so that calls can be chained.
+        """
+        # `p` is the tuple of all positional arguments: `update` inserts every
+        # plugin on its own, whereas `add(p)` would insert the tuple itself.
+        self.plugins.update(p)
+        return self
+
+    def unregister(self, p: Plugin):
+        """
+        remove a :class:`Plugin` from the :class:`Context`
+
+        Returns the context itself; raises KeyError (from ``set.remove``) when
+        `p` is not registered.
+        """
+        self.plugins.remove(p)
+        return self
+
+    def textFile(self, file_path: str) -> DataQuanta[str]:
+        """Return a DataQuanta wrapping a TextFileSource built from `file_path`."""
+        return DataQuanta(TextFileSource(file_path))
+
+    def __str__(self):
+        return "Plugins: {} \n".format(str(self.plugins))
+
+    def __repr__(self):
+        return self.__str__()
+
diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
new file mode 100644
index 00000000..5f740941
--- /dev/null
+++ b/python/src/pywayang/dataquanta.py
@@ -0,0 +1,32 @@
+from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
+from pywayang.operator.base import (BaseOperator)
+from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
+
+
+class DataQuanta(GenericTco):
+    """
+    An intermediate result / data flow edge in a [[WayangPlan]]; it holds the
+    operator it was built from.
+    """
+    previous : BaseOperator = None
+
+    def __init__(self, operator: BaseOperator):
+        self.operator = operator
+
+    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
+        """Return a new DataQuanta holding a FilterOperator built from `p`."""
+        filter_operator = FilterOperator(p)
+        return DataQuanta(filter_operator)
+
+    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
+        """Return a new DataQuanta holding a MapOperator built from `f`."""
+        map_operator = MapOperator(f)
+        return DataQuanta(map_operator)
+
+    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
+        """Return a new DataQuanta holding a FlatmapOperator built from `f`."""
+        flatmap_operator = FlatmapOperator(f)
+        return DataQuanta(flatmap_operator)
+
+    def getOperator(self):
+        """Return the operator held by this DataQuanta."""
+        return self.operator
+
+    def __str__(self):
+        operator_text = str(self.operator)
+        return operator_text
+
+    def __repr__(self):
+        return str(self)
diff --git a/python/__init__.py b/python/src/pywayang/operator/__init__.py
similarity index 100%
rename from python/__init__.py
rename to python/src/pywayang/operator/__init__.py
diff --git a/python/src/pywayang/operator/base.py b/python/src/pywayang/operator/base.py
new file mode 100644
index 00000000..ad2deed5
--- /dev/null
+++ b/python/src/pywayang/operator/base.py
@@ -0,0 +1,37 @@
+from typing import (TypeVar, Optional, List)
+
+
+class BaseOperator:
+    """
+    Common base of the operators: stores a name together with the declared
+    type and the count of the input and of the output slots.
+    """
+
+    inputSlot : List[TypeVar]
+    inputs : int
+    outputSlot : List[TypeVar]
+    outputs: int
+
+    def __init__(self,
+                 name: str,
+                 input: Optional[TypeVar] = None,
+                 output: Optional[TypeVar] = None,
+                 input_lenght: Optional[int] = 1,
+                 output_lenght: Optional[int] = 1
+                 ):
+        self.name = name
+        # input side first, then output side
+        self.inputSlot, self.inputs = input, input_lenght
+        self.outputSlot, self.outputs = output, output_lenght
+
+    def __str__(self):
+        template = "BaseOperator: \n\t- name: {}\n\t- inputs: {} {}\n\t- outputs: {} {} \n"
+        fields = (self.name, self.inputs, self.inputSlot, self.outputs, self.outputSlot)
+        return template.format(*map(str, fields))
+
+    def __repr__(self):
+        return self.__str__()
+
+
+
diff --git a/python/src/pywayang/operator/source.py b/python/src/pywayang/operator/source.py
new file mode 100644
index 00000000..34a16644
--- /dev/null
+++ b/python/src/pywayang/operator/source.py
@@ -0,0 +1,28 @@
+from pywayang.operator.base import BaseOperator
+
+class SourceUnaryOperator(BaseOperator):
+    """Operator with no input slot and a single output slot typed `str`."""
+
+    def __init__(self, name:str):
+        super().__init__(
+            name,
+            input=None,
+            output=str,
+            input_lenght=0,
+            output_lenght=1,
+        )
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class TextFileSource(SourceUnaryOperator):
+    """Source operator named 'TextFileSource' that remembers a file path."""
+
+    path: str
+
+    def __init__(self, path: str):
+        super().__init__(name='TextFileSource')
+        self.path = path
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/operator/unary.py b/python/src/pywayang/operator/unary.py
new file mode 100644
index 00000000..24e5df2f
--- /dev/null
+++ b/python/src/pywayang/operator/unary.py
@@ -0,0 +1,90 @@
+from pywayang.operator.base import BaseOperator
+from pywayang.types import (
+ GenericTco,
+ GenericUco,
+ Predicate,
+ getTypePredicate,
+ Function,
+ getTypeFunction,
+ FlatmapFunction,
+ getTypeFlatmapFunction
+ )
+from itertools import chain
+
+
+class UnaryToUnaryOperator(BaseOperator):
+    """Operator with exactly one input slot and one output slot."""
+
+    def __init__(self, name:str, input:GenericTco, output:GenericUco):
+        super().__init__(
+            name,
+            input,
+            output,
+            input_lenght=1,
+            output_lenght=1,
+        )
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+
+class FilterOperator(UnaryToUnaryOperator):
+    """Unary operator that applies a predicate through the builtin `filter`."""
+
+    predicate: Predicate
+
+    def __init__(self, predicate: Predicate):
+        # the same type is declared for the input and for the output slot
+        element_type = getTypePredicate(predicate)
+        super().__init__("FilterOperator", element_type, element_type)
+        self.predicate = predicate
+
+    def getWrapper(self):
+        """Return a function that maps an iterable to `filter(predicate, iterable)`."""
+        predicate = self.predicate
+
+        def func(iterator):
+            return filter(predicate, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+class MapOperator(UnaryToUnaryOperator):
+    """Unary operator that applies a function through the builtin `map`."""
+
+    function: Function
+
+    def __init__(self, function: Function):
+        # getTypeFunction yields (parameter annotation, return annotation)
+        input_type, output_type = getTypeFunction(function)
+        super().__init__("MapOperator", input_type, output_type)
+        self.function = function
+
+    def getWrapper(self):
+        """Return a function that maps an iterable to `map(function, iterable)`."""
+        function = self.function
+
+        def func(iterator):
+            return map(function, iterator)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
+
+
+class FlatmapOperator(UnaryToUnaryOperator):
+    """Unary operator that maps a function and chains the resulting iterables."""
+
+    fmfunction: FlatmapFunction
+
+    def __init__(self, fmfunction: FlatmapFunction):
+        # getTypeFlatmapFunction yields (parameter annotation, return annotation)
+        input_type, output_type = getTypeFlatmapFunction(fmfunction)
+        super().__init__("FlatmapOperator", input_type, output_type)
+        self.fmfunction = fmfunction
+
+    def getWrapper(self):
+        """Return a function that applies the udf per element and flattens the results."""
+        fmfunction = self.fmfunction
+
+        def func(iterator):
+            mapped = map(fmfunction, iterator)
+            return chain.from_iterable(mapped)
+        return func
+
+    def __str__(self):
+        return super().__str__()
+
+    def __repr__(self):
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/orchestrator/operator.py b/python/src/pywayang/orchestrator/operator.py
index 1307b5f2..87087642 100644
--- a/python/src/pywayang/orchestrator/operator.py
+++ b/python/src/pywayang/orchestrator/operator.py
@@ -30,8 +30,12 @@ pickle_protocol = pickle.HIGHEST_PROTOCOL
class Operator:
def __init__(
- self, operator_type=None, udf=None, previous=None,
- iterator=None, python_exec=False
+ self,
+ operator_type=None,
+ udf=None,
+ previous=None,
+ iterator=None,
+ python_exec=False
):
# Operator ID
diff --git a/python/src/pywayang/platform.py b/python/src/pywayang/platform.py
new file mode 100644
index 00000000..caece352
--- /dev/null
+++ b/python/src/pywayang/platform.py
@@ -0,0 +1,24 @@
+
+class Platform:
+    """
+    A platform describes an execution engine that is used to execute the
+    wayang plan.
+
+    Parameters
+    ----------
+    name: str
+        platform name, used as its identifier
+    """
+
+    name : str
+    #configuration : dict[str, str]
+
+    def __init__(self, name):
+        self.name = name
+
+    def __str__(self):
+        return f"name: {self.name}"
+
+    def __repr__(self):
+        return str(self)
+
diff --git a/python/src/pywayang/plugin.py b/python/src/pywayang/plugin.py
new file mode 100644
index 00000000..82d75431
--- /dev/null
+++ b/python/src/pywayang/plugin.py
@@ -0,0 +1,27 @@
+from pywayang.platform import Platform
+
+class Plugin:
+    """
+    A plugin contributes the following components to a :class:`Context`
+    - mappings
+    - channels
+    - configurations
+    In turn, it may require several :class:`Platform`s for its operation.
+    """
+
+    platforms = []
+
+    def __init__(self, *platform:Platform):
+        # copy the varargs tuple into a list owned by this instance
+        self.platforms = [*platform]
+
+    def __str__(self):
+        platforms_text = str(self.platforms)
+        return "Platforms: {}".format(platforms_text)
+
+    def __repr__(self):
+        return str(self)
+
+
+# Predefined plugins that can be used directly; each one wraps a single
+# Platform whose name matches the variable name.
+java = Plugin(Platform('java'))
+spark = Plugin(Platform('spark'))
+flink = Plugin(Platform('flink'))
diff --git a/python/src/pywayang/test.py b/python/src/pywayang/test.py
new file mode 100644
index 00000000..66ddab0a
--- /dev/null
+++ b/python/src/pywayang/test.py
@@ -0,0 +1,61 @@
+from typing import Iterable
+
+from pywayang.platform import Platform
+from pywayang.context import WayangContext
+from pywayang.plugin import java, spark
+from pywayang.operator.unary import *
+
+# Build a Platform and print its string form.
+p = Platform("nana")
+print(p)
+
+
+# Register two of the predefined plugins on a fresh context and print it.
+print(str(WayangContext().register(java, spark)))
+
+from pywayang.types import Predicate, getTypePredicate
+
+# The lambda carries no parameter annotation; the value returned by
+# getTypePredicate is not stored or checked here.
+predicate : Predicate = lambda x: x % 2 == 0
+getTypePredicate(predicate)
+
+def pre(a:str):
+    """Tell whether `a` has more than three characters."""
+    return 3 < len(a)
+
+def func(s:str) -> int:
+    """Return the length of `s`."""
+    length = len(s)
+    return length
+
+def fmfunc(i:int) -> Iterable[str]:
+    """
+    Yield the strings "0", "1", ... up to str(i - 1).
+
+    This is a generator, so the return annotation is an iterable of `str`
+    and not `str`; getTypeFlatmapFunction reads this annotation.
+    """
+    for x in range(i):
+        yield str(x)
+
+# DataQuanta over a text-file source, on a context with the java plugin.
+fileop = WayangContext()\
+ .register(java)\
+ .textFile("here")\
+
+# filter: pull the operator out of the DataQuanta, get its wrapper and call it.
+# The wrapper returns a lazy `filter` object; it is not consumed because the
+# loop below is commented out.
+filterop: FilterOperator = fileop.filter(pre).getOperator()
+fop_pre = filterop.getWrapper()
+fop_pre_res = fop_pre(["la", "lala"])
+#for i in fop_pre_res:
+# print(i)
+
+
+# map: same steps with `func`; the wrapper returns a lazy `map` object.
+mapop: MapOperator = fileop.map(func).getOperator()
+mop_func = mapop.getWrapper()
+mop_func_res = mop_func(["la", "lala"])
+#for i in mop_func_res:
+# print(i)
+
+
+# flatmap: same steps with the generator `fmfunc`, fed with integers.
+fmop: FlatmapOperator = fileop.flatmap(fmfunc).getOperator()
+fmop_func = fmop.getWrapper()
+fmop_func_res = fmop_func([2, 3])
+#for i in fmop_func_res:
+# print(i)
+
+def concatenate(function_a, function_b):
+    """Return a callable applying `function_a` first, then `function_b` to its result."""
+    def executable(iterable):
+        intermediate = function_a(iterable)
+        return function_b(intermediate)
+    return executable
+
+# Chain the three wrappers (filter, then map, then flatmap), run the chain on
+# a list of strings and print every produced element.
+res = concatenate(concatenate(fop_pre, mop_func), fmop_func)
+res_pro = res(["la", "lala"])
+for i in res_pro:
+ print(i)
\ No newline at end of file
diff --git a/python/src/pywayang/types.py b/python/src/pywayang/types.py
new file mode 100644
index 00000000..8be502e0
--- /dev/null
+++ b/python/src/pywayang/types.py
@@ -0,0 +1,57 @@
+from typing import ( Generic, TypeVar, Callable, Hashable, Iterable)
+from inspect import signature
+
+# Type variables shared by the operator and function aliases below.
+T = TypeVar("T")   # Type
+I = TypeVar("I")   # Input Type number 1
+I2 = TypeVar("I2") # Input Type number 2
+O = TypeVar("O")   # Output Type
+
+IterableT = Iterable[T] # Iterable of type 'T'
+IterableO = Iterable[O] # Iterable of type 'O'
+
+T_co = TypeVar("T_co", covariant=True)
+U_co = TypeVar("U_co", covariant=True)
+K = TypeVar("K", bound=Hashable)
+
+GenericTco = Generic[T_co]
+GenericUco = Generic[U_co]
+
+# Callable shapes accepted as user-defined functions.
+Predicate = Callable[[T], bool]
+Function = Callable[[I], O]
+BiFunction = Callable[[I, I2], O]
+
+FlatmapFunction = Callable[[T], IterableO]
+
+
+def getTypePredicate(callable: Predicate) -> Generic :
+    """
+    Return the annotation of the single parameter of `callable`.
+
+    Raises Exception when `callable` does not declare exactly one parameter.
+    """
+    parameters = signature(callable).parameters
+    if len(parameters) != 1:
+        raise Exception("the parameters for the Predicate are distinct than one, {}".format(str(parameters)))
+
+    (only_parameter,) = parameters.values()
+    return only_parameter.annotation
+
+def getTypeFunction(callable: Function) -> (Generic, Generic) :
+    """
+    Return the pair (parameter annotation, return annotation) of `callable`.
+
+    Raises Exception when `callable` does not declare exactly one parameter.
+    """
+    sig = signature(callable)
+    if(len(sig.parameters) != 1):
+        raise Exception("the parameters for the Function are distinct than one, {}".format(str(sig.parameters)))
+
+    keys = list(sig.parameters.keys())
+    return (sig.parameters[keys[0]].annotation, sig.return_annotation)
+
+def getTypeBiFunction(callable: BiFunction) -> (Generic, Generic, Generic) :
+    """
+    Return (first parameter annotation, second parameter annotation,
+    return annotation) of `callable`.
+
+    Raises Exception when `callable` does not declare exactly two parameters.
+    """
+    sig = signature(callable)
+    parameters = sig.parameters
+    if len(parameters) != 2:
+        raise Exception("the parameters for the BiFunction are distinct than two, {}".format(str(parameters)))
+
+    first, second = parameters.values()
+    return (first.annotation, second.annotation, sig.return_annotation)
+
+def getTypeFlatmapFunction(callable: FlatmapFunction) -> (Generic, Generic) :
+    """
+    Return the pair (parameter annotation, return annotation) of `callable`.
+
+    Raises Exception when `callable` does not declare exactly one parameter.
+    """
+    sig = signature(callable)
+    parameters = sig.parameters
+    if len(parameters) != 1:
+        raise Exception("the parameters for the FlatmapFunction are distinct than one, {}".format(str(parameters)))
+
+    (only_parameter,) = parameters.values()
+    return (only_parameter.annotation, sig.return_annotation)
\ No newline at end of file