You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/11 22:32:37 UTC

[incubator-wayang] 02/04: [WAYANG-#211] restructure the channels for Python-Platform

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit cecaf8ec971beafb689fa6f4344b364c88be8e15
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 11 11:19:11 2022 +0200

    [WAYANG-#211] Restructure the channels for the Python platform
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/platforms/commons/__init__.py      | 16 +++++++++
 .../pywy/platforms/{python => commons}/channels.py | 28 ++++-----------
 python/src/pywy/platforms/python/channels.py       | 40 +---------------------
 .../platforms/python/operator/py_sink_textfile.py  |  3 +-
 .../python/operator/py_source_textfile.py          |  5 ++-
 .../platforms/python/operator/py_unary_filter.py   | 27 ++++++++-------
 .../platforms/python/operator/py_unary_flatmap.py  | 27 ++++++++-------
 .../pywy/platforms/python/operator/py_unary_map.py | 27 ++++++++-------
 8 files changed, 68 insertions(+), 105 deletions(-)

diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py
new file mode 100644
index 00000000..8d2bad81
--- /dev/null
+++ b/python/src/pywy/platforms/commons/__init__.py
@@ -0,0 +1,16 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/commons/channels.py
similarity index 66%
copy from python/src/pywy/platforms/python/channels.py
copy to python/src/pywy/platforms/commons/channels.py
index 0d0f65e6..743a7169 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -15,26 +15,11 @@
 #  limitations under the License.
 #
 
-from typing import ( Iterable, Callable )
+from typing import Callable
 from pywy.core import (Channel, ChannelDescriptor)
 
 
-class PyIteratorChannel(Channel):
-
-    iterable: Iterable
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_iterable(self) -> Iterable:
-        return self.iterable
-
-    def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
-        self.iterable = iterable
-        return self
-
-
-class PyCallableChannel(Channel):
+class CommonsCallableChannel(Channel):
 
     udf: Callable
 
@@ -44,7 +29,7 @@ class PyCallableChannel(Channel):
     def provide_callable(self) -> Callable:
         return self.udf
 
-    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
+    def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
         self.udf = udf
         return self
 
@@ -55,7 +40,7 @@ class PyCallableChannel(Channel):
         return executable
 
 
-class PyFileChannel(Channel):
+class CommonsFileChannel(Channel):
 
     path: str
 
@@ -70,6 +55,5 @@ class PyFileChannel(Channel):
         return self
 
 
-PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
+COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
+COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py
index 0d0f65e6..f79a67ed 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/python/channels.py
@@ -15,7 +15,7 @@
 #  limitations under the License.
 #
 
-from typing import ( Iterable, Callable )
+from typing import Iterable
 from pywy.core import (Channel, ChannelDescriptor)
 
 
@@ -34,42 +34,4 @@ class PyIteratorChannel(Channel):
         return self
 
 
-class PyCallableChannel(Channel):
-
-    udf: Callable
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_callable(self) -> Callable:
-        return self.udf
-
-    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
-        self.udf = udf
-        return self
-
-    @staticmethod
-    def concatenate(function_a: Callable, function_b: Callable):
-        def executable(iterable):
-            return function_a(function_b(iterable))
-        return executable
-
-
-class PyFileChannel(Channel):
-
-    path: str
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_path(self) -> str:
-        return self.path
-
-    def accept_path(self, path: str) -> 'PyIteratorChannel':
-        self.path = path
-        return self
-
-
 PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 9914382f..7d8eec1b 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -17,11 +17,10 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.sink import TextFileSink
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
 )
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 30831880..245d090d 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -17,14 +17,13 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.source import TextFileSource
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
-                                            )
+)
 
 
 class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 0788e974..2d807282 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import CH_T, ChannelDescriptor
 from pywy.operators.unary import FilterOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFilterOperator(FilterOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return filter(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 97f467d4..72016a8c 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -18,16 +18,17 @@
 from itertools import chain
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import FlatmapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable())))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def fm_func(iterator):
                 return chain.from_iterable(map(udf, iterator))
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     fm_func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index b8741a92..a8e53a48 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import MapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyMapOperator(MapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return map(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}