You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:22 UTC

[incubator-wayang] 01/32: Wayang 8 (#89)

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit edab66a194ec90f0ac948255ee681cc7d098ee90
Author: Rodrigo Pardo Meza <ro...@gmail.com>
AuthorDate: Mon Mar 21 14:03:00 2022 +0100

    Wayang 8 (#89)
    
    * [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor
    
    * [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection
    
    * [WAYANG-8][API-PYTHON] POM fixes plus minor test
    
    * [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled
    
    * [WAYANG-8][API-PYTHON] Writing from Java to Python. Not taking into care about Iterator Datatypes.
    
    * [WAYANG-8][API-PYTHON] Java Socket Writter improvements
    
    * [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included
    
    * [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream
    
    * [WAYANG-8][API-PYTHON] Getting results from Python and continue processing
    
    * [WAYANG-8][API-PYTHON] Config files for pywayang
    
    * [WAYANG-8][API-PYTHON] Structures to save the plan with functional fashion plus most basic operators
    
    * [WAYANG-8][API-PYTHON] Main program to test plan executions locally
    
    * [WAYANG-8][API-PYTHON] Minor comments and
    TODOs
    
    * [WAYANG-8][API-PYTHON] Most basic test for protobuff communication with java
    
    * [WAYANG-8][API-PYTHON] Addjacency list from PyWayang Plan
    
    * [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern
    
    * [WAYANG-8][API-PYTHON] Protobuf python message generator
    
    * [WAYANG-8][API-PYTHON] Wayang Web Service project structure
    
    * [WAYANG-8][API-PYTHON] Protobuf message generation fixes
    
    * [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly
    
    * [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling
    
    * [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF
    
    * [WAYANG-8][API-PYTHON] New test with single UDF
    
    * [WAYANG-8][API-PYTHON] Protobuf command
    
    * [WAYANG-8][API-PYTHON] Protobuf message template updated
    
    * [WAYANG-8][API-PYTHON] POM fixes
    
    * [WAYANG-8][API-PYTHON] License comments added
    
    * [WAYANG-8][API-PYTHON] Correction on missing licenses
    
    * [WAYANG-8][API-PYTHON] Serializable module creation
    
    * [WAYANG-8][API-PYTHON] adding protoc to travis
    
    * [WAYANG-8][API-PYTHON] protoc executable path correction
    
    * [WAYANG-8][API-PYTHON] Commenting objc_class_prefix
    
    * [WAYANG-8][API-PYTHON] Obtaining pipelines
    
    * [WAYANG-8][API-PYTHON] Dataquanta writing message
    
    * [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments
    
    * [WAYANG-8][API-PYTHON] Operator Python executable indicator
    
    * [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets
    
    * [WAYANG-8][API-PYTHON] New version of Wayang protobuf message
    
    * [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions
    
    * [WAYANG-8][API-PYTHON] More test programs
    
    * [WAYANG-8][API-PYTHON] Commentaries and logging for Graph module
    
    * [WAYANG-8][API-PYTHON] Commentaries and logging for Orchestrator module
    
    * [WAYANG-8][API-PYTHON] Commentaries and logging for Protobuf module
    
    * [WAYANG-8][API-PYTHON] Fix usage of relative paths
    
    * [WAYANG-8][API-PYTHON] Scripts to compile protobuf has been deleted. Now Maven executes them
    
    * [WAYANG-8][API-PYTHON] Execution Log configuration
    
    * [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator
    
    * [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan
    
    * [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor
    
    * [WAYANG-8][API-PYTHON] Unitary Testing preparing the Wayang Plan with Spark Execution
    
    * [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64
    
    * [WAYANG-8][API-PYTHON] New Operators Flatmap group by, reduce and Reduce By Key. Only Python Side.
    
    * [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators
    
    * [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test
    
    * [WAYANG-8][API-PYTHON] Last changes, not working
    
    * [WAYANG-8] Fixing errors with dependencies
    
    * [WAYANG-8] Fix to Pom versions problem
    
    * [WAYANG-8] Protoc path updated
    
    * [WAYANG-8] Correction in the pom.xml for flags
    
    Signed-off-by: bertty <be...@apache.org>
    
    Co-authored-by: Bertty Contreras-Rojas <be...@databloom.ai>
    Signed-off-by: bertty <be...@apache.org>
---
 pom.xml                               |   2 +
 pywayang/config/__init__.py           |  20 +++
 pywayang/config/config_reader.py      |  51 ++++++
 pywayang/config/pywayang_config.ini   |  38 ++++
 pywayang/graph/__init__.py            |  19 ++
 pywayang/graph/graph.py               |  71 ++++++++
 pywayang/graph/node.py                |  48 +++++
 pywayang/graph/traversal.py           |  51 ++++++
 pywayang/graph/visitant.py            |  52 ++++++
 pywayang/orchestrator/__init__.py     |  20 +++
 pywayang/orchestrator/dataquanta.py   | 330 ++++++++++++++++++++++++++++++++++
 pywayang/orchestrator/execdirectly.py | 162 +++++++++++++++++
 pywayang/orchestrator/main.py         | 173 ++++++++++++++++++
 pywayang/orchestrator/operator.py     | 121 +++++++++++++
 pywayang/orchestrator/plan.py         |  52 ++++++
 pywayang/protobuf/__init__.py         |  18 ++
 pywayang/protobuf/old_planwriter.py   | 308 +++++++++++++++++++++++++++++++
 pywayang/protobuf/planwriter.py       | 277 ++++++++++++++++++++++++++++
 pywayang/test/demo_testing.py         |  30 ++++
 pywayang/test/full_java_test.py       |  69 +++++++
 pywayang/test/full_spark_test.py      |  67 +++++++
 21 files changed, 1979 insertions(+)

diff --git a/pom.xml b/pom.xml
index 3ff48f95..e0ac2bda 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1248,6 +1248,8 @@
                         <exclude>**/README.md</exclude>
                         <exclude>**/general-todos.md</exclude>
                         <exclude>**/scala_1*</exclude>
+
+                        <exclude>**/*pb2.py</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/pywayang/config/__init__.py b/pywayang/config/__init__.py
new file mode 100644
index 00000000..008475c2
--- /dev/null
+++ b/pywayang/config/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
diff --git a/pywayang/config/config_reader.py b/pywayang/config/config_reader.py
new file mode 100644
index 00000000..c8f58732
--- /dev/null
+++ b/pywayang/config/config_reader.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import configparser
+import os
+
+
+def get_boundary_types():
+    config = configparser.ConfigParser()
+    config.sections()
+    config.read('../config/pywayang_config.ini')
+    boundary_types = dict(config.items('BOUNDARY_TYPES'))
+    boundary_types.pop("variable_to_access")
+    return boundary_types.values()
+
+
+def get_source_types():
+    config = configparser.ConfigParser()
+    #print("path: ", os.getcwd())
+    config.read("../config/pywayang_config.ini")
+    source_types = dict(config.items('SOURCE_TYPES'))
+    source_types.pop("variable_to_access")
+    return source_types.values()
+    #sections_list = config.sections()
+    #for section in sections_list:
+    #    print(section)
+    #print("source_types")
+    #for x in source_types.values():
+    #    print(x)
+
+def get_sink_types():
+    config = configparser.ConfigParser()
+    #print("path: ", os.getcwd())
+    config.read("../config/pywayang_config.ini")
+    sink_types = dict(config.items('SINK_TYPES'))
+    sink_types.pop("variable_to_access")
+    return sink_types.values()
\ No newline at end of file
diff --git a/pywayang/config/pywayang_config.ini b/pywayang/config/pywayang_config.ini
new file mode 100644
index 00000000..78cc2b48
--- /dev/null
+++ b/pywayang/config/pywayang_config.ini
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[DEFAULT]
+variable_to_access = value
+
+[INPUT]
+txnname_mod = string1
+txnmemo_mod = string2
+
+[MODIFY]
+txnname_mod = string3
+txnmemo_mod = string4
+
+[BOUNDARY_TYPES]
+boundary_type_1 = union
+
+[SOURCE_TYPES]
+source_type_1 = source
+source_type_2 = text
+
+[SINK_TYPES]
+sink_type_1 = sink
+sink_type_2 = sonk
\ No newline at end of file
diff --git a/pywayang/graph/__init__.py b/pywayang/graph/__init__.py
new file mode 100644
index 00000000..17e2deb5
--- /dev/null
+++ b/pywayang/graph/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import graph.graph
+import graph.node
\ No newline at end of file
diff --git a/pywayang/graph/graph.py b/pywayang/graph/graph.py
new file mode 100644
index 00000000..be7a32f0
--- /dev/null
+++ b/pywayang/graph/graph.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.node import Node
+import logging
+
+
+# Adjacency Matrix used to analise the plan
+class Graph:
+    def __init__(self):
+        self.graph = {}
+        self.nodes_no = 0
+        self.nodes = []
+
+    # Fills the Graph
+    def populate(self, sinks):
+        for sink in iter(sinks):
+            self.process_operator(sink)
+
+    # Add current operator and set dependencies
+    def process_operator(self, operator):
+        self.add_node(operator.operator_type, operator.id, operator)
+
+        if len(operator.previous) > 0:
+            for parent in operator.previous:
+                if parent:
+                    self.add_node(parent.operator_type, parent.id, parent)
+                    self.add_link(operator.id, parent.id, 1)
+                    self.process_operator(parent)
+
+    def add_node(self, name, id, operator):
+        if id in self.nodes:
+            return
+
+        self.nodes_no += 1
+        self.nodes.append(id)
+        new_node = Node(name, id, operator)
+
+        self.graph[id] = new_node
+
+    def add_link(self, id_child, id_parent, e):
+        if id_child in self.nodes:
+            if id_parent in self.nodes:
+                self.graph[id_child].add_predecessor(id_parent, e)
+                self.graph[id_parent].add_successor(id_child, e)
+
+    def print_adjlist(self):
+
+        for key in self.graph:
+            logging.debug("Node: ", self.graph[key].operator_type, " - ", key)
+            for key2 in self.graph[key].predecessors:
+                logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2)
+            for key2 in self.graph[key].successors:
+                logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2)
+
+    def get_node(self, id):
+        return self.graph[id]
diff --git a/pywayang/graph/node.py b/pywayang/graph/node.py
new file mode 100644
index 00000000..d0d696fd
--- /dev/null
+++ b/pywayang/graph/node.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+
+
+class Element(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def accept(self, visitor, udf, orientation, last_iter):
+        pass
+
+
+# Describes an Operator in the Graph
+class Node(Element):
+    def __init__(self, operator_type, id, operator):
+        self.operator_type = operator_type
+        self.id = id
+        self.predecessors = {}
+        self.successors = {}
+        self.python_exec = operator.python_exec
+
+        # Temporal
+        self.operator = operator
+
+    def add_predecessor(self, id_parent, e):
+        self.predecessors[id_parent] = e
+
+    def add_successor(self, id_child, e):
+        self.successors[id_child] = e
+
+    # Nodes are visited by objects of class Visitant.
+    # Visitants are being used to execute a UDF through the Graph
+    def accept(self, visitor, udf, orientation, last_iter):
+        visitor.visit_node(self, udf, orientation, last_iter)
diff --git a/pywayang/graph/traversal.py b/pywayang/graph/traversal.py
new file mode 100644
index 00000000..e2dd8516
--- /dev/null
+++ b/pywayang/graph/traversal.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.visitant import Visitant
+import logging
+
+
+# Defines how a UDF will be applied over the Graph
+class Traversal:
+
+    def __init__(self, graph, origin, udf):
+        self.graph = graph
+        self.origin = origin
+        self.udf = udf
+        self.app = Visitant(graph, [])
+
+        # Starting from Sinks or Sources sets an specific orientation
+        if origin[0].source:
+            self.orientation = "successors"
+        elif origin[0].sink:
+            self.orientation = "predecessors"
+        else:
+            logging.error("Origin point to traverse the plan wrongly defined")
+            return
+
+        for operator in iter(origin):
+            logging.debug("operator origin: " + str(operator.id))
+            node = graph.get_node(operator.id)
+            self.app.visit_node(
+                node=node,
+                udf=self.udf,
+                orientation=self.orientation,
+                last_iter=None
+            )
+
+    def get_collected_data(self):
+        return self.app.get_collection()
diff --git a/pywayang/graph/visitant.py b/pywayang/graph/visitant.py
new file mode 100644
index 00000000..3d2f874f
--- /dev/null
+++ b/pywayang/graph/visitant.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+import logging
+
+
+class Visitor(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def visit_node(self, node, udf, orientation, last_iter):
+        pass
+
+
+# Applies a UDF in current Node
+class Visitant(Visitor):
+
+    def __init__(self, graph, results):
+        self.collection = results
+        self.graph = graph
+
+    # UDF can store results in ApplyFunction.collection whenever its requires.
+    # last_iter has the generated current value obtained in the previous iteration
+    def visit_node(self, node, udf, orientation, last_iter):
+        logging.debug("Applying UDf" + str(orientation))
+        current_value = udf(node, last_iter, self.collection)
+        logging.debug("orientation result " + str(getattr(node, orientation)))
+        next_iter = getattr(node, orientation)
+        if len(next_iter) > 0:
+            for next_iter_id in next_iter:
+                if next_iter_id:
+                    logging.debug("next_id: " + str(next_iter_id))
+                    next_iter_node = self.graph.get_node(next_iter_id)
+                    logging.debug("next_iter_node: " + next_iter_node.operator_type + " " + str(next_iter_node.id))
+                    next_iter_node.accept(visitor=self, udf=udf, orientation=orientation, last_iter=current_value)
+        pass
+
+    def get_collection(self):
+        return self.collection
diff --git a/pywayang/orchestrator/__init__.py b/pywayang/orchestrator/__init__.py
new file mode 100644
index 00000000..ed7d0ac9
--- /dev/null
+++ b/pywayang/orchestrator/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import orchestrator.plan
+import orchestrator.dataquanta
+import graph.graph
diff --git a/pywayang/orchestrator/dataquanta.py b/pywayang/orchestrator/dataquanta.py
new file mode 100644
index 00000000..7d700eb5
--- /dev/null
+++ b/pywayang/orchestrator/dataquanta.py
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.operator import Operator
+from graph.graph import Graph
+from graph.traversal import Traversal
+from protobuf.planwriter import MessageWriter
+import itertools
+import collections
+import logging
+from functools import reduce
+import operator
+
+
+# Wraps a Source operation to create an iterable
+class DataQuantaBuilder:
+    def __init__(self, descriptor):
+        self.descriptor = descriptor
+
+    def source(self, source):
+
+        if type(source) is str:
+            source_ori = open(source, "r")
+        else:
+            source_ori = source
+        return DataQuanta(
+            Operator(
+                operator_type="source",
+                udf=source,
+                iterator=iter(source_ori),
+                previous=[],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+
+# Wraps an operation over an iterable
+class DataQuanta:
+    def __init__(self, operator=None, descriptor=None):
+        self.operator = operator
+        self.descriptor = descriptor
+        if self.operator.is_source():
+            self.descriptor.add_source(self.operator)
+        if self.operator.is_sink():
+            self.descriptor.add_sink(self.operator)
+
+    # Operational Functions
+    def filter(self, udf):
+        def func(iterator):
+            return filter(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="filter",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def flatmap(self, udf):
+
+        def auxfunc(iterator):
+            return itertools.chain.from_iterable(map(udf, iterator))
+
+        def func(iterator):
+            mapped = map(udf, iterator)
+            flattened = flatten_single_dim(mapped)
+            yield from flattened
+
+        def flatten_single_dim(mapped):
+            for item in mapped:
+                for subitem in item:
+                    yield subitem
+
+        return DataQuanta(
+            Operator(
+                operator_type="flatmap",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def group_by(self, udf):
+        def func(iterator):
+            # TODO key should be given by "udf"
+            return itertools.groupby(iterator, key=operator.itemgetter(0))
+            #return itertools.groupby(sorted(iterator), key=itertools.itemgetter(0))
+
+        return DataQuanta(
+            Operator(
+                operator_type="group_by",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def map(self, udf):
+        def func(iterator):
+            return map(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="map",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    # Key specifies pivot dimensions
+    # UDF specifies reducer function
+    def reduce_by_key(self, keys, udf):
+
+        op = Operator(
+            operator_type="reduce_by_key",
+            udf=udf,
+            previous=[self.operator],
+            python_exec=False
+        )
+
+        print(len(keys), keys)
+        for i in range(0, len(keys)):
+            """if keys[i] is int:
+                op.set_parameter("vector_position|"+str(i), keys[i])
+            else:
+                op.set_parameter("dimension_key|"+str(i), keys[i])"""
+
+            # TODO maybe would be better just leave the number as key
+            op.set_parameter("dimension|"+str(i+1), keys[i])
+
+        return DataQuanta(
+            op,
+            descriptor=self.descriptor
+        )
+
+    def reduce(self, udf):
+        def func(iterator):
+            return reduce(udf, iterator)
+
+        return DataQuanta(
+            Operator(
+                operator_type="reduce",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    def sink(self, path, end="\n"):
+        def consume(iterator):
+            with open(path, 'w') as f:
+                for x in iterator:
+                    f.write(str(x) + end)
+
+        def func(iterator):
+            consume(iterator)
+            # return self.__run(consume)
+
+        return DataQuanta(
+            Operator(
+                operator_type="sink",
+
+                udf=path,
+                # To execute directly uncomment
+                # udf=func,
+
+                previous=[self.operator],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+    def sort(self, udf):
+
+        def func(iterator):
+            return sorted(iterator, key=udf)
+
+        return DataQuanta(
+            Operator(
+                operator_type="sort",
+                udf=func,
+                previous=[self.operator],
+                python_exec=True
+            ),
+            descriptor=self.descriptor
+        )
+
+    # This function allow the union to be performed by Python
+    # Nevertheless, current configuration runs it over Java
+    def union(self, other):
+
+        def func(iterator):
+            return itertools.chain(iterator, other.operator.getIterator())
+
+        return DataQuanta(
+            Operator(
+                operator_type="union",
+                udf=func,
+                previous=[self.operator, other.operator],
+                python_exec=False
+            ),
+            descriptor=self.descriptor
+        )
+
+    def __run(self, consumer):
+        consumer(self.operator.getIterator())
+
+    # Execution Functions
+    def console(self, end="\n"):
+        def consume(iterator):
+            for x in iterator:
+                print(x, end=end)
+
+        self.__run(consume)
+
+    # Only for debugging purposes!
+    # To execute the plan directly in the program driver
+    def execute(self):
+        logging.warn("DEBUG Execution")
+        logging.info("Reminder to swap SINK UDF value from path to func")
+        logging.debug(self.operator.previous[0].operator_type)
+        if self.operator.is_sink():
+            logging.debug(self.operator.operator_type)
+            logging.debug(self.operator.udf)
+            logging.debug(len(self.operator.previous))
+            self.operator.udf(self.operator.previous[0].getIterator())
+        else:
+            logging.error("Plan must call execute from SINK type of operator")
+            raise RuntimeError
+
+    # Converts Python Functional Plan to valid Wayang Plan
+    def to_wayang_plan(self):
+
+        sinks = self.descriptor.get_sinks()
+        if len(sinks) == 0:
+            return
+
+        graph = Graph()
+        graph.populate(self.descriptor.get_sinks())
+
+        # Uncomment to check the Graph built
+        # graph.print_adjlist()
+
+        # Function to be consumed by Traverse
+        # Separates Python Plan into a List of Pipelines
+        def define_pipelines(node1, current_pipeline, collection):
+            def store_unique(pipe_to_insert):
+                for pipe in collection:
+                    if equivalent_lists(pipe, pipe_to_insert):
+                        return
+                collection.append(pipe_to_insert)
+
+            def equivalent_lists(l1, l2):
+                if collections.Counter(l1) == collections.Counter(l2):
+                    return True
+                else:
+                    return False
+
+            if not current_pipeline:
+                current_pipeline = [node1]
+
+            elif node1.operator.is_boundary():
+                store_unique(current_pipeline.copy())
+                current_pipeline.clear()
+                current_pipeline.append(node1)
+
+            else:
+                current_pipeline.append(node1)
+
+            if node1.operator.sink:
+                store_unique(current_pipeline.copy())
+                current_pipeline.clear()
+
+            return current_pipeline
+
+        # Works over the graph
+        trans = Traversal(
+            graph=graph,
+            origin=self.descriptor.get_sources(),
+            # udf=lambda x, y, z: d(x, y, z)
+            # UDF always will receive:
+            # x: a Node object,
+            # y: an object representing the result of the last iteration,
+            # z: a collection to store final results inside your UDF
+            udf=lambda x, y, z: define_pipelines(x, y, z)
+        )
+
+        # Gets the results of the traverse process
+        collected_stages = trans.get_collected_data()
+
+        # Passing the Stages to a Wayang message writer
+        writer = MessageWriter()
+        a = 0
+        # Stage is composed of class Node objects
+        for stage in collected_stages:
+            a += 1
+            logging.info("///")
+            logging.info("stage" + str(a))
+            writer.process_pipeline(stage)
+
+        writer.set_dependencies()
+
+        # Uses a file to provide the plan
+        # writer.write_message(self.descriptor)
+
+        # Send the plan to Wayang REST api directly
+        writer.send_message(self.descriptor)
diff --git a/pywayang/orchestrator/execdirectly.py b/pywayang/orchestrator/execdirectly.py
new file mode 100644
index 00000000..452ccab7
--- /dev/null
+++ b/pywayang/orchestrator/execdirectly.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/numbers.txt") \
+            .filter(lambda elem: int(elem) % 2 != 0) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt") \
+        .filter(lambda elem: str(elem).startswith("I"))
+    dq_source_c = plan.source("../test/lastlines.txt") \
+        .filter(lambda elem: str(elem).startswith("W"))
+
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .union(dq_source_c) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .filter(lambda elem: str(elem).startswith("I")) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+    #TODO create reduce by
+    plan = DataQuantaBuilder(descriptor)
+
+    def reducer(obj1, obj2):
+        return obj1[0]
+
+    sink = plan.source("../test/lineitem.txt") \
+        .map(lambda elem: elem.split("|")) \
+        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')) \
+        .map(lambda elem:
+           [elem[8], elem[9], elem[4], elem[5],
+            float(elem[5]) * (1 - float(elem[6])),
+            float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+            elem[4], elem[5],
+            elem[6], 1]) \
+        .sink("../test/output.txt", end="")
+        # .group_by(lambda elem: elem) \
+        # .reduce_by(reducer) \
+        # .flatmap(lambda elem: elem.split("|"))
+        # .map(lambda elem: (elem, elem.split("|"))) \
+        # L_RETURNFLAG 8
+        # L_LINESTATUS 9
+        # L_QUANTITY 4
+        # L_EXTENDEDPRICE 5
+        # discount 6
+        # tax 7
+
+    return dq_source_b
+
+
+def plan_full_java(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+if __name__ == '__main__':
+
+    # Plan will contain general info about the Wayang Plan created here
+    descriptor = Descriptor()
+
+    plan_dataquanta_sink = plan_tpch_q1(descriptor)
+    plan_dataquanta_sink.execute()
diff --git a/pywayang/orchestrator/main.py b/pywayang/orchestrator/main.py
new file mode 100644
index 00000000..b634eeb9
--- /dev/null
+++ b/pywayang/orchestrator/main.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_sort_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .sort(lambda elem: elem.lower()) \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter_text(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/words.txt") \
+            .filter(lambda elem: str(elem).startswith("f")) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_filter(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/numbers.txt") \
+            .filter(lambda elem: int(elem) % 2 != 0) \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_basic(descriptor):
+    plan = DataQuantaBuilder(descriptor)
+
+    sink_dataquanta = \
+        plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+# Returns the Sink Executable Dataquanta of a DEMO plan
+def plan_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt") \
+        .filter(lambda elem: str(elem).startswith("I"))
+    dq_source_c = plan.source("../test/lastlines.txt") \
+        .filter(lambda elem: str(elem).startswith("W"))
+
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .union(dq_source_c) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_java_junction(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .filter(lambda elem: str(elem).startswith("I")) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_tpch_q1(descriptor):
+
+    # TODO create reduce by
+    plan = DataQuantaBuilder(descriptor)
+
+    def reducer(obj1, obj2):
+        return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
+               obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]
+
+    sink = plan.source("../test/lineitem.txt") \
+        .map(lambda elem: elem.split("|")) \
+        .sink("../test/output.txt", end="")
+    """
+        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= datetime.datetime.strptime('1998-09-02', '%Y-%m-%d')) \
+        .map(lambda elem:
+             [elem[8], elem[9], elem[4], elem[5],
+              float(elem[5]) * (1 - float(elem[6])),
+              float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
+              elem[4], elem[5],
+              elem[6], 1]) \
+        .sink("../test/output.txt", end="")"""
+        # .reduce_by_key([0, 1], reducer) \
+
+
+    return sink
+
+
+def plan_full_java(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .sink("../test/output.txt", end="")
+
+    return sink_dataquanta
+
+
+def plan_wordcount(descriptor):
+
+    plan = DataQuantaBuilder(descriptor)
+    sink_wordcount = plan.source("../test/lineitem.txt") \
+        .filter(lambda elem: len(str(elem).split("|")[0]) < 4) \
+        .flatmap(lambda elem: str(elem).split("|")) \
+        .sink("../test/output.txt", end="")
+
+    return sink_wordcount
+
+
+if __name__ == '__main__':
+
+    # Plan will contain general info about the Wayang Plan created here
+    descriptor = Descriptor()
+    descriptor.add_plugin(Descriptor.Plugin.spark)
+    descriptor.add_plugin(Descriptor.Plugin.java)
+
+    plan_dataquanta_sink = plan_wordcount(descriptor)
+    # plan_dataquanta_sink.execute()
+    # plan_dataquanta_sink.console()
+
+    plan_dataquanta_sink.to_wayang_plan()
diff --git a/pywayang/orchestrator/operator.py b/pywayang/orchestrator/operator.py
new file mode 100644
index 00000000..ecaa6bdd
--- /dev/null
+++ b/pywayang/orchestrator/operator.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import cloudpickle
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
+import logging
+
+pickle_protocol = pickle.HIGHEST_PROTOCOL
+
+
+# Describes an Operation over an intermediate result
+# Each operation could be processed by Python or Java platforms
+class Operator:
+
+    def __init__(
+            self, operator_type=None, udf=None, previous=None,
+            iterator=None, python_exec=False
+    ):
+
+        # Operator ID
+        self.id = id(self)
+
+        # Operator Type
+        self.operator_type = operator_type
+
+        # Set Boundaries
+        if self.operator_type in get_boundary_types():
+            self.boundary = True
+        else:
+            self.boundary = False
+
+        # UDF Function
+        self.udf = udf
+
+        # Source types must come with an Iterator
+        self.iterator = iterator
+        if operator_type in get_source_types():
+            if iterator is None:
+                print("Source Operator Type without an Iterator")
+                raise
+            else:
+                self.source = True
+        else:
+            self.source = False
+
+        # Sink Operators
+        if operator_type in get_sink_types():
+            self.sink = True
+        else:
+            self.sink = False
+
+        # TODO Why managing previous and predecessors per separate?
+        self.previous = previous
+
+        self.successor = []
+        self.predecessor = []
+
+        self.parameters = {}
+
+        # Set predecessors and successors from previous
+        if self.previous:
+            for prev in self.previous:
+                if prev is not None:
+                    prev.set_successor(self)
+                    self.set_predecessor(prev)
+
+        self.python_exec = python_exec
+
+        logging.info("Operator:" + str(self.getID()) + ", type:" + self.operator_type + ", PythonExecutable: " +
+                     str(self.python_exec) +
+                     ", is boundary: " + str(self.is_boundary()) + ", is source: " +
+                     str(self.source) + ", is sink: " + str(self.sink))
+
+    def getID(self):
+        return self.id
+
+    def is_source(self):
+        return self.source
+
+    def is_sink(self):
+        return self.sink
+
+    def is_boundary(self):
+        return self.boundary
+
+    def serialize_udf(self):
+        self.udf = cloudpickle.dumps(self.udf)
+
+    def getIterator(self):
+        if self.is_source():
+            return self.iterator
+        # TODO this should iterate through previous REDESIGN
+        return self.udf(self.previous[0].getIterator())
+
+    def set_parameter(self, key, value):
+        self.parameters[key] = value
+
+    def set_successor(self, suc):
+        if (not self.is_sink()) and self.successor.count(suc) == 0:
+            self.successor.append(suc)
+
+    def set_predecessor(self, suc):
+        if self.predecessor.count(suc) == 0:
+            self.predecessor.append(suc)
diff --git a/pywayang/orchestrator/plan.py b/pywayang/orchestrator/plan.py
new file mode 100644
index 00000000..25610cc7
--- /dev/null
+++ b/pywayang/orchestrator/plan.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from enum import Enum
+
+class Descriptor:
+
+    def __init__(self):
+        self.sinks = []
+        self.sources = []
+        self.boundary_operators = None
+        logging.basicConfig(filename='../config/execution.log', level=logging.DEBUG)
+        self.plugins = []
+
+    class Plugin(Enum):
+        java = 0
+        spark = 1
+
+    def get_boundary_operators(self):
+        return self.boundary_operators
+
+    def add_source(self, operator):
+        self.sources.append(operator)
+
+    def get_sources(self):
+        return self.sources
+
+    def add_sink(self, operator):
+        self.sinks.append(operator)
+
+    def get_sinks(self):
+        return self.sinks
+
+    def add_plugin(self, plugin):
+        self.plugins.append(plugin)
+
+    def get_plugins(self):
+        return self.plugins
diff --git a/pywayang/protobuf/__init__.py b/pywayang/protobuf/__init__.py
new file mode 100644
index 00000000..15a80ad9
--- /dev/null
+++ b/pywayang/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2
diff --git a/pywayang/protobuf/old_planwriter.py b/pywayang/protobuf/old_planwriter.py
new file mode 100644
index 00000000..e8700f01
--- /dev/null
+++ b/pywayang/protobuf/old_planwriter.py
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import pickle
+import struct
+import base64
+
+
+class OldMessageWriter:
+
+    def __init__(self, descriptor):
+
+        sink = descriptor.get_sinks()[0]
+        source = descriptor.get_sources()[0]
+
+        op = source
+        visited = []
+        middle_operators = []
+        while op.sink is not True and len(op.successor) > 0:
+            pre = op.successor[0]
+            if pre not in visited and pre.sink is not True:
+                pre.serialize_udf()
+                middle_operators.append(pre)
+                """base64_bytes = base64.b64encode(pre.udf)
+                pre.udf = base64_bytes"""
+
+                """pre.serialize_udf()
+                print("pre.udf")
+                print(pre.udf)
+                func = pickle.loads(pre.udf)
+                print("func")
+                print(func)
+                middle_operators.append(pre)
+
+                # Testing
+                msg = pre.udf
+                base64_bytes = base64.b64encode(msg)
+                base64_message = base64.b64decode(base64_bytes)
+                func2 = pickle.loads(base64_message)
+                print(base64_message)
+                func3 = pickle.loads(b'\x80\x04\x955\x04\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x01K\x03K\x13C\nt\x00\x88\x00|\x00\x83\x02S\x00\x94N\x85\x94\x8c\x06filter\x94\x85\x94\x8c\x08iterator\x94\x85\x94\x8cS/Users/rodrigopardomeza/wayang/incubator-wayang/pywayang/orchestrator/dataquanta.py\x94\x8c\x04func\x94K%C\x02\x00\x01\x94\x8c\x03udf\x94\x85\ [...]
+                for i in func3([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+                    print(i)"""
+            op = pre
+
+        """for mid in middle_operators:
+            print(mid.operator_type)
+            print(pickle.loads(mid.udf))
+            func = pickle.loads(mid.udf)
+            for i in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]):
+                print(i)"""
+
+        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+        planconf = pwb.WayangPlan()
+        try:
+            f = open(finalpath, "rb")
+            planconf.ParseFromString(f.read())
+            f.close()
+        except IOError:
+            print(finalpath + ": Could not open file.  Creating a new one.")
+
+        so = pwb.Source()
+        so.id = source.id
+        so.type = source.operator_type
+        so.path = os.path.abspath(source.udf)
+
+        operators = []
+        for mid in middle_operators:
+            op = pwb.Operator()
+            op.id = mid.id
+            op.type = mid.operator_type
+            op.udf = mid.udf
+            operators.append(op)
+
+        si = pwb.Sink()
+        si.id = sink.id
+        si.type = sink.operator_type
+        si.path = os.path.abspath(sink.udf)
+
+        plan = pwb.Plan()
+        plan.source.CopyFrom(so)
+        plan.sink.CopyFrom(si)
+        plan.operators.extend(operators)
+        plan.input = pwb.Plan.string
+        plan.output = pwb.Plan.string
+
+        ctx = pwb.Context()
+        ctx.platforms.extend([pwb.Context.Platform.java])
+
+        planconf.plan.CopyFrom(plan)
+        planconf.context.CopyFrom(ctx)
+
+        f = open(finalpath, "wb")
+        f.write(planconf.SerializeToString())
+        f.close()
+        pass
+
+class func_inteface:
+
+    def __init__(self, node, nested_udf):
+        self.node = node
+        self.nested_udf = nested_udf
+
+    def func(self, iterable):
+        return self.node.operator.udf(self.nested_udf(iterable))
+
+
+class MessageWriter:
+    sources = []
+    operators = []
+    sinks = []
+
+    def add_source(self, operator_id, operator_type, path, predecessors, successors):
+        source = pwb.OperatorProto()
+        source.id = operator_id
+        source.type = operator_type
+        source.path = os.path.abspath(path)
+        source.udf = None
+        source.predecessors = predecessors
+        source.successors = successors
+        self.sources.append(source)
+
+    def add_sink(self, operator_id, operator_type, path, predecessors, successors):
+        sink = pwb.OperatorProto()
+        sink.id = operator_id
+        sink.type = operator_type
+        sink.path = os.path.abspath(path)
+        sink.udf = None
+        sink.predecessors = predecessors
+        sink.successors = successors
+        self.sinks.append(sink)
+
+    def add_operator(self, operator_id, operator_type, udf, path, predecessors, successors):
+        op = pwb.OperatorProto()
+        op.id = operator_id
+        op.type = operator_type
+        op.udf = udf
+        op.path = path
+        op.predecessors = predecessors
+        op.successors = successors
+        self.operators.append(op)
+
+    def process_pipeline(self, stage):
+
+        nested_udf = None
+        nested_id = ""
+        for node in reversed(stage):
+            print("########")
+            print(node.operator_type, "executable:", node.python_exec, "id:", node.id)
+
+            if nested_udf is not None:
+                print("review pre")
+                print( nested_udf)
+                print( nested_udf(["Wilo","lifo","Wifo"]))
+
+            if not node.python_exec:
+                if nested_udf is not None:
+                    """self.add_operator(nested_id, "map_partition", nested_udf, None
+                                      # obtain predecessors and successors
+                                      , successors=[node.id]
+                                      )"""
+                    print("node", nested_id)
+                    print(nested_udf)
+                    print("he muerto")
+                    print( nested_udf(["Wilo","lifo","Wifo"]))
+
+                    t = nested_udf(["Wilo","lifo","Wifo"])
+                    print("jajajarvard")
+                    print(t)
+                    for i in t:
+                        print(i)
+                    nested_udf = None
+                    nested_id = ""
+
+                """if node.operator.source:
+                    self.add_source(
+                        node.id, node.operator_type, node.operator.udf,
+                        node.predecessors, node.operator.successor)
+                else:
+                    self.add_operator(
+                        node.id, node.operator_type, None, node.operator.udf,
+                        node.predecessors, node.operator.successor)"""
+            else:
+                print("adding", node.id)
+                if nested_udf is None:
+                    nested_udf = node.operator.udf
+                    nested_id = node.id
+                else:
+                    print("paseeeeeee viste")
+                    tmp = nested_udf
+
+                    print( tmp(["Wilo","lifo","Wifo"]))
+
+                    #def func(_, iterable):
+                    #    return nested_udf(node.operator.udf(iterable))
+                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
+                    print( nested_udf(["Wilo","lifo","Wifo"]))
+                    print(nested_udf)
+
+                    # nested_udf = func_inteface(node, nested_udf)
+                    nested_id = str(node.id) + "," + str(nested_id)
+
+            if nested_udf is not None:
+                print("review")
+                print( nested_udf)
+                print( nested_udf(["Wilo","lifo","Wifo"]))
+
+        if nested_udf is not None:
+            """self.add_operator(nested_id, "map_partition", nested_udf, None
+                              # obtain predecessors and successors
+                              , successors=[node.id]
+                              )"""
+            print("node", nested_id)
+            print(nested_udf)
+            t = nested_udf(["Wilo","lifo","Wifo"])
+            print("jajajarvard2")
+            print(t)
+            for i in t:
+                print(i)
+            nested_udf = None
+            nested_id = ""
+
+    def __init__(self):
+        print("lala")
+
+    def concatenate(self, function_a, function_b):
+        def executable(iterable):
+            return function_a(function_b(iterable))
+        return executable
+
+    def old(self, descriptor):
+
+        sink = descriptor.get_sinks()[0]
+        source = descriptor.get_sources()[0]
+
+        op = source
+        visited = []
+        middle_operators = []
+        while op.sink is not True and len(op.successor) > 0:
+            pre = op.successor[0]
+            if pre not in visited and pre.sink is not True:
+                pre.serialize_udf()
+                middle_operators.append(pre)
+            op = pre
+
+        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
+        planconf = pwb.WayangPlan()
+        try:
+            f = open(finalpath, "rb")
+            planconf.ParseFromString(f.read())
+            f.close()
+        except IOError:
+            print(finalpath + ": Could not open file.  Creating a new one.")
+
+        so = pwb.Source()
+        so.id = source.id
+        so.type = source.operator_type
+        so.path = os.path.abspath(source.udf)
+
+        operators = []
+        for mid in middle_operators:
+            op = pwb.Operator()
+            op.id = mid.id
+            op.type = mid.operator_type
+            op.udf = mid.udf
+            operators.append(op)
+
+        si = pwb.Sink()
+        si.id = sink.id
+        si.type = sink.operator_type
+        si.path = os.path.abspath(sink.udf)
+
+        plan = pwb.Plan()
+        plan.source.CopyFrom(so)
+        plan.sink.CopyFrom(si)
+        plan.operators.extend(operators)
+        plan.input = pwb.Plan.string
+        plan.output = pwb.Plan.string
+
+        ctx = pwb.Context()
+        ctx.platforms.extend([pwb.Context.Platform.java])
+
+        planconf.plan.CopyFrom(plan)
+        planconf.context.CopyFrom(ctx)
+
+        f = open(finalpath, "wb")
+        f.write(planconf.SerializeToString())
+        f.close()
+        pass
+
+    def pipeline_singleton(self):
+        print("lala")
diff --git a/pywayang/protobuf/planwriter.py b/pywayang/protobuf/planwriter.py
new file mode 100644
index 00000000..b63dcbbc
--- /dev/null
+++ b/pywayang/protobuf/planwriter.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import pathlib
+import requests
+import base64
+
+
+# Writes Wayang Plan from several stages
+class MessageWriter:
+    sources = []
+    operators = []
+    sinks = []
+    operator_references = {}
+    boundaries = {}
+
+    # Creates and appends Source type of operator
+    def add_source(self, operator_id, operator_type, path):
+        source = pwb.OperatorProto()
+        source.id = str(operator_id)
+        source.type = operator_type
+        source.path = os.path.abspath(path)
+        source.udf = chr(0).encode('utf-8')
+        # source.parameters = {}
+        self.sources.append(source)
+        return source
+
+    # Creates and appends Sink type of operator
+    def add_sink(self, operator_id, operator_type, path):
+        sink = pwb.OperatorProto()
+        sink.id = str(operator_id)
+        sink.type = operator_type
+        sink.path = os.path.abspath(path)
+        sink.udf = chr(0).encode('utf-8')
+        # sink.parameters = {}
+        self.sinks.append(sink)
+        return sink
+
+    # Creates and appends a Python operator
+    # Python OP don't require parameters, UDF has the function ready to be executed directly
+    def add_operator(self, operator_id, operator_type, udf):
+        op = pwb.OperatorProto()
+        op.id = str(operator_id)
+        op.type = operator_type
+        op.udf = cloudpickle.dumps(udf)
+        op.path = str(None)
+        # op.parameters = {}
+        self.operators.append(op)
+        return op
+
+    # Creates and appends a Java operator
+    def add_java_operator(self, operator_id, operator_type, udf, parameters):
+        op = pwb.OperatorProto()
+        op.id = str(operator_id)
+        op.type = operator_type
+        op.udf = cloudpickle.dumps(udf)
+        op.path = str(None)
+        #op.parameters = parameters
+        for param in parameters:
+            print(param, parameters[param])
+            op.parameters[param] = str(parameters[param])
+            # op.parameters[]
+        #m.mapfield[5] = 10
+        self.operators.append(op)
+        return op
+
+    # Receive a chain of operators, separate them in Wayang Operators
+    # Compacts several Python executable operators in one Map Partition Wayang Operator
+    def process_pipeline(self, stage):
+
+        nested_udf = None
+        nested_id = ""
+        nested_predecessors = None
+        nested_successors = None
+        for node in reversed(stage):
+            logging.debug(node.operator_type + " executable: " + str(node.python_exec) + " id: " + str(node.id))
+
+            if not node.python_exec:
+                if nested_udf is not None:
+
+                    # Predecessors depends on last operator
+                    # Successors depends on first operator
+                    op = self.add_operator(nested_id, "map_partition", nested_udf)
+
+                    ids = str(nested_id).split(",")
+                    for id in ids:
+                        self.operator_references[str(id)] = op
+
+                    self.boundaries[str(nested_id)] = {}
+                    self.boundaries[str(nested_id)]["end"] = nested_successors
+                    self.boundaries[str(nested_id)]["start"] = nested_predecessors
+
+                    nested_udf = None
+                    nested_id = ""
+                    nested_predecessors = None
+                    nested_successors = None
+
+                if node.operator.source:
+                    op = self.add_source(node.id, node.operator_type, node.operator.udf)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+                elif node.operator.sink:
+                    op = self.add_sink(node.id, node.operator_type, node.operator.udf)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+
+                # Regular operator to be processed in Java
+                # Notice that those could include more parameters for Java
+                else:
+                    op = self.add_java_operator(node.id, node.operator_type, node.operator.udf, node.operator.parameters)
+                    self.operator_references[str(node.id)] = op
+                    self.boundaries[str(node.id)] = {}
+                    self.boundaries[str(node.id)]["start"] = node.predecessors.keys()
+                    self.boundaries[str(node.id)]["end"] = node.successors.keys()
+
+            else:
+
+                if nested_udf is None:
+                    nested_udf = node.operator.udf
+                    nested_id = node.id
+                    # It is the last operator to execute in the map partition
+                    nested_successors = node.successors.keys()
+
+                else:
+                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
+                    nested_id = str(node.id) + "," + str(nested_id)
+
+                # Every iteration assign the first known predecessors
+                nested_predecessors = node.predecessors.keys()
+
+        # Just in case in the future some pipelines start with Python operators
+        if nested_udf is not None:
+            self.add_operator(nested_id, "map_partition", nested_udf)
+
+            ids = nested_id.split(",")
+            for id in ids:
+                self.operator_references[id] = op
+
+            self.boundaries[nested_id] = {}
+            self.boundaries[nested_id]["end"] = nested_successors
+            self.boundaries[nested_id]["start"] = nested_predecessors
+
+    def __init__(self):
+        pass
+
+    # Takes 2 Functions and compact them in only one function
+    @staticmethod
+    def concatenate(function_a, function_b):
+        def executable(iterable):
+            return function_a(function_b(iterable))
+
+        return executable
+
+    # Set dependencies over final Wayang Operators
+    def set_dependencies(self):
+
+        for source in self.sources:
+
+            if 'end' in self.boundaries[source.id]:
+                op_successors = []
+                for op_id in self.boundaries[source.id]['end']:
+                    op_successors.append(str(self.operator_references[str(op_id)].id))
+                source.successors.extend(op_successors)
+
+        for sink in self.sinks:
+            if 'start' in self.boundaries[sink.id]:
+                op_predecessors = []
+                for op_id in self.boundaries[sink.id]['start']:
+                    op_predecessors.append(str(self.operator_references[str(op_id)].id))
+                sink.predecessors.extend(op_predecessors)
+
+        for op in self.operators:
+            if 'start' in self.boundaries[op.id]:
+                op_predecessors = []
+                for op_id in self.boundaries[op.id]['start']:
+                    op_predecessors.append(str(self.operator_references[str(op_id)].id))
+                op.predecessors.extend(op_predecessors)
+
+            if 'end' in self.boundaries[op.id]:
+                op_successors = []
+                for op_id in self.boundaries[op.id]['end']:
+                    op_successors.append(str(self.operator_references[str(op_id)].id))
+                op.successors.extend(op_successors)
+
+    # Writes the message to a local directory
+    def write_message(self, descriptor):
+
+        finalpath = "../../protobuf/wayang_message"
+        plan_configuration = pwb.WayangPlanProto()
+
+        try:
+            f = open(finalpath, "rb")
+            plan_configuration.ParseFromString(f.read())
+            f.close()
+        except IOError:
+            logging.warn("File " + finalpath + " did not exist. System generated a new file")
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(self.sources)
+        plan.operators.extend(self.operators)
+        plan.sinks.extend(self.sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+        for plug in descriptor.plugins:
+            ctx.platforms.append(plug.value)
+        # ctx.platforms.extend(descriptor.get_plugins())
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
+        f = open(finalpath, "wb")
+        f.write(plan_configuration.SerializeToString())
+        f.close()
+        pass
+
+    # Send message as bytes to the Wayang Rest API
+    def send_message(self, descriptor):
+
+        plan_configuration = pwb.WayangPlanProto()
+
+        plan = pwb.PlanProto()
+        plan.sources.extend(self.sources)
+        plan.operators.extend(self.operators)
+        plan.sinks.extend(self.sinks)
+        plan.input = pwb.PlanProto.string
+        plan.output = pwb.PlanProto.string
+
+        ctx = pwb.ContextProto()
+        # ctx.platforms.extend([pwb.ContextProto.PlatformProto.java])
+        for plug in descriptor.plugins:
+            ctx.platforms.append(plug.value)
+        # ctx.platforms.extend(descriptor.get_plugins())
+
+        plan_configuration.plan.CopyFrom(plan)
+        plan_configuration.context.CopyFrom(ctx)
+
+        print("plan!")
+        print(plan_configuration)
+
+        msg_bytes = plan_configuration.SerializeToString()
+        msg_64 = base64.b64encode(msg_bytes)
+
+        logging.debug(msg_bytes)
+        # response = requests.get("http://localhost:8080/plan/create/fromfile")
+        data = {
+            'message': msg_64
+        }
+        response = requests.post("http://localhost:8080/plan/create", data)
+        logging.debug(response)
+        # f = open(finalpath, "wb")
+        # f.write(plan_configuration.SerializeToString())
+        # f.close()
+        pass
diff --git a/pywayang/test/demo_testing.py b/pywayang/test/demo_testing.py
new file mode 100644
index 00000000..c096a897
--- /dev/null
+++ b/pywayang/test/demo_testing.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_something(self):
+        self.assertEqual(True, False)
+
+    def test_upper(self):
+        self.assertEqual('foo'.upper(), 'FOO')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pywayang/test/full_java_test.py b/pywayang/test/full_java_test.py
new file mode 100644
index 00000000..d17aedd0
--- /dev/null
+++ b/pywayang/test/full_java_test.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+class MyTestCase(unittest.TestCase):
+
+    def test_most_basic(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        sink_dataquanta = \
+            plan.source("../test/lines.txt") \
+                .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+    def test_single_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt")
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+    def test_multiple_juncture(self):
+        descriptor = Descriptor()
+        descriptor.add_plugin(Descriptor.Plugin.java)
+
+        plan = DataQuantaBuilder(descriptor)
+        dq_source_a = plan.source("../test/lines.txt")
+        dq_source_b = plan.source("../test/morelines.txt") \
+            .filter(lambda elem: str(elem).startswith("I"))
+        dq_source_c = plan.source("../test/lastlines.txt") \
+            .filter(lambda elem: str(elem).startswith("W"))
+
+        sink_dataquanta = dq_source_a.union(dq_source_b) \
+            .union(dq_source_c) \
+            .sort(lambda elem: elem.lower()) \
+            .sink("../test/output.txt", end="")
+
+        sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/pywayang/test/full_spark_test.py b/pywayang/test/full_spark_test.py
new file mode 100644
index 00000000..9276ccc6
--- /dev/null
+++ b/pywayang/test/full_spark_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+def test_most_basic(self):
+    descriptor = Descriptor()
+    descriptor.add_plugin(Descriptor.Plugin.spark)
+
+    plan = DataQuantaBuilder(descriptor)
+    sink_dataquanta = \
+        plan.source("../test/lines.txt") \
+            .sink("../test/output.txt", end="")
+
+    sink_dataquanta.to_wayang_plan()
+
+
+def test_single_juncture(self):
+    descriptor = Descriptor()
+    descriptor.add_plugin(Descriptor.Plugin.spark)
+
+    plan = DataQuantaBuilder(descriptor)
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt")
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .sink("../test/output.txt", end="")
+
+    sink_dataquanta.to_wayang_plan()
+
+
+def test_multiple_juncture(self):
+    descriptor = Descriptor()
+    descriptor.add_plugin(Descriptor.Plugin.spark)
+
+    plan = DataQuantaBuilder(descriptor)
+    dq_source_a = plan.source("../test/lines.txt")
+    dq_source_b = plan.source("../test/morelines.txt") \
+        .filter(lambda elem: str(elem).startswith("I"))
+    dq_source_c = plan.source("../test/lastlines.txt") \
+        .filter(lambda elem: str(elem).startswith("W"))
+
+    sink_dataquanta = dq_source_a.union(dq_source_b) \
+        .union(dq_source_c) \
+        .sort(lambda elem: elem.lower()) \
+        .sink("../test/output.txt", end="")
+
+    sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+    unittest.main()