You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:50 UTC
[incubator-wayang] 29/32: [WAYANG-#8] Correction in the channel types
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit aafc8a026bba31c9b42ee56e0661129d8b710034
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Apr 8 10:31:07 2022 +0200
[WAYANG-#8] Correction in the channel types
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywy/core/channel.py | 6 ++++++
.../src/pywy/platforms/python/operator/py_execution_operator.py | 6 ++++--
python/src/pywy/platforms/python/operator/py_sink_textfile.py | 9 +++++----
python/src/pywy/platforms/python/operator/py_source_textfile.py | 7 ++++---
python/src/pywy/platforms/python/operator/py_unary_filter.py | 7 ++++---
5 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py
index a4f3bb97..c10cc09c 100644
--- a/python/src/pywy/core/channel.py
+++ b/python/src/pywy/core/channel.py
@@ -1,3 +1,6 @@
+from typing import TypeVar
+
+
class Channel:
def __init__(self):
@@ -19,3 +22,6 @@ class ChannelDescriptor:
def create_instance(self) -> Channel:
return self.channelType()
+
+
+CH_T = TypeVar('CH_T', bound=Channel)
diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py
index a2e82590..9b5f1a75 100644
--- a/python/src/pywy/platforms/python/operator/py_execution_operator.py
+++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py
@@ -1,5 +1,7 @@
+from typing import List, Type
+
+from pywy.core.channel import CH_T
from pywy.operators.base import PywyOperator
-from pywy.platforms.python.channels import Channel
class PyExecutionOperator(PywyOperator):
@@ -7,5 +9,5 @@ class PyExecutionOperator(PywyOperator):
def prefix(self) -> str:
return 'Py'
- def execute(self, inputs: Channel, output: Channel):
+ def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]):
pass
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 429da96c..ab5f5af7 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
from pywy.operators.sink import TextFileSink
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- Channel,
ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
@@ -16,12 +17,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
super().__init__(path)
pass
- def execute(self, inputs: Channel, outputs: Channel):
+ def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
self.validate_channels(inputs, outputs)
if isinstance(inputs[0], PyIteratorChannel):
file = open(self.path, 'w')
py_in_iter_channel: PyIteratorChannel = inputs[0]
- iterable = py_in_iter_channel.provide_iterable();
+ iterable = py_in_iter_channel.provide_iterable()
for element in iterable:
file.write(str(element))
file.close()
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 7fdbf740..067ec72b 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
from pywy.operators.source import TextFileSource
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- Channel,
ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR
@@ -16,7 +17,7 @@ class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
super().__init__(path)
pass
- def execute(self, inputs: Channel, outputs: Channel):
+ def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
self.validate_channels(inputs, outputs)
if isinstance(outputs[0], PyIteratorChannel):
py_out_iter_channel: PyIteratorChannel = outputs[0]
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index b08bb5ca..c5b2cde7 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
from pywy.operators.unary import FilterOperator
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
from pywy.platforms.python.channels import (
- Channel,
ChannelDescriptor,
PyIteratorChannel,
PY_ITERATOR_CHANNEL_DESCRIPTOR,
@@ -18,7 +19,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
super().__init__(predicate)
pass
- def execute(self, inputs: Channel, outputs: Channel):
+ def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
self.validate_channels(inputs, outputs)
udf = self.predicate
if isinstance(inputs[0], PyIteratorChannel):