You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/11 22:32:35 UTC

[incubator-wayang] branch wayang-211 created (now f738e66f)

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a change to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


      at f738e66f [WAYANG-#211] JVM-Platform translator almost done

This branch includes the following new commits:

     new 80ff50db [WAYANG-#210] the documentation to pywy.core
     new cecaf8ec [WAYANG-#211] restructure the channels for Python-Platform
     new 8ec4cc82 [WAYANG-#211] seed structure for JVM-Platform
     new f738e66f [WAYANG-#211] JVM-Platform translator almost done

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-wayang] 04/04: [WAYANG-#211] JVM-Platform translator almost done

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit f738e66fd4db66b08c2c0e67f35ca3101c9a3bf5
Author: bertty Contreras <be...@gmail.com>
AuthorDate: Tue Apr 12 00:32:13 2022 +0200

    [WAYANG-#211] JVM-Platform translator almost done
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/core/__init__.py                   |   4 +-
 python/src/pywy/core/core.py                       | 174 +++++++++++++++++++++
 python/src/pywy/core/mapping.py                    |   4 +-
 python/src/pywy/core/plan.py                       | 107 -------------
 python/src/pywy/core/plugin.py                     |  51 ------
 python/src/pywy/core/translator.py                 |  58 -------
 python/src/pywy/dataquanta.py                      |  12 +-
 python/src/pywy/operators/base.py                  |   4 +-
 .../jvm_execution_operator.py => context.py}       |  20 +--
 python/src/pywy/platforms/jvm/execution.py         |  14 +-
 .../jvm/operator/jvm_execution_operator.py         |  15 ++
 .../platforms/jvm/operator/jvm_sink_textfile.py    |   6 +-
 .../platforms/jvm/operator/jvm_source_textfile.py  |   6 +-
 .../platforms/jvm/operator/jvm_unary_filter.py     |   5 +-
 python/src/pywy/platforms/jvm/plugin.py            |   5 +-
 .../platforms/jvm/serializable/plan_writter.py     | 131 ++++++++++++++++
 .../jvm/serializable/wayang_jvm_operator.py        |  18 +++
 .../platforms/python/operator/py_sink_textfile.py  |   2 +-
 .../python/operator/py_source_textfile.py          |   2 +-
 .../platforms/python/operator/py_unary_filter.py   |   2 +-
 .../platforms/python/operator/py_unary_flatmap.py  |   2 +-
 .../pywy/platforms/python/operator/py_unary_map.py |   2 +-
 .../pywy/tests/integration/jvm_platform_test.py    |   1 -
 23 files changed, 379 insertions(+), 266 deletions(-)

diff --git a/python/src/pywy/core/__init__.py b/python/src/pywy/core/__init__.py
index a28e3fcd..f9be82ca 100644
--- a/python/src/pywy/core/__init__.py
+++ b/python/src/pywy/core/__init__.py
@@ -18,10 +18,8 @@
 from pywy.core.channel import Channel, ChannelDescriptor
 from pywy.core.executor import Executor
 from pywy.core.mapping import Mapping
-from pywy.core.plan import PywyPlan
+from pywy.core.core import PywyPlan, Plugin, Translator
 from pywy.core.platform import Platform
-from pywy.core.plugin import Plugin
-from pywy.core.translator import Translator
 
 __ALL__ = [
     Channel,
diff --git a/python/src/pywy/core/core.py b/python/src/pywy/core/core.py
new file mode 100644
index 00000000..084ffc25
--- /dev/null
+++ b/python/src/pywy/core/core.py
@@ -0,0 +1,174 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, Iterable
+
+from pywy.core.executor import Executor
+from pywy.core.platform import Platform
+from pywy.core.mapping import Mapping
+from pywy.graph.graph import WayangGraph
+from pywy.graph.types import WGraphOfVec, NodeOperator, NodeVec
+from pywy.operators import SinkOperator
+
+
+class TranslateContext:
+    """TranslateContext contextual variables a parameters for the translation
+    """
+    pass
+
+
+class Plugin:
+    """ TODO: enrich this documentation
+    A plugin contributes the following components to a :class:`Context`
+    - mappings
+    - channels
+    - configurations
+    In turn, it may require several :clas:`Platform`s for its operation.
+    """
+
+    platforms: Set[Platform]
+    mappings: Mapping
+    translate_context: TranslateContext
+
+    def __init__(
+            self,
+            platforms: Set[Platform],
+            mappings: Mapping = Mapping(),
+            translate_context: TranslateContext = None):
+        self.platforms = platforms
+        self.mappings = mappings
+        self.translate_context = translate_context
+
+    def get_mappings(self) -> Mapping:
+        return self.mappings
+
+    def get_executor(self) -> Executor:
+        pass
+
+    def __str__(self):
+        return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class PywyPlan:
+    """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator`
+
+    the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe
+    how the execution needs to be performed
+
+    Attributes
+    ----------
+    graph : :py:class:`pywy.graph.graph.WayangGraph`
+       Graph that describe the DAG, and it provides the iterable properties to
+       the PywyPlan
+    plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin`
+        plugins is the set of possible platforms that can be uses to execute
+        the PywyPlan
+    sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator`
+        The list of sink operators, this describe the end of the pipeline, and
+        they are used to build the `graph`
+    """
+    graph: WayangGraph
+
+    def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
+        """basic Constructor of PywyPlan
+
+        this constructor set the plugins and sinks element, and it prepares
+        everything for been executed
+
+        Parameters
+        ----------
+        plugins
+            Description of `plugins`.
+        sinks
+            Description of `sinks`.
+        """
+        self.plugins = plugins
+        self.sinks = sinks
+        self.set_graph()
+
+    def set_graph(self):
+        """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan
+        """
+        self.graph = WGraphOfVec(self.sinks)
+
+    def execute(self):
+        """ Execute the plan with the plugin provided at the moment of creation
+        """
+        plug = next(iter(self.plugins))
+        trs: Translator = Translator(plug, self)
+        new_plan = trs.translate()
+        plug.get_executor().execute(new_plan)
+
+
+class Translator:
+    """Translator use the :py:class:`pywy.core.Mapping` to convert the :py:class:`pywy.operators.base.PywyOperator`
+
+    Translator take a plan a produce the executable version of the plan using as tool
+    the :py:class:`pywy.core.Mapping` of the :py:class:`pywy.core.core.Plugin` and convert
+    the :py:class:`pywy.operators.base.PywyOperator` into an executable version inside
+    the :py:class:`pywy.core.Platform`
+
+    Attributes
+    ----------
+    plugin : :py:class:`pywy.core.core.Plugin`
+        plugin use in the translation
+    plan : :py:class:`pywy.core.core.PywyPlan`
+        Plan to be translated by the translator
+    translate_context: :py:class:`pywy.core.core.TranslateContext`
+        context used by the translates at runtime in some case is not needed
+    """
+
+    plugin: Plugin
+    plan: PywyPlan
+    translate_context: TranslateContext
+
+    def __init__(self, plugin: Plugin, plan: PywyPlan):
+        self.plugin = plugin
+        self.plan = plan
+        self.translate_context = plugin.translate_context
+
+    def translate(self):
+        mappings: Mapping = self.plugin.get_mappings()
+        graph = WGraphOfVec(self.plan.sinks)
+
+        translate = self.translate_context
+
+        def translate2plugin(current_op: NodeVec, next_op: NodeVec):
+            if current_op is None:
+                return
+
+            if current_op.current[1] is None:
+                current_op.current[1] = mappings.get_instanceof(current_op.current[0], **{'translate_context': translate})
+
+            if next_op is None:
+                return
+            if next_op.current[1] is None:
+                next_op.current[1] = mappings.get_instanceof(next_op.current[0], **{'translate_context': translate})
+
+            # TODO not necesary it it 0
+            current_op.current[1].connect(0, next_op.current[1], 0)
+
+        graph.traversal(graph.starting_nodes, translate2plugin)
+
+        node = []
+        for elem in graph.starting_nodes:
+            node.append(elem.current[1])
+
+        return PywyPlan({self.plugin}, node)
diff --git a/python/src/pywy/core/mapping.py b/python/src/pywy/core/mapping.py
index 81a57715..b44375e2 100644
--- a/python/src/pywy/core/mapping.py
+++ b/python/src/pywy/core/mapping.py
@@ -50,7 +50,7 @@ class Mapping:
         """
         self.mappings[operator.name_basic()] = type(operator)
 
-    def get_instanceof(self, operator: PywyOperator):
+    def get_instanceof(self, operator: PywyOperator, **kwargs):
         """Instance the executable version of :py:class:`pywy.operators.base.PywyOperator`
 
         Parameters
@@ -70,7 +70,7 @@ class Mapping:
                     operator.name
                 )
             )
-        return template(operator)
+        return template(operator, **kwargs)
 
     def __str__(self):
         return str(self.mappings)
diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py
deleted file mode 100644
index 48407b98..00000000
--- a/python/src/pywy/core/plan.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from typing import (Iterable, Set)
-
-from pywy.graph.graph import WayangGraph
-from pywy.graph.types import (NodeOperator, WGraphOfVec, NodeVec)
-from pywy.operators.sink import SinkOperator
-from pywy.core.plugin import Plugin
-
-
-class PywyPlan:
-    """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator`
-
-    the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe
-    how the execution needs to be performed
-
-    Attributes
-    ----------
-    graph : :py:class:`pywy.graph.graph.WayangGraph`
-       Graph that describe the DAG, and it provides the iterable properties to
-       the PywyPlan
-    plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin`
-        plugins is the set of possible platforms that can be uses to execute
-        the PywyPlan
-    sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator`
-        The list of sink operators, this describe the end of the pipeline, and
-        they are used to build the `graph`
-    """
-    graph: WayangGraph
-
-    def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
-        """basic Constructor of PywyPlan
-
-        this constructor set the plugins and sinks element, and it prepares
-        everything for been executed
-
-        Parameters
-        ----------
-        plugins
-            Description of `plugins`.
-        sinks
-            Description of `sinks`.
-        """
-        self.plugins = plugins
-        self.sinks = sinks
-        self.set_graph()
-
-    def set_graph(self):
-        """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan
-        """
-        self.graph = WGraphOfVec(self.sinks)
-
-    def print(self):
-        def print_plan(current: NodeOperator, previous: NodeOperator):
-            if current is None:
-                print("this is source")
-                print(previous.current)
-                return
-            if previous is None:
-                print("this is sink")
-                print(current.current)
-                return
-
-            print(
-                "===========\n{}\n@@@@@ => previous is\n{}\n===========\n".format(
-                    current.current,
-                    previous.current
-                )
-            )
-
-        self.graph.traversal(self.graph.starting_nodes, print_plan)
-
-    def printTuple(self):
-        def print_plan(current: NodeVec, previous: NodeVec):
-            if current is None:
-                print("this is source")
-                print(previous.current)
-                return
-            if previous is None:
-                print("this is sink")
-                print(current.current)
-                return
-
-            print(
-                "############\n{}\n@@@@@ => previous is\n{}\n############\n"
-                    .format(
-                    current.current,
-                    previous.current
-                )
-            )
-
-        self.graph.traversal(self.graph.starting_nodes, print_plan)
diff --git a/python/src/pywy/core/plugin.py b/python/src/pywy/core/plugin.py
deleted file mode 100644
index b79cbf56..00000000
--- a/python/src/pywy/core/plugin.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from typing import Set
-
-from pywy.core.executor import Executor
-from pywy.core.platform import Platform
-from pywy.core.mapping import Mapping
-
-
-class Plugin:
-    """
-    A plugin contributes the following components to a :class:`Context`
-    - mappings
-    - channels
-    - configurations
-    In turn, it may require several :clas:`Platform`s for its operation.
-    """
-
-    platforms: Set[Platform]
-    mappings: Mapping
-
-    def __init__(self, platforms: Set[Platform], mappings: Mapping = Mapping()):
-        self.platforms = platforms
-        self.mappings = mappings
-
-    def get_mappings(self) -> Mapping:
-        return self.mappings
-
-    def get_executor(self) -> Executor:
-        pass
-
-    def __str__(self):
-        return "Platforms: {}, Mappings: {}".format(str(self.platforms), str(self.mappings))
-
-    def __repr__(self):
-        return self.__str__()
diff --git a/python/src/pywy/core/translator.py b/python/src/pywy/core/translator.py
deleted file mode 100644
index 7c0b99be..00000000
--- a/python/src/pywy/core/translator.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-from pywy.graph.types import (WGraphOfVec, NodeVec)
-from pywy.core.plugin import Plugin
-from pywy.core.plan import PywyPlan
-from pywy.core.mapping import Mapping
-
-
-class Translator:
-
-    plugin: Plugin
-    plan: PywyPlan
-
-    def __init__(self, plugin: Plugin, plan: PywyPlan):
-        self.plugin = plugin
-        self.plan = plan
-
-    def translate(self):
-        mappings: Mapping = self.plugin.get_mappings()
-        graph = WGraphOfVec(self.plan.sinks)
-
-        def translate2plugin(current_op: NodeVec, next_op: NodeVec):
-            if current_op is None:
-                return
-
-            if current_op.current[1] is None:
-                current_op.current[1] = mappings.get_instanceof(current_op.current[0])
-
-            if next_op is None:
-                return
-            if next_op.current[1] is None:
-                next_op.current[1] = mappings.get_instanceof(next_op.current[0])
-
-            # TODO not necesary it it 0
-            current_op.current[1].connect(0, next_op.current[1], 0)
-
-        graph.traversal(graph.starting_nodes, translate2plugin)
-
-        node = []
-        for elem in graph.starting_nodes:
-            node.append(elem.current[1])
-
-        return PywyPlan({self.plugin}, node)
diff --git a/python/src/pywy/dataquanta.py b/python/src/pywy/dataquanta.py
index 6e811cb1..c0836dfb 100644
--- a/python/src/pywy/dataquanta.py
+++ b/python/src/pywy/dataquanta.py
@@ -17,12 +17,10 @@
 
 from typing import Set, List, cast
 
-from pywy.core import Translator
+from pywy.core.core import Plugin, PywyPlan
 from pywy.operators.base import PO_T
 from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out)
 from pywy.operators import *
-from pywy.core import PywyPlan
-from pywy.core import Plugin
 
 
 class WayangContext:
@@ -94,13 +92,7 @@ class DataQuanta(GenericTco):
                 )
             )
         ]
