You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:27 UTC

[incubator-wayang] 06/32: [WAYANG-#8] add TextFileSinkOperator

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 650e127c6f95d9df93311580a70257011b9d9c2a
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Wed Apr 6 12:11:34 2022 +0200

    [WAYANG-#8] add TextFileSinkOperator
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywayang/dataquanta.py                  |  5 +++
 python/src/pywayang/operator/sink.py               | 28 +++++++++++++++++
 python/src/pywayang/platforms/python/mappings.py   |  1 +
 .../python/operators/PyTextFileSinkOperator.py     | 36 ++++++++++++++++++++++
 .../platforms/python/operators/__init__.py         |  4 ++-
 5 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/python/src/pywayang/dataquanta.py b/python/src/pywayang/dataquanta.py
index 3ccc2839..9cd892eb 100644
--- a/python/src/pywayang/dataquanta.py
+++ b/python/src/pywayang/dataquanta.py
@@ -1,6 +1,7 @@
 from pywayang.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO)
 from pywayang.operator.base import (WyOperator)
 from pywayang.operator.unary import (FilterOperator, MapOperator, FlatmapOperator)
+from pywayang.operator.sink import TextFileSink
 
 
 class DataQuanta(GenericTco):
@@ -22,6 +23,17 @@ class DataQuanta(GenericTco):
     def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
         return DataQuanta(FlatmapOperator(f))
 
+    def storeTextFile(self: "DataQuanta[I]", path: str) :
+        """Wrap a TextFileSink for ``path`` in a new DataQuanta.
+
+        Work in progress: the wrapped sink is only bound to a local name,
+        nothing is executed and the method implicitly returns None.
+
+        :param path: passed unchanged to the TextFileSink constructor.
+        """
+        last = DataQuanta(TextFileSink(path))
+        # TODO add the logic to execute the plan
+
     def getOperator(self):
         return self.operator
 
diff --git a/python/src/pywayang/operator/sink.py b/python/src/pywayang/operator/sink.py
new file mode 100644
index 00000000..52cbeb0e
--- /dev/null
+++ b/python/src/pywayang/operator/sink.py
@@ -0,0 +1,47 @@
+from pywayang.operator.base import WyOperator
+
+class SinkUnaryOperator(WyOperator):
+    """Base class for sink operators; forwards a fixed configuration to WyOperator."""
+
+    def __init__(self, name:str):
+        """Initialise the operator under the given ``name``.
+
+        :param name: forwarded as the first argument of ``WyOperator.__init__``.
+        """
+        # NOTE(review): the remaining positional arguments (None, str, 0, 1)
+        # presumably describe input/output types and arities. For a sink one
+        # would expect one input and no output -- confirm against
+        # WyOperator.__init__, which is not part of this patch.
+        super().__init__(name, None, str, 0, 1)
+
+    def __str__(self):
+        # Delegates unchanged to the parent implementation.
+        return super().__str__()
+
+    def __repr__(self):
+        # Delegates unchanged to the parent implementation.
+        return super().__repr__()
+
+
+
+class TextFileSink(SinkUnaryOperator):
+    """Sink operator that carries the path of a target text file."""
+
+    # Target path handed to the constructor.
+    path: str
+
+    def __init__(self, path: str):
+        """Register the operator as 'TextFileSink' and remember ``path``.
+
+        :param path: stored on the instance as ``self.path``.
+        """
+        super().__init__('TextFileSink')
+        self.path = path
+
+    def __str__(self):
+        # Delegates unchanged to the parent implementation.
+        return super().__str__()
+
+    def __repr__(self):
+        # Delegates unchanged to the parent implementation.
+        return super().__repr__()
\ No newline at end of file
diff --git a/python/src/pywayang/platforms/python/mappings.py b/python/src/pywayang/platforms/python/mappings.py
index 55a80180..7060ba07 100644
--- a/python/src/pywayang/platforms/python/mappings.py
+++ b/python/src/pywayang/platforms/python/mappings.py
@@ -33,4 +33,5 @@ OperatorMappings = Mapping()
 
 OperatorMappings.add_mapping(PyFilterOperator())
 OperatorMappings.add_mapping(PyTextFileSourceOperator())
+OperatorMappings.add_mapping(PyTextFileSinkOperator())
 
diff --git a/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py
new file mode 100644
index 00000000..6589a634
--- /dev/null
+++ b/python/src/pywayang/platforms/python/operators/PyTextFileSinkOperator.py
@@ -0,0 +1,54 @@
+from pywayang.operator.sink import TextFileSink
+from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
+from pywayang.platforms.python.channels import (
+                                                    Channel,
+                                                    ChannelDescriptor,
+                                                    PyIteratorChannel,
+                                                    PyIteratorChannelDescriptor
+                                                )
+from typing import Set
+
+class PyTextFileSinkOperator(TextFileSink, PythonExecutionOperator):
+    """Python-platform execution operator for TextFileSink.
+
+    Writes every element of its single input channel to the text file at
+    ``self.path``.
+    """
+
+    def __init__(self, origin: TextFileSink = None):
+        """Copy the target path from ``origin``.
+
+        :param origin: logical TextFileSink to take the path from; when it is
+            None the path is left as None.
+        """
+        path = None if origin is None else origin.path
+        super().__init__(path)
+
+    def execute(self, inputs: Channel, outputs: Channel):
+        """Write the elements of ``inputs[0]`` to ``self.path``.
+
+        Each element is written as ``str(element)``; no separator is added
+        between elements. The file is opened in 'w' mode.
+
+        :param inputs: input channels; ``inputs[0]`` must be a PyIteratorChannel.
+        :param outputs: output channels; only handed to validateChannels.
+        :raises Exception: if ``inputs[0]`` is not a PyIteratorChannel.
+        """
+        self.validateChannels(inputs, outputs)
+        if not isinstance(inputs[0], PyIteratorChannel):
+            raise Exception("Channel Type does not supported")
+
+        py_in_iter_channel: PyIteratorChannel = inputs[0]
+        # The context manager closes the file even if the iterable or a
+        # write raises part-way through.
+        with open(self.path, 'w') as file:
+            for element in py_in_iter_channel.provide_iterable():
+                file.write(str(element))
+
+    def getInputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+        """Return the accepted input channel descriptors."""
+        return {PyIteratorChannelDescriptor}
+
+    def getOutputChannelDescriptors(self) -> Set[ChannelDescriptor]:
+        """Always raise: a sink has no output channels."""
+        raise Exception("The PyTextFileSink does not support Output Channels")
diff --git a/python/src/pywayang/platforms/python/operators/__init__.py b/python/src/pywayang/platforms/python/operators/__init__.py
index 5db92431..7a555422 100644
--- a/python/src/pywayang/platforms/python/operators/__init__.py
+++ b/python/src/pywayang/platforms/python/operators/__init__.py
@@ -1,9 +1,11 @@
 from pywayang.platforms.python.operators.PythonExecutionOperator import PythonExecutionOperator
 from pywayang.platforms.python.operators.PyFilterOperator import PyFilterOperator
 from pywayang.platforms.python.operators.PyTextFileSourceOperator import PyTextFileSourceOperator
+from pywayang.platforms.python.operators.PyTextFileSinkOperator import PyTextFileSinkOperator
 
 __ALL__ = [
     PythonExecutionOperator,
     PyFilterOperator,
-    PyTextFileSourceOperator
+    PyTextFileSourceOperator,
+    PyTextFileSinkOperator
 ]
\ No newline at end of file