You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/11 22:32:37 UTC
[incubator-wayang] 02/04: [WAYANG-#211] restructure the channels for Python-Platform
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit cecaf8ec971beafb689fa6f4344b364c88be8e15
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 11 11:19:11 2022 +0200
[WAYANG-#211] restructure the channels for Python-Platform
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/platforms/commons/__init__.py | 16 +++++++++
.../pywy/platforms/{python => commons}/channels.py | 28 ++++-----------
python/src/pywy/platforms/python/channels.py | 40 +---------------------
.../platforms/python/operator/py_sink_textfile.py | 3 +-
.../python/operator/py_source_textfile.py | 5 ++-
.../platforms/python/operator/py_unary_filter.py | 27 ++++++++-------
.../platforms/python/operator/py_unary_flatmap.py | 27 ++++++++-------
.../pywy/platforms/python/operator/py_unary_map.py | 27 ++++++++-------
8 files changed, 68 insertions(+), 105 deletions(-)
diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py
new file mode 100644
index 00000000..8d2bad81
--- /dev/null
+++ b/python/src/pywy/platforms/commons/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/commons/channels.py
similarity index 66%
copy from python/src/pywy/platforms/python/channels.py
copy to python/src/pywy/platforms/commons/channels.py
index 0d0f65e6..743a7169 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -15,26 +15,11 @@
# limitations under the License.
#
-from typing import ( Iterable, Callable )
+from typing import Callable
from pywy.core import (Channel, ChannelDescriptor)
-class PyIteratorChannel(Channel):
-
- iterable: Iterable
-
- def __init__(self):
- Channel.__init__(self)
-
- def provide_iterable(self) -> Iterable:
- return self.iterable
-
- def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
- self.iterable = iterable
- return self
-
-
-class PyCallableChannel(Channel):
+class CommonsCallableChannel(Channel):
udf: Callable
@@ -44,7 +29,7 @@ class PyCallableChannel(Channel):
def provide_callable(self) -> Callable:
return self.udf
- def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
+ def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
self.udf = udf
return self
@@ -55,7 +40,7 @@ class PyCallableChannel(Channel):
return executable
-class PyFileChannel(Channel):
+class CommonsFileChannel(Channel):
path: str
@@ -70,6 +55,5 @@ class PyFileChannel(Channel):
return self
-PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
+COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
+COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py
index 0d0f65e6..f79a67ed 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/python/channels.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from typing import ( Iterable, Callable )
+from typing import Iterable
from pywy.core import (Channel, ChannelDescriptor)
@@ -34,42 +34,4 @@ class PyIteratorChannel(Channel):
return self
-class PyCallableChannel(Channel):
-
- udf: Callable
-
- def __init__(self):
- Channel.__init__(self)
-
- def provide_callable(self) -> Callable:
- return self.udf
-
- def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
- self.udf = udf
- return self
-
- @staticmethod
- def concatenate(function_a: Callable, function_b: Callable):
- def executable(iterable):
- return function_a(function_b(iterable))
- return executable
-
-
-class PyFileChannel(Channel):
-
- path: str
-
- def __init__(self):
- Channel.__init__(self)
-
- def provide_path(self) -> str:
- return self.path
-
- def accept_path(self, path: str) -> 'PyIteratorChannel':
- self.path = path
- return self
-
-
PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 9914382f..7d8eec1b 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -17,11 +17,10 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.sink import TextFileSink
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
)
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 30831880..245d090d 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -17,14 +17,13 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.source import TextFileSource
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
- )
+)
class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 0788e974..2d807282 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -17,16 +17,17 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import CH_T, ChannelDescriptor
from pywy.operators.unary import FilterOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyFilterOperator(FilterOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def func(iterator):
return filter(udf, iterator)
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 97f467d4..72016a8c 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -18,16 +18,17 @@
from itertools import chain
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.unary import FlatmapOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable())))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def fm_func(iterator):
return chain.from_iterable(map(udf, iterator))
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
fm_func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index b8741a92..a8e53a48 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -17,16 +17,17 @@
from typing import Set, List, Type
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
from pywy.operators.unary import MapOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+ COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+ CommonsCallableChannel
+)
from pywy.platforms.python.channels import (
- ChannelDescriptor,
- PyIteratorChannel,
- PY_ITERATOR_CHANNEL_DESCRIPTOR,
- PY_CALLABLE_CHANNEL_DESCRIPTOR,
- PyCallableChannel
- )
+ PyIteratorChannel,
+ PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
class PyMapOperator(MapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
py_in_iter_channel: PyIteratorChannel = inputs[0]
py_out_iter_channel: PyIteratorChannel = outputs[0]
py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
- elif isinstance(inputs[0], PyCallableChannel):
- py_in_call_channel: PyCallableChannel = inputs[0]
- py_out_call_channel: PyCallableChannel = outputs[0]
+ elif isinstance(inputs[0], CommonsCallableChannel):
+ py_in_call_channel: CommonsCallableChannel = inputs[0]
+ py_out_call_channel: CommonsCallableChannel = outputs[0]
def func(iterator):
return map(udf, iterator)
py_out_call_channel.accept_callable(
- PyCallableChannel.concatenate(
+ CommonsCallableChannel.concatenate(
func,
py_in_call_channel.provide_callable()
)
@@ -60,7 +61,7 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
raise Exception("Channel Type does not supported")
def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
- return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+ return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}