-        plan = PywyPlan(self.context.plugins, last)
-
-        plug = self.context.plugins.pop()
-        trs: Translator = Translator(plug, plan)
-        new_plan = trs.translate()
-        plug.get_executor().execute(new_plan)
-        # TODO add the logic to execute the plan
+        PywyPlan(self.context.plugins, last).execute()
 
     def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
         self.operator.connect(0, op, port_op)
diff --git a/python/src/pywy/operators/base.py b/python/src/pywy/operators/base.py
index 59aabf2a..14620c40 100644
--- a/python/src/pywy/operators/base.py
+++ b/python/src/pywy/operators/base.py
@@ -38,7 +38,9 @@ class PywyOperator:
                  input_type: TypeVar = None,
                  output_type: TypeVar = None,
                  input_length: Optional[int] = 1,
-                 output_length: Optional[int] = 1
+                 output_length: Optional[int] = 1,
+                 *args,
+                 **kwargs
                  ):
         self.name = (self.prefix() + name + self.postfix()).strip()
         self.inputSlot = [input_type]
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/context.py
similarity index 67%
copy from python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
copy to python/src/pywy/platforms/jvm/context.py
index 0049c5c2..100016dd 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
+++ b/python/src/pywy/platforms/jvm/context.py
@@ -15,19 +15,19 @@
 #  limitations under the License.
 #
 
-from typing import List, Type
-
-from pywy.core.channel import CH_T
-from pywy.operators.base import PywyOperator
+from pywy.core.core import TranslateContext
+from pywy.platforms.jvm.serializable.plan_writter import PlanWritter
 from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
 
 
-class JVMExecutionOperator(PywyOperator):
+class JVMTranslateContext(TranslateContext):
+    plan_writer: PlanWritter
 
-    dispatch_operator: WayangJVMOperator
+    def __init__(self):
+        self.plan_writer = PlanWritter()
 
-    def prefix(self) -> str:
-        return 'JVM'
+    def add_operator(self, op: WayangJVMOperator):
+        self.plan_writer.add_operator(op)
 
-    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
-        pass
+    def generate_request(self):
+        self.plan_writer.send_message_to_wayang()
diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py
index 2dad76d6..b64973ef 100644
--- a/python/src/pywy/platforms/jvm/execution.py
+++ b/python/src/pywy/platforms/jvm/execution.py
@@ -20,6 +20,8 @@ from pywy.core import PywyPlan
 from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR
 from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch
 from pywy.platforms.jvm.operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
