You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:50 UTC

[incubator-wayang] 29/32: [WAYANG-#8] Correction in the channel types

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit aafc8a026bba31c9b42ee56e0661129d8b710034
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Fri Apr 8 10:31:07 2022 +0200

    [WAYANG-#8] Correction in the channel types
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/core/channel.py                                  | 6 ++++++
 .../src/pywy/platforms/python/operator/py_execution_operator.py  | 6 ++++--
 python/src/pywy/platforms/python/operator/py_sink_textfile.py    | 9 +++++----
 python/src/pywy/platforms/python/operator/py_source_textfile.py  | 7 ++++---
 python/src/pywy/platforms/python/operator/py_unary_filter.py     | 7 ++++---
 5 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py
index a4f3bb97..c10cc09c 100644
--- a/python/src/pywy/core/channel.py
+++ b/python/src/pywy/core/channel.py
@@ -1,3 +1,6 @@
+from typing import TypeVar
+
+
 class Channel:
 
     def __init__(self):
@@ -19,3 +22,6 @@ class ChannelDescriptor:
 
     def create_instance(self) -> Channel:
         return self.channelType()
+
+
+CH_T = TypeVar('CH_T', bound=Channel)
diff --git a/python/src/pywy/platforms/python/operator/py_execution_operator.py b/python/src/pywy/platforms/python/operator/py_execution_operator.py
index a2e82590..9b5f1a75 100644
--- a/python/src/pywy/platforms/python/operator/py_execution_operator.py
+++ b/python/src/pywy/platforms/python/operator/py_execution_operator.py
@@ -1,5 +1,7 @@
+from typing import List, Type
+
+from pywy.core.channel import CH_T
 from pywy.operators.base import PywyOperator
-from pywy.platforms.python.channels import Channel
 
 
 class PyExecutionOperator(PywyOperator):
@@ -7,5 +9,5 @@ class PyExecutionOperator(PywyOperator):
     def prefix(self) -> str:
         return 'Py'
 
-    def execute(self, inputs: Channel, output: Channel):
+    def execute(self, inputs: List[Type[CH_T]], output: List[Type[CH_T]]):
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 429da96c..ab5f5af7 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
 from pywy.operators.sink import TextFileSink
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    Channel,
     ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
@@ -16,12 +17,12 @@ class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
         super().__init__(path)
         pass
 
-    def execute(self, inputs: Channel, outputs: Channel):
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
         if isinstance(inputs[0], PyIteratorChannel):
             file = open(self.path, 'w')
             py_in_iter_channel: PyIteratorChannel = inputs[0]
-            iterable = py_in_iter_channel.provide_iterable();
+            iterable = py_in_iter_channel.provide_iterable()
             for element in iterable:
                 file.write(str(element))
             file.close()
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 7fdbf740..067ec72b 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
 from pywy.operators.source import TextFileSource
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    Channel,
     ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
@@ -16,7 +17,7 @@ class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
         super().__init__(path)
         pass
 
-    def execute(self, inputs: Channel, outputs: Channel):
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
         if isinstance(outputs[0], PyIteratorChannel):
             py_out_iter_channel: PyIteratorChannel = outputs[0]
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index b08bb5ca..c5b2cde7 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -1,8 +1,9 @@
-from typing import Set
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T
 from pywy.operators.unary import FilterOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-                                                Channel,
                                                 ChannelDescriptor,
                                                 PyIteratorChannel,
                                                 PY_ITERATOR_CHANNEL_DESCRIPTOR,
@@ -18,7 +19,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
         super().__init__(predicate)
         pass
 
-    def execute(self, inputs: Channel, outputs: Channel):
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
         udf = self.predicate
         if isinstance(inputs[0], PyIteratorChannel):