You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:44 UTC

[incubator-wayang] 23/32: [WAYANG-#8] add dataquanta Tests and small corrections

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit e0da05b20b23713f528a8be93eb81bf1d9721392
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Thu Apr 7 18:51:16 2022 +0200

    [WAYANG-#8] add dataquanta Tests and small corrections
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/old_code/test.py                            |   4 +-
 python/src/pywy/dataquanta.py                      |  38 +++---
 python/src/pywy/operators/__init__.py              |   3 +-
 .../src/pywy/tests/unit/dataquanta/context_test.py |   2 +-
 .../pywy/tests/unit/dataquanta/dataquanta_test.py  | 134 +++++++++++++++++++++
 5 files changed, 158 insertions(+), 23 deletions(-)

diff --git a/python/old_code/test.py b/python/old_code/test.py
index 9a2b544f..1b3111c5 100644
--- a/python/old_code/test.py
+++ b/python/old_code/test.py
@@ -31,9 +31,9 @@ for index in range(0, 1):
     tic = time.perf_counter()
     fileop = WayangContext()\
                 .register(python)\
-                .textFile("/Users/bertty/databloom/blossom/python/resources/tmp"+str(index))\
+                .textfile("/Users/bertty/databloom/blossom/python/resources/tmp" + str(index))\
                 .filter(pre)\
-                .storeTextFile("/Users/bertty/databloom/blossom/python/resources/out"+str(index))
+                .store_textfile("/Users/bertty/databloom/blossom/python/resources/out" + str(index))
     toc = time.perf_counter()
     print(f"Downloaded the tutorial in {toc - tic:0.4f} seconds")
 
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index 9f10eba2..0b9483d3 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -1,11 +1,12 @@
-from typing import Set
+from typing import Set, List, cast
 
 from pywy.core import Translator
-from pywy.types import ( GenericTco, Predicate, Function, FlatmapFunction, IterableO )
+from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableO, T, I, O)
 from pywy.operators import *
 from pywy.core import PywyPlan
 from pywy.core import Plugin
 
+
 class WayangContext:
     """
     This is the entry point for users to work with Wayang.
@@ -18,6 +19,7 @@ class WayangContext:
     """
     add a :class:`Plugin` to the :class:`Context`
     """
+
     def register(self, *plugins: Plugin):
         for p in plugins:
             self.plugins.add(p)
@@ -26,12 +28,13 @@ class WayangContext:
     """
     remove a :class:`Plugin` from the :class:`Context`
     """
+
     def unregister(self, *plugins: Plugin):
         for p in plugins:
             self.plugins.remove(p)
         return self
 
-    def textFile(self, file_path: str) -> 'DataQuanta[str]':
+    def textfile(self, file_path: str) -> 'DataQuanta[str]':
         return DataQuanta(self, TextFileSource(file_path))
 
     def __str__(self):
@@ -40,43 +43,40 @@ class WayangContext:
     def __repr__(self):
         return self.__str__()
 
+
 class DataQuanta(GenericTco):
     """
     Represents an intermediate result/data flow edge in a [[WayangPlan]].
     """
-    previous : PywyOperator = None
     context: WayangContext
 
-    def __init__(self, context:WayangContext, operator: PywyOperator):
+    def __init__(self, context: WayangContext, operator: PywyOperator):
         self.operator = operator
         self.context = context
 
-    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]" :
-        return DataQuanta(self.context, self.__connect(FilterOperator(p)))
+    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]":
+        return DataQuanta(self.context, self._connect(FilterOperator(p)))
 
-    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]" :
-        return DataQuanta(self.context,self.__connect(MapOperator(f)))
+    def map(self: "DataQuanta[I]", f: Function) -> "DataQuanta[O]":
+        return DataQuanta(self.context, self._connect(MapOperator(f)))
 
-    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]" :
-        return DataQuanta(self.context,self.__connect(FlatmapOperator(f)))
+    def flatmap(self: "DataQuanta[I]", f: FlatmapFunction) -> "DataQuanta[IterableO]":
+        return DataQuanta(self.context, self._connect(FlatmapOperator(f)))
 
-    def storeTextFile(self: "DataQuanta[I]", path: str) :
-        last = self.__connect(TextFileSink(path))
-        plan = PywyPlan(self.context.plugins, [last])
+    def store_textfile(self: "DataQuanta[I]", path: str):
+        last: List[SinkOperator] = [cast(SinkOperator, self._connect(TextFileSink(path)))]
+        plan = PywyPlan(self.context.plugins, last)
 
         plug = self.context.plugins.pop()
-        trs: Translator =  Translator(plug, plan)
+        trs: Translator = Translator(plug, plan)
         new_plan = trs.translate()
         plug.get_executor().execute(new_plan)
         # TODO add the logic to execute the plan
 
-    def __connect(self, op:PywyOperator, port_op: int = 0) -> PywyOperator:
+    def _connect(self, op: PywyOperator, port_op: int = 0) -> PywyOperator:
         self.operator.connect(0, op, port_op)
         return op
 
-    def getOperator(self):
-        return self.operator
-
     def __str__(self):
         return str(self.operator)
 
diff --git a/python/src/pywy/operators/__init__.py b/python/src/pywy/operators/__init__.py
index 68306811..4c1c7181 100644
--- a/python/src/pywy/operators/__init__.py
+++ b/python/src/pywy/operators/__init__.py
@@ -1,5 +1,5 @@
 from pywy.operators.base import PywyOperator
-from pywy.operators.sink import TextFileSink
+from pywy.operators.sink import TextFileSink, SinkOperator
 from pywy.operators.source import TextFileSource
 from pywy.operators.unary import FilterOperator, MapOperator, FlatmapOperator
 #
@@ -8,6 +8,7 @@ __ALL__= [
      TextFileSink,
      TextFileSource,
      FilterOperator,
+     SinkOperator
 #     MapOperator,
 #     FlatmapOperator
 ]
\ No newline at end of file
diff --git a/python/src/pywy/tests/unit/dataquanta/context_test.py b/python/src/pywy/tests/unit/dataquanta/context_test.py
index a74c28ba..43e7e376 100644
--- a/python/src/pywy/tests/unit/dataquanta/context_test.py
+++ b/python/src/pywy/tests/unit/dataquanta/context_test.py
@@ -79,7 +79,7 @@ class TestUnitDataquantaContext(unittest.TestCase):
         self.assertIsInstance(context, WayangContext)
         self.assertEqual(len(context.plugins), 0)
 
-        dataQuanta = context.textFile(path)
+        dataQuanta = context.textfile(path)
 
         self.assertIsInstance(dataQuanta, DataQuanta)
         self.assertIsInstance(dataQuanta.operator, TextFileSource)
diff --git a/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py b/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py
new file mode 100644
index 00000000..9739307b
--- /dev/null
+++ b/python/src/pywy/tests/unit/dataquanta/dataquanta_test.py
@@ -0,0 +1,134 @@
+import unittest
+from typing import Tuple, Callable
+from unittest.mock import Mock
+
+from pywy.dataquanta import WayangContext
+from pywy.dataquanta import DataQuanta
+from pywy.operators import *
+
+
+class TestUnitCoreTranslator(unittest.TestCase):
+    context: WayangContext
+
+    def setUp(self):
+        self.context = Mock()
+        pass
+
+    def build_seed(self) -> Tuple[PywyOperator, DataQuanta]:
+        operator = PywyOperator("Empty")
+        dq = DataQuanta(self.context, operator)
+        return operator, dq
+
+    def test_create(self):
+        (operator, dq) = self.build_seed()
+
+        self.assertIsInstance(dq, DataQuanta)
+        self.assertEqual(dq.context, self.context)
+        self.assertEqual(dq.operator, operator)
+
+    def test_connect(self):
+        operator = PywyOperator("Empty1")
+        operator2 = PywyOperator("Empty2")
+        dq = DataQuanta(self.context, operator)
+
+        self.assertIsNone(operator2.inputOperator[0])
+        after_operator2 = dq._connect(operator2)
+        self.assertEqual(operator2, after_operator2)
+        self.assertIsNotNone(operator2.inputOperator[0])
+        self.assertEqual(operator, operator2.inputOperator[0])
+        self.assertEqual(operator.outputOperator[0], operator2)
+
+    def validate_filter(self, filtered: DataQuanta, operator: PywyOperator):
+        self.assertIsInstance(filtered, DataQuanta)
+        self.assertIsInstance(filtered.operator, FilterOperator)
+        self.assertEqual(filtered.context, self.context)
+        self.assertNotEqual(filtered.operator, operator)
+        self.assertEqual(filtered.operator.inputOperator[0], operator)
+
+    def test_filter_lambda(self):
+        (operator, dq) = self.build_seed()
+        pred: Callable = lambda x: "" in x
+        filtered = dq.filter(pred)
+        self.validate_filter(filtered, operator)
+
+    def test_filter_func(self):
+        (operator, dq) = self.build_seed()
+
+        def pred(x: str) -> bool:
+            return "" in x
+
+        filtered = dq.filter(pred)
+        self.validate_filter(filtered, operator)
+
+    def test_filter_func_lambda(self):
+        (operator, dq) = self.build_seed()
+
+        def pred(x):
+            return "" in x
+
+        filtered = dq.filter(lambda x: pred(x))
+        self.validate_filter(filtered, operator)
+
+    def validate_map(self, mapped: DataQuanta, operator: PywyOperator):
+        self.assertIsInstance(mapped, DataQuanta)
+        self.assertIsInstance(mapped.operator, MapOperator)
+        self.assertEqual(mapped.context, self.context)
+        self.assertNotEqual(mapped.operator, operator)
+        self.assertEqual(mapped.operator.inputOperator[0], operator)
+
+    def test_map_lambda(self):
+        (operator, dq) = self.build_seed()
+        func: Callable = lambda x: len(x)
+        mapped = dq.map(func)
+        self.validate_map(mapped, operator)
+
+    def test_map_func(self):
+        (operator, dq) = self.build_seed()
+
+        def func(x: str) -> int:
+            return len(x)
+
+        mapped = dq.map(func)
+        self.validate_map(mapped, operator)
+
+    def test_map_func_lambda(self):
+        (operator, dq) = self.build_seed()
+
+        def func(x):
+            return x == 0
+
+        mapped = dq.map(lambda x: func(x))
+        self.validate_map(mapped, operator)
+
+    def validate_flatmap(self, flatted: DataQuanta, operator: PywyOperator):
+        self.assertIsInstance(flatted, DataQuanta)
+        self.assertIsInstance(flatted.operator, FlatmapOperator)
+        self.assertEqual(flatted.context, self.context)
+        self.assertNotEqual(flatted.operator, operator)
+        self.assertEqual(flatted.operator.inputOperator[0], operator)
+
+    def test_flatmap_lambda(self):
+        (operator, dq) = self.build_seed()
+        func: Callable = lambda x: x.split(" ")
+        flatted = dq.flatmap(func)
+        self.validate_flatmap(flatted, operator)
+
+    def test_flatmap_func(self):
+        (operator, dq) = self.build_seed()
+
+        def fmfunc(i: str) -> str:
+            for x in range(len(i)):
+                yield str(x)
+
+        flatted = dq.flatmap(fmfunc)
+        self.validate_flatmap(flatted, operator)
+
+    def test_flatmap_func_lambda(self):
+        (operator, dq) = self.build_seed()
+
+        def fmfunc(i):
+            for x in range(len(i)):
+                yield str(x)
+
+        flatted = dq.flatmap(lambda x: fmfunc(x))
+        self.validate_flatmap(flatted, operator)