You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:26 UTC
[incubator-wayang] 05/32: [WAYANG-#8] add TextFileSourceOperator
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 6e04ecf6c820a96804272f5d9d18570930dd93b7
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 11:54:08 2022 +0200
[WAYANG-#8] add TextFileSourceOperator
Signed-off-by: bertty <be...@apache.org>
---
python/src/pywayang/platforms/python/channels.py | 19 +++++++++--
python/src/pywayang/platforms/python/mappings.py | 1 +
.../python/operators/PyTextFileSourceOperator.py | 38 ++++++++++++++++++++++
.../platforms/python/operators/__init__.py | 4 ++-
4 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/python/src/pywayang/platforms/python/channels.py b/python/src/pywayang/platforms/python/channels.py
index f611a258..a2863677 100644
--- a/python/src/pywayang/platforms/python/channels.py
+++ b/python/src/pywayang/platforms/python/channels.py
@@ -32,7 +32,7 @@ class PyIteratorChannel(Channel):
def provide_iterable(self) -> Iterable:
return self.iterable
- def accept_iterable(self, iterable) -> 'PyIteratorChannel':
+ def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
self.iterable = iterable
return self
@@ -56,5 +56,20 @@ class PyCallableChannel(Channel):
return function_a(function_b(iterable))
return executable
+class PyFileChannel(Channel):
+
+ path : str
+
+ def __init__(self):
+ Channel.__init__(self)
+
+ def provide_path(self) -> str:
+ return self.path
+
+ def accept_path(self, path: str) -> 'PyIteratorChannel':
+ self.path = path
+ return self
+
PyIteratorChannelDescriptor = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
\ No newline at end of file
+PyCallableChannelDescriptor = ChannelDescriptor(type(PyCallableChannel()), False, False)
+PyFileChannelDescriptor = ChannelDescriptor(type(PyFileChannel()), False, False)
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
index 977ccada..55a80180 100644
--- a/python/src/pywayang/platforms/python/mappings.py
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -32,4 +32,5 @@ class Mapping:
OperatorMappings = Mapping()
OperatorMappings.add_mapping(PyFilterOperator())
+OperatorMappings.add_mapping(PyTextFileSourceOperator())
diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
new file mode 100644
index 00000000..ccfbec48
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyTextFileSourceOperator.py
@@ -0,0 +1,38 @@
+from pywayang.operator.source import TextFileSource
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (
+ Channel,
+ ChannelDescriptor,
+ PyIteratorChannel,
+ PyIteratorChannelDescriptor,
+ PyFileChannelDescriptor
+ )
+from typing import Set
+
+class PyTextFileSourceOperator(TextFileSource, PythonExecutionOperator):
+
+ def __init__(self, origin: TextFileSource = None):
+ path = None if origin is None else origin.path
+ super().__init__(path)
+ pass
+
+ def execute(self, inputs: Channel, outputs: Channel):
+ self.validateChannels(inputs, outputs)
+ if isinstance(outputs[0], PyIteratorChannel) :
+ py_out_iter_channel: PyIteratorChannel = outputs[0]
+ py_out_iter_channel.accept_iterable(
+ open(
+ self.path,
+ 'r'
+ )
+ )
+
+ else:
+ raise Exception("Channel Type does not supported")
+
+
+ def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ raise Exception("The PyTextFileSource does not support Input Channels")
+
+ def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+ return {PyIteratorChannelDescriptor}
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
index 208a2fc0..5db92431 100644
--- a/python/src/pywayang/platforms/python/operators/__init__.py
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -1,7 +1,9 @@
from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
+from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
__ALL__ = [
PythonExecutionOperator,
- PyFilterOperator
+ PyFilterOperator,
+ PyTextFileSourceOperator
]
\ No newline at end of file