+
 
 class JVMExecutor(Executor):
 
@@ -84,16 +86,8 @@ class JVMExecutor(Executor):
 
         graph.traversal(graph.starting_nodes, execute)
 
+        magic: JVMExecutionOperator = graph.starting_nodes[0].current
 
-
-        starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator
-        while starting.previous[0]:
-            print(starting)
-            #print(starting.nexts[0])
-            starting = starting.previous[0]
-            if len(starting.previous) == 0 :
-                break
-        print(starting)
-
+        magic.translate_context.generate_request()
 
 
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
index 0049c5c2..8bc137a7 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
@@ -19,6 +19,7 @@ from typing import List, Type
 
 from pywy.core.channel import CH_T
 from pywy.operators.base import PywyOperator
+from pywy.platforms.jvm.context import JVMTranslateContext
 from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
 
 
@@ -26,6 +27,20 @@ class JVMExecutionOperator(PywyOperator):
 
     dispatch_operator: WayangJVMOperator
 
+    translate_context: JVMTranslateContext
+
+    def set_context(self, **kwargs):
+        if 'translate_context' not in kwargs:
+            return
+        self.translate_context = kwargs['translate_context']
+
+    def close_operator(self, op: WayangJVMOperator):
+        if self.translate_context is None:
+            return
+
+        self.translate_context.add_operator(op)
+
+
     def prefix(self) -> str:
         return 'JVM'
 
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
index 555ba048..a047de41 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
@@ -27,11 +27,12 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFil
 
 class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
 
-    def __init__(self, origin: TextFileSink = None):
+    def __init__(self, origin: TextFileSink = None, **kwargs):
         path = None if origin is None else origin.path
         type_class = None if origin is None else origin.inputSlot[0]
         end_line = None if origin is None else origin.end_line
         super().__init__(path, type_class, end_line)
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
@@ -45,6 +46,9 @@ class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
 
             operator.connect_to(0, sink, 0)
 
+            self.close_operator(operator)
+            self.close_operator(sink)
+
             self.dispatch_operator = sink
         else:
             raise Exception("Channel Type does not supported")
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
index 3ca3b911..030c702f 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
@@ -21,15 +21,15 @@ from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.source import TextFileSource
 from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
 from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
-from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource, WayangJVMOperator
 
 
 class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator):
 
-    def __init__(self, origin: TextFileSource = None):
+    def __init__(self, origin: TextFileSource = None, **kwargs):
         path = None if origin is None else origin.path
         super().__init__(path)
-        pass
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
index 06ba9e16..90da7f84 100644
--- a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
@@ -29,10 +29,10 @@ from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappart
 
 class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
 
-    def __init__(self, origin: FilterOperator = None):
+    def __init__(self, origin: FilterOperator = None, **kwargs):
         predicate = None if origin is None else origin.predicate
         super().__init__(predicate)
-        pass
+        self.set_context(**kwargs)
 
     def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
         self.validate_channels(inputs, outputs)
@@ -60,6 +60,7 @@ class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
             current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name)
             # TODO check for the case where the index matter
             op.connect_to(0, current, 0)
+            self.close_operator(op)
             py_out_dispatch_channel.accept_dispatchable(current)
 
         else:
diff --git a/python/src/pywy/platforms/jvm/plugin.py b/python/src/pywy/platforms/jvm/plugin.py
index eff1e04b..eb52b7af 100644
--- a/python/src/pywy/platforms/jvm/plugin.py
+++ b/python/src/pywy/platforms/jvm/plugin.py
@@ -16,7 +16,8 @@
 #
 
 from pywy.core import Executor
-from pywy.core import Plugin
+from pywy.core.core import Plugin
+from pywy.platforms.jvm.context import JVMTranslateContext
 from pywy.platforms.jvm.execution import JVMExecutor
 from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS
 from pywy.platforms.jvm.platform import JVMPlatform
@@ -25,7 +26,7 @@ from pywy.platforms.jvm.platform import JVMPlatform
 class JVMPlugin(Plugin):
 
     def __init__(self):
-        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS)
+        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS, JVMTranslateContext())
 
     def get_executor(self) -> Executor:
         return JVMExecutor()
diff --git a/python/src/pywy/platforms/jvm/serializable/plan_writter.py b/python/src/pywy/platforms/jvm/serializable/plan_writter.py
new file mode 100644
index 00000000..17929181
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/plan_writter.py
@@ -0,0 +1,131 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Set
+
+import pywy.platforms.jvm.serializable.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import requests
+import base64
+
+
+# Writes Wayang Plan from several stages
+from pywy.exception import PywyException
+from pywy.operators import SinkOperator
+from pywy.operators.source import SourceUnaryOperator
+from pywy.operators.unary import UnaryToUnaryOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMTextFileSink
+
+
+class PlanWritter:
+
+    def __init__(self):
+        self.originals: Set[WayangJVMOperator] = set()
+
+    def add_operator(self, operator: WayangJVMOperator):
+        self.originals.add(operator)
+
+    def add_proto_unary_operator(self, operator: WayangJVMOperator):
+        op = pwb.OperatorProto()
+        op.id = str(operator.name)
+        op.type = operator.kind
+        op.udf = cloudpickle.dumps(operator.udf)
+        op.path = str(None)
+        return op
+
+    def add_proto_source_operator(self, operator: WayangJVMOperator):
+        source = pwb.OperatorProto()
+        source.id = str(operator.name)
+        source.type = operator.kind
+        source.path = os.path.abspath(operator.path)
+        source.udf = chr(0).encode('utf-8')
+        return source
+
+    def add_proto_sink_operator(self, operator: WayangJVMOperator):
+        sink = pwb.OperatorProto()
+        sink.id = str(operator.name)
+        sink.type = operator.kind
+        sink.path = os.path.abspath(operator.path)
+        sink.udf = chr(0).encode('utf-8')
+        return sink
+
+    def send_message_to_wayang(self):
+        connections = {}
+        sources = []
+        sinks = []
+        operators = []
+        for operator in self.originals:
+            if not operator.is_unary():
+                raise PywyException(
+                    "the not unary operator are not supported".format(
+                        type(operator),
+                        operator
+                    )
+                )
+            if operator.is_operator():
+                connections[operator] = self.add_proto_unary_operator(operator)
+                operators.append(connections[operator])
+            elif operator.is_source():
+                connections[operator] = self.add_proto_source_operator(operator)
+                sources.append(connections[operator])
+            elif operator.is_sink():
+                connections[operator] = self.add_proto_sink_operator(operator)
+                sinks.append(connections[operator])
+            else:
+                raise PywyException(
+                    "the type {} for the operator {} is not supported {}".format(
+                        type(operator),
+                        operator,
+                        WayangJVMTextFileSink.mro()
+                    )
+                )
+        for operator in self.originals:
+            current = connections[operator]
+            for ele in operator.previous:
+                current.predecessors.append(connections.get(ele).id)
+            for ele in operator.nexts:
+                current.successors.append(connections.get(ele).id)
+
+        plan_configuration = pwb.WayangPlanProto()
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(sources)
+        plan.operators.extend(operators)
+        plan.sinks.extend(sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
+        print("plan!")
+        print(plan_configuration)
+
+        msg_bytes = plan_configuration.SerializeToString()
+        msg_64 = base64.b64encode(msg_bytes)
+
+        logging.debug(msg_bytes)
+        # response = requests.get("http://localhost:8080/plan/create/fromfile")
+        data = {
+            'message': msg_64
+        }
+        response = requests.post("http://localhost:8080/plan/create", data)
+        logging.debug(response)
diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
index d18f155b..ab3a1a4e 100644
--- a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
+++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
@@ -71,6 +71,18 @@ class WayangJVMOperator:
             self.nexts
         )
 
+    def is_source(self):
+        return False
+
+    def is_sink(self):
+        return False
+
+    def is_unary(self):
+        return True
+
+    def is_operator(self):
+        return False
+
 WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator)
 
 
@@ -80,6 +92,8 @@ class WayangJVMMappartitionOperator(WayangJVMOperator):
         super().__init__("MapPartitionOperator", name)
         self.udf = udf
 
+    def is_operator(self):
+        return True
 
 class WayangJVMTextFileSource(WayangJVMOperator):
 
@@ -87,6 +101,8 @@ class WayangJVMTextFileSource(WayangJVMOperator):
         super().__init__("TextFileSource", name)
         self.path = path
 
