You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:26 UTC

[incubator-wayang] 05/32: [WAYANG-#8] add TextFileSourceOperator

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 6e04ecf6c820a96804272f5d9d18570930dd93b7
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 11:54:08 2022 +0200

    [WAYANG-#8] add TextFileSourceOperator
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywayang/platforms/python/channels.py   | 19 +++++++++--
 python/src/pywayang/platforms/python/mappings.py   |  1 +
 .../python/operators/PyTextFileSourceOperator.py   | 38 ++++++++++++++++++++++
 .../platforms/python/operators/__init__.py         |  4 ++-
 4 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py
index f611a258..a2863677 100644
--- a/python/src/pywayang/platforms/python/channels.py
+++ b/python/src/pywayang/platforms/python/channels.py
@@ -32,7 +32,7 @@ class PyIteratorChannel(Channel):
     def provide_iterable(self) -> Iterable:
         return self.iterable
 
-    def accept_iterable(self, iterable) -> 'PyIteratorChannel':
+    def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
         self.iterable = iterable
         return self
 
@@ -56,5 +56,20 @@ class PyCallableChannel(Channel):
             return function_a(function_b(iterable))
         return executable
 
+class PyFileChannel(Channel):
+    """Channel that stores a file path and returns it on request."""
+    path : str  # annotation only: this class assigns no default value
+
+    def __init__(self):
+        Channel.__init__(self)
+
+    def provide_path(self) -> str:
+        return self.path  # whatever accept_path() stored last
+
+    def accept_path(self, path: str) -> 'PyFileChannel':
+        self.path = path
+        return self  # fluent return, like PyIteratorChannel.accept_iterable
+
 PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
\ No newline at end of file
+PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
+PyFileChannelDescriptor = ChannelDescriptor(type(PyFileChannel()), False, False)
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
index 977ccada..55a80180 100644
--- a/python/src/pywayang/platforms/python/mappings.py
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -32,4 +32,5 @@ class Mapping:
 OperatorMappings = Mapping()
 
 OperatorMappings.add_mapping(PyFilterOperator())
+OperatorMappings.add_mapping(PyTextFileSourceOperator())
 
diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
new file mode 100644
index 00000000..ccfbec48
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
@@ -0,0 +1,38 @@
+from pywayang.operator.source import TextFileSource
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (
+                                                    Channel,
+                                                    ChannelDescriptor,
+                                                    PyIteratorChannel,
+                                                    PyIteratorChannelDescriptor,
+                                                    PyFileChannelDescriptor
+                                                )
+from typing import Set
+
+class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator):
+    """Text file source for the Python platform; emits the lines of self.path."""
+    def __init__(self, origin: TextFileSource = None):
+        # Without an origin the operator is built with path=None (see the no-arg mapping call).
+        path = None if origin is None else origin.path
+        super().__init__(path)
+
+    def execute(self, inputs: Channel, outputs: Channel):
+        """Feed the lines of self.path into outputs[0], which must be a PyIteratorChannel."""
+        self.validateChannels(inputs, outputs)
+        if not isinstance(outputs[0], PyIteratorChannel):
+            raise Exception("Channel Type does not supported")
+        source = open(self.path, 'r')  # opened eagerly so a bad path still fails here
+
+        def lines():
+            # Close the handle once the consumer exhausts the iterator or closes it early.
+            with source:
+                yield from source
+
+        py_out_iter_channel: PyIteratorChannel = outputs[0]
+        py_out_iter_channel.accept_iterable(lines())
+
+    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The PyTextFileSource does not support Input Channels")
+
+    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+        return {PyIteratorChannelDescriptor}
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
index 208a2fc0..5db92431 100644
--- a/python/src/pywayang/platforms/python/operators/__init__.py
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -1,7 +1,9 @@
 from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
 from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
+from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
 
 __ALL__ = [
     PythonExecutionOperator,
-    PyFilterOperator
+    PyFilterOperator,
+    PyTextFileSourceOperator
 ]
\ No newline at end of file