+    def is_source(self):
+        return True
 
 class WayangJVMTextFileSink(WayangJVMOperator):
 
@@ -94,3 +110,5 @@ class WayangJVMTextFileSink(WayangJVMOperator):
         super().__init__("TextFileSink", name)
         self.path = path
 
+    def is_sink(self):
+        return True
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 7d8eec1b..bccab733 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -28,7 +28,7 @@ from pywy.platforms.python.channels import (
 
 class PyTextFileSinkOperator(TextFileSink, PyExecutionOperator):
 
-    def __init__(self, origin: TextFileSink = None):
+    def __init__(self, origin: TextFileSink = None, **kwargs):
         path = None if origin is None else origin.path
         type_class = None if origin is None else origin.inputSlot[0]
         end_line = None if origin is None else origin.end_line
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 245d090d..f5651e79 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -28,7 +28,7 @@ from pywy.platforms.python.channels import (
 
 class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
 
-    def __init__(self, origin: TextFileSource = None):
+    def __init__(self, origin: TextFileSource = None, **kwargs):
         path = None if origin is None else origin.path
         super().__init__(path)
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 2d807282..200f85f1 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -32,7 +32,7 @@ from pywy.platforms.python.channels import (
 
 class PyFilterOperator(FilterOperator, PyExecutionOperator):
 
-    def __init__(self, origin: FilterOperator = None):
+    def __init__(self, origin: FilterOperator = None, **kwargs):
         predicate = None if origin is None else origin.predicate
         super().__init__(predicate)
         pass
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 72016a8c..43a02465 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -33,7 +33,7 @@ from pywy.platforms.python.channels import (
 
 class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
 
-    def __init__(self, origin: FlatmapOperator = None):
+    def __init__(self, origin: FlatmapOperator = None, **kwargs):
         fm_function = None if origin is None else origin.fm_function
         super().__init__(fm_function)
 
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index a8e53a48..54418d46 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -32,7 +32,7 @@ from pywy.platforms.python.channels import (
 
 class PyMapOperator(MapOperator, PyExecutionOperator):
 
-    def __init__(self, origin: MapOperator = None):
+    def __init__(self, origin: MapOperator = None, **kwargs):
         function = None if origin is None else origin.function
         super().__init__(function)
         pass
diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py
index 987e7334..c3bb3e39 100644
--- a/python/src/pywy/tests/integration/jvm_platform_test.py
+++ b/python/src/pywy/tests/integration/jvm_platform_test.py
@@ -80,7 +80,6 @@ class TestIntegrationJVMPlatform(unittest.TestCase):
         self.assertEqual(lines_filter, lines_platform)
 
     def test_grep(self):
-
         dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
 
         dq.store_textfile(path_tmp)


[incubator-wayang] 03/04: [WAYANG-#211] seed structure for JVM-Platform

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 8ec4cc82d651014affdcdbb53c31b24c33ca7912
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 11 18:40:46 2022 +0200

    [WAYANG-#211] seed structure for JVM-Platform
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/platforms/commons/channels.py      |   6 +
 .../pywy/{plugins.py => platforms/jvm/__init__.py} |  11 --
 .../pywy/platforms/{commons => jvm}/channels.py    |  46 +++----
 python/src/pywy/platforms/jvm/execution.py         |  99 ++++++++++++++
 python/src/pywy/platforms/jvm/graph.py             |  52 ++++++++
 .../pywy/{plugins.py => platforms/jvm/mappings.py} |  17 ++-
 .../jvm/operator/__init__.py}                      |  19 +--
 .../jvm/operator/jvm_execution_operator.py}        |  24 ++--
 .../platforms/jvm/operator/jvm_sink_textfile.py    |  56 ++++++++
 .../platforms/jvm/operator/jvm_source_textfile.py  |  48 +++++++
 .../platforms/jvm/operator/jvm_unary_filter.py     |  72 +++++++++++
 .../pywy/{plugins.py => platforms/jvm/platform.py} |  13 +-
 .../pywy/{plugins.py => platforms/jvm/plugin.py}   |  20 +--
 .../jvm/serializable/__init__.py}                  |  11 --
 .../jvm/serializable/wayang_jvm_operator.py        |  96 ++++++++++++++
 python/src/pywy/plugins.py                         |   2 +
 .../pywy/tests/integration/jvm_platform_test.py    | 144 +++++++++++++++++++++
 17 files changed, 642 insertions(+), 94 deletions(-)

diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/commons/channels.py
index 743a7169..57cebeb1 100644
--- a/python/src/pywy/platforms/commons/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -17,6 +17,7 @@
 
 from typing import Callable
 from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
 
 
 class CommonsCallableChannel(Channel):
@@ -35,6 +36,11 @@ class CommonsCallableChannel(Channel):
 
     @staticmethod
     def concatenate(function_a: Callable, function_b: Callable):
+        if function_a is None:
+            raise PywyException("the function_a can't be None")
+        if function_b is None:
+            return function_a
+
         def executable(iterable):
             return function_a(function_b(iterable))
         return executable
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/__init__.py
similarity index 70%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/__init__.py
index e3509b68..d9e26de2 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/__init__.py
@@ -14,14 +14,3 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-
-from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
-
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
diff --git a/python/src/pywy/platforms/commons/channels.py b/python/src/pywy/platforms/jvm/channels.py
similarity index 51%
copy from python/src/pywy/platforms/commons/channels.py
copy to python/src/pywy/platforms/jvm/channels.py
index 743a7169..9c1e76f2 100644
--- a/python/src/pywy/platforms/commons/channels.py
+++ b/python/src/pywy/platforms/jvm/channels.py
@@ -14,46 +14,34 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-
 from typing import Callable
+
 from pywy.core import (Channel, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.platforms.commons.channels import CommonsCallableChannel
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator, WayangJVMMappartitionOperator
 
 
-class CommonsCallableChannel(Channel):
+class DispatchableChannel(CommonsCallableChannel):
 
-    udf: Callable
+    operator: WayangJVMOperator
 
     def __init__(self):
         Channel.__init__(self)
+        self.udf = None
+        self.operator = None
 
-    def provide_callable(self) -> Callable:
-        return self.udf
-
-    def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
-        self.udf = udf
-        return self
-
-    @staticmethod
-    def concatenate(function_a: Callable, function_b: Callable):
-        def executable(iterable):
-            return function_a(function_b(iterable))
-        return executable
-
-
-class CommonsFileChannel(Channel):
-
-    path: str
-
-    def __init__(self):
-        Channel.__init__(self)
+    def provide_dispatchable(self, do_wrapper: bool = False) -> WayangJVMOperator:
+        if self.operator is None:
+            raise PywyException("The operator was not define")
+        if do_wrapper:
+            self.operator.udf = self.udf
 
-    def provide_path(self) -> str:
-        return self.path
+        return self.operator
 
-    def accept_path(self, path: str) -> 'PyIteratorChannel':
-        self.path = path
+    def accept_dispatchable(self, operator: WayangJVMOperator) -> 'WayangJVMOperator':
+        self.operator = operator
         return self
 
 
-COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
-COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
+DISPATCHABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(DispatchableChannel()), True, True)
diff --git a/python/src/pywy/platforms/jvm/execution.py b/python/src/pywy/platforms/jvm/execution.py
new file mode 100644
index 00000000..2dad76d6
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/execution.py
@@ -0,0 +1,99 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from pywy.core import Executor, ChannelDescriptor
+from pywy.core import PywyPlan
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR
+from pywy.platforms.jvm.graph import NodeDispatch, WGraphDispatch
+from pywy.platforms.jvm.operator import JVMExecutionOperator
+
+class JVMExecutor(Executor):
+
+    def __init__(self):
+        super(JVMExecutor, self).__init__()
+
+    def execute(self, plan):
+        pywyPlan: PywyPlan = plan
+        graph = WGraphDispatch(pywyPlan.sinks)
+
+        # TODO get this information by a configuration and ideally by the context
+        descriptor_default: ChannelDescriptor = DISPATCHABLE_CHANNEL_DESCRIPTOR
+
+        def execute(op_current: NodeDispatch, op_next: NodeDispatch):
+            if op_current is None:
+                return
+
+            jvm_current: JVMExecutionOperator = op_current.current
+            if jvm_current.outputs == 0:
+                jvm_current.execute(jvm_current.inputChannel, [])
+                return
+
+            if op_next is None:
+                return
+
+            jvm_next: JVMExecutionOperator = op_next.current
+            outputs = jvm_current.get_output_channeldescriptors()
+            inputs = jvm_next.get_input_channeldescriptors()
+
+            intersect = outputs.intersection(inputs)
+            if len(intersect) == 0:
+                raise Exception(
+                    "The operator(A) {} can't connect with (B) {}, "
+                    "because the output of (A) is {} and the input of (B) is {} ".format(
+                        jvm_current,
+                        jvm_next,
+                        outputs,
+                        inputs
+                    )
+                )
+
+            if len(intersect) > 1:
+                if descriptor_default is None:
+                    raise Exception(
+                        "The interaction between the operator (A) {} and (B) {}, "
+                        "can't be decided because are several channel availables {}".format(
+                            jvm_current,
+                            jvm_next,
+                            intersect
+                        )
+                    )
+                descriptor = descriptor_default
+            else:
+                descriptor = intersect.pop()
+
+            # TODO validate if is valite for several output
+            jvm_current.outputChannel[0] = descriptor.create_instance()
+
+            jvm_current.execute(jvm_current.inputChannel, jvm_current.outputChannel)
+
+            jvm_next.inputChannel = jvm_current.outputChannel
+
+        graph.traversal(graph.starting_nodes, execute)
+
+
+
+        starting: WayangJVMOperator = graph.starting_nodes[0].current.dispatch_operator
+        while starting.previous[0]:
+            print(starting)
+            #print(starting.nexts[0])
+            starting = starting.previous[0]
+            if len(starting.previous) == 0 :
+                break
+        print(starting)
+
+
+
diff --git a/python/src/pywy/platforms/jvm/graph.py b/python/src/pywy/platforms/jvm/graph.py
new file mode 100644
index 00000000..78636517
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/graph.py
@@ -0,0 +1,52 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Iterable, List, Tuple
+
+from pywy.graph.graph import WayangGraph, GraphNode
+from pywy.operators.base import PO_T
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WJO_T
+
+
+class NodeDispatch(GraphNode[PO_T, PO_T]):
+    wop: WJO_T
+
+    def __init__(self, op: PO_T):
+        super(NodeDispatch, self).__init__(op)
+
+    def get_adjacents(self) -> List[PO_T]:
+        operator: PO_T = self.current
+        if operator is None or operator.inputs == 0:
+            return []
+        return operator.inputOperator
+
+    def build_node(self, t: PO_T) -> 'NodeDispatch':
+        return NodeDispatch(t)
+
+    def __str__(self):
+        return "NodeDispatch {}".format(self.current)
+
+    def __repr__(self):
+        return self.__str__()
+
+
+class WGraphDispatch(WayangGraph[PO_T, NodeDispatch]):
+
+    def __init__(self, nodes: Iterable[PO_T]):
+        super(WGraphDispatch, self).__init__(nodes)
+
+    def build_node(self, t: PO_T) -> NodeDispatch:
+        return NodeDispatch(t)
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/mappings.py
similarity index 70%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/mappings.py
index e3509b68..4709a735 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/mappings.py
@@ -15,13 +15,12 @@
 #  limitations under the License.
 #
 
-from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
+from pywy.core import Mapping
+from pywy.platforms.jvm.operator import *
 
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
+
+JVM_OPERATOR_MAPPINGS = Mapping()
+
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMFilterOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSourceOperator())
+JVM_OPERATOR_MAPPINGS.add_mapping(JVMTextFileSinkOperator())
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/operator/__init__.py
similarity index 63%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/operator/__init__.py
index e3509b68..a0bc5d29 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/operator/__init__.py
@@ -15,13 +15,14 @@
 #  limitations under the License.
 #
 
-from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.operator.jvm_unary_filter import JVMFilterOperator
+from pywy.platforms.jvm.operator.jvm_source_textfile import JVMTextFileSourceOperator
+from pywy.platforms.jvm.operator.jvm_sink_textfile import JVMTextFileSinkOperator
 
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
+__ALL__ = [
+    JVMExecutionOperator,
+    JVMFilterOperator,
+    JVMTextFileSourceOperator,
+    JVMTextFileSinkOperator,
+]
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
similarity index 65%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
index e3509b68..0049c5c2 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/operator/jvm_execution_operator.py
@@ -15,13 +15,19 @@
 #  limitations under the License.
 #
 
-from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
+from typing import List, Type
 
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
+from pywy.core.channel import CH_T
+from pywy.operators.base import PywyOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMOperator
+
+
+class JVMExecutionOperator(PywyOperator):
+
+    dispatch_operator: WayangJVMOperator
+
+    def prefix(self) -> str:
+        return 'JVM'
+
+    def execute(self, inputs: List[Type[CH_T]], output: List[CH_T]):
+        pass
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
new file mode 100644
index 00000000..555ba048
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_sink_textfile.py
@@ -0,0 +1,56 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.exception import PywyException
+from pywy.operators.sink import TextFileSink
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSink
+
+
+class JVMTextFileSinkOperator(TextFileSink, JVMExecutionOperator):
+
+    def __init__(self, origin: TextFileSink = None):
+        path = None if origin is None else origin.path
+        type_class = None if origin is None else origin.inputSlot[0]
+        end_line = None if origin is None else origin.end_line
+        super().__init__(path, type_class, end_line)
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+
+        if isinstance(inputs[0], DispatchableChannel):
+
+            py_in_dispatch_channel: DispatchableChannel = inputs[0]
+            operator = py_in_dispatch_channel.provide_dispatchable(do_wrapper=True)
+
+            sink: WayangJVMTextFileSink = WayangJVMTextFileSink(self.name, self.path)
+
+            operator.connect_to(0, sink, 0)
+
+            self.dispatch_operator = sink
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSink does not support Output Channels")
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
new file mode 100644
index 00000000..3ca3b911
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_source_textfile.py
@@ -0,0 +1,48 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import (CH_T, ChannelDescriptor)
+from pywy.operators.source import TextFileSource
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMTextFileSource
+
+
+class JVMTextFileSourceOperator(TextFileSource, JVMExecutionOperator):
+
+    def __init__(self, origin: TextFileSource = None):
+        path = None if origin is None else origin.path
+        super().__init__(path)
+        pass
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+        if isinstance(outputs[0], DispatchableChannel):
+            py_out_dispatch_channel: DispatchableChannel = outputs[0]
+            py_out_dispatch_channel.accept_dispatchable(
+                WayangJVMTextFileSource(self.name, self.path)
+            )
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        raise Exception("The JVMTextFileSource does not support Input Channels")
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
new file mode 100644
index 00000000..06ba9e16
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/operator/jvm_unary_filter.py
@@ -0,0 +1,72 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+from typing import Set, List, Type
+
+from pywy.core.channel import CH_T, ChannelDescriptor
+from pywy.operators.unary import FilterOperator
+from pywy.platforms.jvm.channels import DISPATCHABLE_CHANNEL_DESCRIPTOR, DispatchableChannel
+from pywy.platforms.jvm.operator.jvm_execution_operator import JVMExecutionOperator
+from pywy.platforms.commons.channels import (
+    CommonsCallableChannel
+)
+from pywy.platforms.jvm.serializable.wayang_jvm_operator import WayangJVMMappartitionOperator, WayangJVMOperator
+
+
+class JVMFilterOperator(FilterOperator, JVMExecutionOperator):
+
+    def __init__(self, origin: FilterOperator = None):
+        predicate = None if origin is None else origin.predicate
+        super().__init__(predicate)
+        pass
+
+    def execute(self, inputs: List[Type[CH_T]], outputs: List[Type[CH_T]]):
+        self.validate_channels(inputs, outputs)
+        udf = self.predicate
+        if isinstance(inputs[0], DispatchableChannel):
+            py_in_dispatch_channel: DispatchableChannel = inputs[0]
+            py_out_dispatch_channel: DispatchableChannel = outputs[0]
+
+            def func(iterator):
+                return filter(udf, iterator)
+
+            py_out_dispatch_channel.accept_callable(
+                CommonsCallableChannel.concatenate(
+                    func,
+                    py_in_dispatch_channel.provide_callable()
+                )
+            )
+
+            op: WayangJVMOperator = py_in_dispatch_channel.provide_dispatchable()
+
+            if isinstance(op, WayangJVMMappartitionOperator):
+                py_out_dispatch_channel.accept_dispatchable(op)
+                return
+
+            current: WayangJVMMappartitionOperator = WayangJVMMappartitionOperator(self.name)
+            # TODO check for the case where the index matter
+            op.connect_to(0, current, 0)
+            py_out_dispatch_channel.accept_dispatchable(current)
+
+        else:
+            raise Exception("Channel Type does not supported")
+
+    def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
+
+    def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
+        return {DISPATCHABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/platform.py
similarity index 74%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/platform.py
index e3509b68..d52ac471 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/platform.py
@@ -16,12 +16,9 @@
 #
 
 from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
 
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
+
+class JVMPlatform(Platform):
+
+    def __init__(self):
+        super(JVMPlatform, self).__init__("JVM")
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/plugin.py
similarity index 67%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/plugin.py
index e3509b68..eff1e04b 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/plugin.py
@@ -15,13 +15,17 @@
 #  limitations under the License.
 #
 
-from pywy.core.platform import Platform
+from pywy.core import Executor
 from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
+from pywy.platforms.jvm.execution import JVMExecutor
+from pywy.platforms.jvm.mappings import JVM_OPERATOR_MAPPINGS
+from pywy.platforms.jvm.platform import JVMPlatform
 
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
+
+class JVMPlugin(Plugin):
+
+    def __init__(self):
+        super(JVMPlugin, self).__init__({JVMPlatform()}, JVM_OPERATOR_MAPPINGS)
+
+    def get_executor(self) -> Executor:
+        return JVMExecutor()
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/platforms/jvm/serializable/__init__.py
similarity index 70%
copy from python/src/pywy/plugins.py
copy to python/src/pywy/platforms/jvm/serializable/__init__.py
index e3509b68..d9e26de2 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/platforms/jvm/serializable/__init__.py
@@ -14,14 +14,3 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-
-from pywy.core.platform import Platform
-from pywy.core import Plugin
-from pywy.platforms.python.plugin import PythonPlugin
-
-# define the basic plugins that can be used
-JAVA = Plugin({Platform('java')})
-SPARK = Plugin({Platform('spark')})
-FLINK = Plugin({Platform('flink')})
-# plugin for the python platform
-PYTHON = PythonPlugin()
diff --git a/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
new file mode 100644
index 00000000..d18f155b
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/serializable/wayang_jvm_operator.py
@@ -0,0 +1,96 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import Callable, List, TypeVar
+
+from pywy.exception import PywyException
+
+
+class WayangJVMOperator:
+
+    kind: str
+    name: str
+    path: str
+    udf: Callable
+
+    previous: List['WayangJVMOperator']
+    nexts: List['WayangJVMOperator']
+
+    def __init__(self, kind, name):
+        self.name = name
+        self.kind = kind
+        self.previous = []
+        self.nexts = []
+
+    def validate_vector(self, vect: List['WayangJVMOperator'], index: int, op: 'WayangJVMOperator' = None):
+        if op is None:
+            op = self
+
+        if vect is None or len(vect) == 0:
+            vect = [None] * (index + 1)
+
+        if len(vect) < index:
+            vect.extend([None for i in range(index + 1 - len(vect))])
+
+        if vect[index] is not None:
+            raise PywyException(
+                'the position in the index "{}" is already in use for "{}" in the operator "{}"'.format(
+                    index,
+                    vect[index],
+                    op
+                )
+            )
+
+        return vect
+
+    def connect_to(self, nexts_index: int, operator: 'WayangJVMOperator', previous_index: int) -> 'WayangJVMOperator':
+        operator.previous = self.validate_vector(operator.previous, previous_index, operator)
+        self.nexts = self.validate_vector(self.nexts, nexts_index)
+
+        self.nexts[nexts_index] = operator
+        operator.previous[previous_index] = self
+        return self
+
+    def __str__(self):
+        return "WayangJVMOperator {}, previous.[{}], nexts.[{}]".format(
+            self.name,
+            self.previous,
+            self.nexts
+        )
+
+WJO_T = TypeVar('WJO_T', bound=WayangJVMOperator)
+
+
+class WayangJVMMappartitionOperator(WayangJVMOperator):
+
+    def __init__(self, name: str, udf: Callable = None):
+        super().__init__("MapPartitionOperator", name)
+        self.udf = udf
+
+
+class WayangJVMTextFileSource(WayangJVMOperator):
+
+    def __init__(self, name: str, path: str):
+        super().__init__("TextFileSource", name)
+        self.path = path
+
+
+class WayangJVMTextFileSink(WayangJVMOperator):
+
+    def __init__(self, name: str, path: str):
+        super().__init__("TextFileSink", name)
+        self.path = path
+
diff --git a/python/src/pywy/plugins.py b/python/src/pywy/plugins.py
index e3509b68..777d2976 100644
--- a/python/src/pywy/plugins.py
+++ b/python/src/pywy/plugins.py
@@ -17,6 +17,7 @@
 
 from pywy.core.platform import Platform
 from pywy.core import Plugin
+from pywy.platforms.jvm.plugin import JVMPlugin
 from pywy.platforms.python.plugin import PythonPlugin
 
 # define the basic plugins that can be used
@@ -25,3 +26,4 @@ SPARK = Plugin({Platform('spark')})
 FLINK = Plugin({Platform('flink')})
 # plugin for the python platform
 PYTHON = PythonPlugin()
+JVMs = JVMPlugin()
diff --git a/python/src/pywy/tests/integration/jvm_platform_test.py b/python/src/pywy/tests/integration/jvm_platform_test.py
new file mode 100644
index 00000000..987e7334
--- /dev/null
+++ b/python/src/pywy/tests/integration/jvm_platform_test.py
@@ -0,0 +1,144 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+import logging
+import os
+import unittest
+import tempfile
+from itertools import chain
+from typing import List, Iterable
+
+from pywy.config import RC_TEST_DIR as ROOT
+from pywy.dataquanta import WayangContext
+from pywy.plugins import JVMs
+
+logger = logging.getLogger(__name__)
+
+
+class TestIntegrationJVMPlatform(unittest.TestCase):
+
+    file_10e0: str
+
+    def setUp(self):
+        self.file_10e0 = "{}/10e0MB.input".format(ROOT)
+
+    @staticmethod
+    def seed_small_grep(validation_file):
+        def pre(a: str) -> bool:
+            return 'six' in a
+
+        fd, path_tmp = tempfile.mkstemp()
+
+        dq = WayangContext() \
+            .register(JVMs) \
+            .textfile(validation_file) \
+            .filter(pre)
+
+        return dq, path_tmp, pre
+
+    def validate_files(self,
+                       validation_file,
+                       outputed_file,
+                       read_and_convert_validation,
+                       read_and_convert_outputed,
+                       delete_outputed=True,
+                       print_variable=False):
+        lines_filter: List[int]
+        with open(validation_file, 'r') as f:
+            lines_filter = list(read_and_convert_validation(f))
+            selectivity = len(lines_filter)
+
+        lines_platform: List[int]
+        with open(outputed_file, 'r') as fp:
+            lines_platform = list(read_and_convert_outputed(fp))
+            elements = len(lines_platform)
+
+        if delete_outputed:
+            os.remove(outputed_file)
+
+        if print_variable:
+            logger.info(f"{lines_platform=}")
+            logger.info(f"{lines_filter=}")
+            logger.info(f"{elements=}")
+            logger.info(f"{selectivity=}")
+
+        self.assertEqual(selectivity, elements)
+        self.assertEqual(lines_filter, lines_platform)
+
+    def test_grep(self):
+
+        dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+
+        dq.store_textfile(path_tmp)
+
+        def convert_validation(file):
+            return filter(pre, file.readlines())
+
+        def convert_outputed(file):
+            return file.readlines()
+
+        self.validate_files(
+            self.file_10e0,
+            path_tmp,
+            convert_validation,
+            convert_outputed
+        )
+
+    # def test_dummy_map(self):
+    #
+    #     def convert(a: str) -> int:
+    #         return len(a)
+    #
+    #     dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+    #
+    #     dq.map(convert) \
+    #         .store_textfile(path_tmp)
+    #
+    #     def convert_validation(file):
+    #         return map(convert, filter(pre, file.readlines()))
+    #
+    #     def convert_outputed(file):
+    #         return map(lambda x: int(x), file.read().splitlines())
+    #
+    #     self.validate_files(
+    #         self.file_10e0,
+    #         path_tmp,
+    #         convert_validation,
+    #         convert_outputed
+    #     )
+    #
+    # def test_dummy_flatmap(self):
+    #     def fm_func(string: str) -> Iterable[str]:
+    #         return string.strip().split(" ")
+    #
+    #     dq, path_tmp, pre = self.seed_small_grep(self.file_10e0)
+    #
+    #     dq.flatmap(fm_func) \
+    #         .store_textfile(path_tmp, '\n')
+    #
+    #     def convert_validation(file):
+    #         return chain.from_iterable(map(fm_func, filter(pre, file.readlines())))
+    #
+    #     def convert_outputed(file):
+    #         return file.read().splitlines()
+    #
+    #     self.validate_files(
+    #         self.file_10e0,
+    #         path_tmp,
+    #         convert_validation,
+    #         convert_outputed
+    #     )


[incubator-wayang] 01/04: [WAYANG-#210] the documentation to pywy.core

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 80ff50db4104db1da31de86be19339e1aca3fa82
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 11 10:45:56 2022 +0200

    [WAYANG-#210] the documentation to pywy.core
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/core/channel.py  | 52 ++++++++++++++++++++++++++++++++++------
 python/src/pywy/core/executor.py | 11 +++++++++
 python/src/pywy/core/mapping.py  | 38 +++++++++++++++++++++++++++--
 python/src/pywy/core/plan.py     | 31 ++++++++++++++++++++++++
 4 files changed, 123 insertions(+), 9 deletions(-)

diff --git a/python/src/pywy/core/channel.py b/python/src/pywy/core/channel.py
index e6b488c4..45bd03cc 100644
--- a/python/src/pywy/core/channel.py
+++ b/python/src/pywy/core/channel.py
@@ -19,26 +19,64 @@ from typing import TypeVar
 
 
 class Channel:
+    """ Models the data movement between to executables :py:class:`pywy.operators.base.PywyOperator`
+
+    Channel is the structure that is used to move the data between executables
+    :py:class:`pywy.operators.base.PywyOperator` and it helps to identify the
+    compatibility between the interaction of operators
+    """
 
     def __init__(self):
         pass
 
-    def get_channel(self) -> 'Channel':
-        return self
-
     def get_type(self):
+        """ return the type of the channel
+
+        Returns
+        -------
+        :py:class:`typing.Type` of the current Channel
+        """
         return type(self)
 
 
 class ChannelDescriptor:
+    """
+
+    Attributes
+    ----------
+    channel_type : :py:class:`typing.Type`
+       Type of the :py:class:`pywy.core.Channel` that the descriptor
+       will generate
+    is_reusable : bool
+        indicates if the source for the channel is reusable for several consumer
+    is_suitable_for_breakpoint : bool
+        indicates if the element support the breakpoint strategies
+    """
 
     def __init__(self, channel_type: type, is_reusable: bool, is_suitable_for_breakpoint: bool):
-        self.channelType = channel_type
-        self.isReusable = is_reusable
-        self.isSuitableForBreakpoint = is_suitable_for_breakpoint
+        """Basic constructor of the ChannelDescriptor
+
+        Parameters
+        ----------
+        channel_type
+            Description of `channelType`.
+        is_reusable
+            Description of `is_reusable`.
+        is_suitable_for_breakpoint
+            Description of `is_suitable_for_breakpoint`.
+        """
+        self.channel_type = channel_type
+        self.is_reusable = is_reusable
+        self.is_suitable_for_breakpoint = is_suitable_for_breakpoint
 
     def create_instance(self) -> Channel:
-        return self.channelType()
+        """Generates an instance of :py:class:`pywy.core.Channel`
+
+        Returns
+        -------
+            instance of :py:class:`pywy.core.Channel`
+        """
+        return self.channel_type()
 
 
 CH_T = TypeVar('CH_T', bound=Channel)
diff --git a/python/src/pywy/core/executor.py b/python/src/pywy/core/executor.py
index a51ec564..6da77d8c 100644
--- a/python/src/pywy/core/executor.py
+++ b/python/src/pywy/core/executor.py
@@ -16,9 +16,20 @@
 #
 
 class Executor:
+    """ Executor is the responsible for execute the plan
 
+    Because in each platform the execution it will be different the plan
+    need to be executed in the different modes, and the Executor is
+    the responsible for execute in the given platform
+    """
     def __init__(self):
         pass
 
     def execute(self, plan):
+        """ execute is the method called for execute the givin plan
+
+        Returns
+        -------
+        does not return anything, but it will differ in some platforms
+        """
         pass
diff --git a/python/src/pywy/core/mapping.py b/python/src/pywy/core/mapping.py
index a6a2f631..81a57715 100644
--- a/python/src/pywy/core/mapping.py
+++ b/python/src/pywy/core/mapping.py
@@ -15,20 +15,54 @@
 #  limitations under the License.
 #
 
-from typing import Dict
+from typing import Dict, Type
 from pywy.operators.base import PywyOperator
 
 
 class Mapping:
-    mappings: Dict[str, type]
+    """Mapping between :py:class:`pywy.operators.base.PywyOperator` and the executable version in a platform
+
+    Mapping is the structure that keep the conversion between :py:class:`pywy.operators.base.PywyOperator`
+    and the executable version of the same operator in a different platforms
+
+    Attributes
+    ----------
+    mappings : :obj:`dict`
+       Mapping using the name as key to retrieve the executable operator
+
+    """
+    mappings: Dict[str, Type]
 
     def __init__(self):
+        """
+        Just instance of the :obj:`dict` to store the mappings
+        """
         self.mappings = {}
 
     def add_mapping(self, operator: PywyOperator):
+        """create the mapping for the instance of :py:class:`pywy.operators.base.PywyOperator`
+
+        Parameters
+        ----------
+        operator : :py:class:`pywy.operators.base.PywyOperator`
+            instance of :py:class:`pywy.operators.base.PywyOperator` that will be used to extract the
+            properties required to create the mapping
+        """
         self.mappings[operator.name_basic()] = type(operator)
 
     def get_instanceof(self, operator: PywyOperator):
+        """Instance the executable version of :py:class:`pywy.operators.base.PywyOperator`
+
+        Parameters
+        ----------
+        operator : :py:class:`pywy.operators.base.PywyOperator`
+            instance of the :py:class:`pywy.operators.base.PywyOperator` that needs to be
+            converted to the executable version
+        Returns
+        -------
+            executable version of :py:class:`pywy.operators.base.PywyOperator` in the
+            platform that the mapping is holding
+        """
         template = self.mappings[operator.name_basic()]
         if template is None:
             raise Exception(
diff --git a/python/src/pywy/core/plan.py b/python/src/pywy/core/plan.py
index e08884f5..48407b98 100644
--- a/python/src/pywy/core/plan.py
+++ b/python/src/pywy/core/plan.py
@@ -24,14 +24,45 @@ from pywy.core.plugin import Plugin
 
 
 class PywyPlan:
+    """A PywyPlan consists of a set of :py:class:`pywy.operators.base.PywyOperator`
+
+    the operator inside PywyPlan follow a Directed acyclic graph(DAG), and describe
+    how the execution needs to be performed
+
+    Attributes
+    ----------
+    graph : :py:class:`pywy.graph.graph.WayangGraph`
+       Graph that describe the DAG, and it provides the iterable properties to
+       the PywyPlan
+    plugins : :obj:`set` of :py:class:`pywy.core.plugin.Plugin`
+        plugins is the set of possible platforms that can be uses to execute
+        the PywyPlan
+    sinks : :py:class:`typing.Iterable` of :py:class:`pywy.operators.sink.SinkOperator`
+        The list of sink operators, this describe the end of the pipeline, and
+        they are used to build the `graph`
+    """
     graph: WayangGraph
 
     def __init__(self, plugins: Set[Plugin], sinks: Iterable[SinkOperator]):
+        """basic Constructor of PywyPlan
+
+        this constructor set the plugins and sinks element, and it prepares
+        everything for been executed
+
+        Parameters
+        ----------
+        plugins
+            Description of `plugins`.
+        sinks
+            Description of `sinks`.
+        """
         self.plugins = plugins
         self.sinks = sinks
         self.set_graph()
 
     def set_graph(self):
+        """ it builds the :py:class:`pywy.graph.graph.WayangGraph` of the current PywyPlan
+        """
         self.graph = WGraphOfVec(self.sinks)
 
     def print(self):


[incubator-wayang] 02/04: [WAYANG-#211] restructure the channels for Python-Platform

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch wayang-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit cecaf8ec971beafb689fa6f4344b364c88be8e15
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Apr 11 11:19:11 2022 +0200

    [WAYANG-#211] restructure the channels for Python-Platform
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/platforms/commons/__init__.py      | 16 +++++++++
 .../pywy/platforms/{python => commons}/channels.py | 28 ++++-----------
 python/src/pywy/platforms/python/channels.py       | 40 +---------------------
 .../platforms/python/operator/py_sink_textfile.py  |  3 +-
 .../python/operator/py_source_textfile.py          |  5 ++-
 .../platforms/python/operator/py_unary_filter.py   | 27 ++++++++-------
 .../platforms/python/operator/py_unary_flatmap.py  | 27 ++++++++-------
 .../pywy/platforms/python/operator/py_unary_map.py | 27 ++++++++-------
 8 files changed, 68 insertions(+), 105 deletions(-)

diff --git a/python/src/pywy/platforms/commons/__init__.py b/python/src/pywy/platforms/commons/__init__.py
new file mode 100644
index 00000000..8d2bad81
--- /dev/null
+++ b/python/src/pywy/platforms/commons/__init__.py
@@ -0,0 +1,16 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
\ No newline at end of file
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/commons/channels.py
similarity index 66%
copy from python/src/pywy/platforms/python/channels.py
copy to python/src/pywy/platforms/commons/channels.py
index 0d0f65e6..743a7169 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/commons/channels.py
@@ -15,26 +15,11 @@
 #  limitations under the License.
 #
 
-from typing import ( Iterable, Callable )
+from typing import Callable
 from pywy.core import (Channel, ChannelDescriptor)
 
 
-class PyIteratorChannel(Channel):
-
-    iterable: Iterable
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_iterable(self) -> Iterable:
-        return self.iterable
-
-    def accept_iterable(self, iterable: Iterable) -> 'PyIteratorChannel':
-        self.iterable = iterable
-        return self
-
-
-class PyCallableChannel(Channel):
+class CommonsCallableChannel(Channel):
 
     udf: Callable
 
@@ -44,7 +29,7 @@ class PyCallableChannel(Channel):
     def provide_callable(self) -> Callable:
         return self.udf
 
-    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
+    def accept_callable(self, udf: Callable) -> 'CommonsCallableChannel':
         self.udf = udf
         return self
 
@@ -55,7 +40,7 @@ class PyCallableChannel(Channel):
         return executable
 
 
-class PyFileChannel(Channel):
+class CommonsFileChannel(Channel):
 
     path: str
 
@@ -70,6 +55,5 @@ class PyFileChannel(Channel):
         return self
 
 
-PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
+COMMONS_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsCallableChannel()), False, False)
+COMMONS_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(CommonsFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/channels.py b/python/src/pywy/platforms/python/channels.py
index 0d0f65e6..f79a67ed 100644
--- a/python/src/pywy/platforms/python/channels.py
+++ b/python/src/pywy/platforms/python/channels.py
@@ -15,7 +15,7 @@
 #  limitations under the License.
 #
 
-from typing import ( Iterable, Callable )
+from typing import Iterable
 from pywy.core import (Channel, ChannelDescriptor)
 
 
@@ -34,42 +34,4 @@ class PyIteratorChannel(Channel):
         return self
 
 
-class PyCallableChannel(Channel):
-
-    udf: Callable
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_callable(self) -> Callable:
-        return self.udf
-
-    def accept_callable(self, udf: Callable) -> 'PyCallableChannel':
-        self.udf = udf
-        return self
-
-    @staticmethod
-    def concatenate(function_a: Callable, function_b: Callable):
-        def executable(iterable):
-            return function_a(function_b(iterable))
-        return executable
-
-
-class PyFileChannel(Channel):
-
-    path: str
-
-    def __init__(self):
-        Channel.__init__(self)
-
-    def provide_path(self) -> str:
-        return self.path
-
-    def accept_path(self, path: str) -> 'PyIteratorChannel':
-        self.path = path
-        return self
-
-
 PY_ITERATOR_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyIteratorChannel()), False, False)
-PY_CALLABLE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyCallableChannel()), False, False)
-PY_FILE_CHANNEL_DESCRIPTOR = ChannelDescriptor(type(PyFileChannel()), False, False)
diff --git a/python/src/pywy/platforms/python/operator/py_sink_textfile.py b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
index 9914382f..7d8eec1b 100644
--- a/python/src/pywy/platforms/python/operator/py_sink_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_sink_textfile.py
@@ -17,11 +17,10 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.sink import TextFileSink
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
 )
diff --git a/python/src/pywy/platforms/python/operator/py_source_textfile.py b/python/src/pywy/platforms/python/operator/py_source_textfile.py
index 30831880..245d090d 100644
--- a/python/src/pywy/platforms/python/operator/py_source_textfile.py
+++ b/python/src/pywy/platforms/python/operator/py_source_textfile.py
@@ -17,14 +17,13 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.source import TextFileSource
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
 from pywy.platforms.python.channels import (
-    ChannelDescriptor,
     PyIteratorChannel,
     PY_ITERATOR_CHANNEL_DESCRIPTOR
-                                            )
+)
 
 
 class PyTextFileSourceOperator(TextFileSource, PyExecutionOperator):
diff --git a/python/src/pywy/platforms/python/operator/py_unary_filter.py b/python/src/pywy/platforms/python/operator/py_unary_filter.py
index 0788e974..2d807282 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_filter.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_filter.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import CH_T, ChannelDescriptor
 from pywy.operators.unary import FilterOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFilterOperator(FilterOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(filter(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return filter(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyFilterOperator(FilterOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
index 97f467d4..72016a8c 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_flatmap.py
@@ -18,16 +18,17 @@
 from itertools import chain
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import FlatmapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(chain.from_iterable(map(udf, py_in_iter_channel.provide_iterable())))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def fm_func(iterator):
                 return chain.from_iterable(map(udf, iterator))
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     fm_func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyFlatmapOperator(FlatmapOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
diff --git a/python/src/pywy/platforms/python/operator/py_unary_map.py b/python/src/pywy/platforms/python/operator/py_unary_map.py
index b8741a92..a8e53a48 100644
--- a/python/src/pywy/platforms/python/operator/py_unary_map.py
+++ b/python/src/pywy/platforms/python/operator/py_unary_map.py
@@ -17,16 +17,17 @@
 
 from typing import Set, List, Type
 
-from pywy.core.channel import CH_T
+from pywy.core.channel import (CH_T, ChannelDescriptor)
 from pywy.operators.unary import MapOperator
 from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator
+from pywy.platforms.commons.channels import (
+    COMMONS_CALLABLE_CHANNEL_DESCRIPTOR,
+    CommonsCallableChannel
+)
 from pywy.platforms.python.channels import (
-                                                ChannelDescriptor,
-                                                PyIteratorChannel,
-                                                PY_ITERATOR_CHANNEL_DESCRIPTOR,
-                                                PY_CALLABLE_CHANNEL_DESCRIPTOR,
-                                                PyCallableChannel
-                                            )
+    PyIteratorChannel,
+    PY_ITERATOR_CHANNEL_DESCRIPTOR,
+)
 
 
 class PyMapOperator(MapOperator, PyExecutionOperator):
@@ -43,15 +44,15 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
             py_in_iter_channel: PyIteratorChannel = inputs[0]
             py_out_iter_channel: PyIteratorChannel = outputs[0]
             py_out_iter_channel.accept_iterable(map(udf, py_in_iter_channel.provide_iterable()))
-        elif isinstance(inputs[0], PyCallableChannel):
-            py_in_call_channel: PyCallableChannel = inputs[0]
-            py_out_call_channel: PyCallableChannel = outputs[0]
+        elif isinstance(inputs[0], CommonsCallableChannel):
+            py_in_call_channel: CommonsCallableChannel = inputs[0]
+            py_out_call_channel: CommonsCallableChannel = outputs[0]
 
             def func(iterator):
                 return map(udf, iterator)
 
             py_out_call_channel.accept_callable(
-                PyCallableChannel.concatenate(
+                CommonsCallableChannel.concatenate(
                     func,
                     py_in_call_channel.provide_callable()
                 )
@@ -60,7 +61,7 @@ class PyMapOperator(MapOperator, PyExecutionOperator):
             raise Exception("Channel Type does not supported")
 
     def get_input_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}
 
     def get_output_channeldescriptors(self) -> Set[ChannelDescriptor]:
-        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, PY_CALLABLE_CHANNEL_DESCRIPTOR}
+        return {PY_ITERATOR_CHANNEL_DESCRIPTOR, COMMONS_CALLABLE_CHANNEL_DESCRIPTOR}