You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/04/08 17:16:22 UTC
[incubator-wayang] 01/32: Wayang 8 (#89)
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch python-platform
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit edab66a194ec90f0ac948255ee681cc7d098ee90
Author: Rodrigo Pardo Meza <ro...@gmail.com>
AuthorDate: Mon Mar 21 14:03:00 2022 +0100
Wayang 8 (#89)
* [WAYANG-8][API-PYTHON] Creation of functions to be consumed by MapPartitionsDescriptor
* [WAYANG-8][API-PYTHON] Included PythonProcessCaller that manages the python process execution and Java - Python connection
* [WAYANG-8][API-PYTHON] POM fixes plus minor test
* [WAYANG-8][API-PYTHON] Python connection through TCP socket enabled
* [WAYANG-8][API-PYTHON] Writing from Java to Python. Iterator datatypes are not taken into account.
* [WAYANG-8][API-PYTHON] Java Socket Writer improvements
* [WAYANG-8][API-PYTHON] Python UTF8 Deserializer included
* [WAYANG-8][API-PYTHON] Python UTF8 Reading Stream
* [WAYANG-8][API-PYTHON] Getting results from Python and continue processing
* [WAYANG-8][API-PYTHON] Config files for pywayang
* [WAYANG-8][API-PYTHON] Structures to save the plan with functional fashion plus most basic operators
* [WAYANG-8][API-PYTHON] Main program to test plan executions locally
* [WAYANG-8][API-PYTHON] Minor comments and TODOs
* [WAYANG-8][API-PYTHON] Most basic test for protobuf communication with Java
* [WAYANG-8][API-PYTHON] Adjacency list from PyWayang Plan
* [WAYANG-8][API-PYTHON] Graph traversal implementation with visitor pattern
* [WAYANG-8][API-PYTHON] Protobuf python message generator
* [WAYANG-8][API-PYTHON] Wayang Web Service project structure
* [WAYANG-8][API-PYTHON] Protobuf message generation fixes
* [WAYANG-8][API-PYTHON] Wayang Web Service executes most basic plans directly
* [WAYANG-8][API-PYTHON] Receiving Base64 passing to byte array and unpickling
* [WAYANG-8][API-PYTHON] Updated classes to process a single Serialized UDF
* [WAYANG-8][API-PYTHON] New test with single UDF
* [WAYANG-8][API-PYTHON] Protobuf command
* [WAYANG-8][API-PYTHON] Protobuf message template updated
* [WAYANG-8][API-PYTHON] POM fixes
* [WAYANG-8][API-PYTHON] License comments added
* [WAYANG-8][API-PYTHON] Correction on missing licenses
* [WAYANG-8][API-PYTHON] Serializable module creation
* [WAYANG-8][API-PYTHON] adding protoc to travis
* [WAYANG-8][API-PYTHON] protoc executable path correction
* [WAYANG-8][API-PYTHON] Commenting objc_class_prefix
* [WAYANG-8][API-PYTHON] Obtaining pipelines
* [WAYANG-8][API-PYTHON] Dataquanta writing message
* [WAYANG-8][API-PYTHON] Plan writer pipeline based adjustments
* [WAYANG-8][API-PYTHON] Operator Python executable indicator
* [WAYANG-8][API-PYTHON] Plan writer improved to use less sockets
* [WAYANG-8][API-PYTHON] New version of Wayang protobuf message
* [WAYANG-8][API-PYTHON] Wayang REST improved to allow multi pipelined executions
* [WAYANG-8][API-PYTHON] More test programs
* [WAYANG-8][API-PYTHON] Comments and logging for Graph module
* [WAYANG-8][API-PYTHON] Comments and logging for Orchestrator module
* [WAYANG-8][API-PYTHON] Comments and logging for Protobuf module
* [WAYANG-8][API-PYTHON] Fix usage of relative paths
* [WAYANG-8][API-PYTHON] Scripts to compile protobuf has been deleted. Now Maven executes them
* [WAYANG-8][API-PYTHON] Execution Log configuration
* [WAYANG-8][API-PYTHON] Fix - Python Map partition with single operator
* [WAYANG-8][API-PYTHON] Unit testing preparing the Wayang Plan
* [WAYANG-8][API-PYTHON] Plugin selection through Plan Descriptor
* [WAYANG-8][API-PYTHON] Unit testing preparing the Wayang Plan with Spark Execution
* [WAYANG-8][API-PYTHON] Pywayang sends protobuf message in API request as bytes using base64
* [WAYANG-8][API-PYTHON] New Operators Flatmap group by, reduce and Reduce By Key. Only Python Side.
* [WAYANG-8][API-PYTHON] Protobuf Wayang Plan message updated to allow more Complex Java-Python Operators
* [WAYANG-8][API-PYTHON] Adding TPC-H 1st Test
* [WAYANG-8][API-PYTHON] Last changes, not working
* [WAYANG-8] Fixing errors with dependencies
* [WAYANG-8] Fix to Pom versions problem
* [WAYANG-8] Protoc path updated
* [WAYANG-8] Correction in the pom.xml for flags
Signed-off-by: bertty <be...@apache.org>
Co-authored-by: Bertty Contreras-Rojas <be...@databloom.ai>
Signed-off-by: bertty <be...@apache.org>
---
pom.xml | 2 +
pywayang/config/__init__.py | 20 +++
pywayang/config/config_reader.py | 51 ++++++
pywayang/config/pywayang_config.ini | 38 ++++
pywayang/graph/__init__.py | 19 ++
pywayang/graph/graph.py | 71 ++++++++
pywayang/graph/node.py | 48 +++++
pywayang/graph/traversal.py | 51 ++++++
pywayang/graph/visitant.py | 52 ++++++
pywayang/orchestrator/__init__.py | 20 +++
pywayang/orchestrator/dataquanta.py | 330 ++++++++++++++++++++++++++++++++++
pywayang/orchestrator/execdirectly.py | 162 +++++++++++++++++
pywayang/orchestrator/main.py | 173 ++++++++++++++++++
pywayang/orchestrator/operator.py | 121 +++++++++++++
pywayang/orchestrator/plan.py | 52 ++++++
pywayang/protobuf/__init__.py | 18 ++
pywayang/protobuf/old_planwriter.py | 308 +++++++++++++++++++++++++++++++
pywayang/protobuf/planwriter.py | 277 ++++++++++++++++++++++++++++
pywayang/test/demo_testing.py | 30 ++++
pywayang/test/full_java_test.py | 69 +++++++
pywayang/test/full_spark_test.py | 67 +++++++
21 files changed, 1979 insertions(+)
diff --git a/pom.xml b/pom.xml
index 3ff48f95..e0ac2bda 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1248,6 +1248,8 @@
<exclude>**/README.md</exclude>
<exclude>**/general-todos.md</exclude>
<exclude>**/scala_1*</exclude>
+
+ <exclude>**/*pb2.py</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/pywayang/config/__init__.py b/pywayang/config/__init__.py
new file mode 100644
index 00000000..008475c2
--- /dev/null
+++ b/pywayang/config/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
diff --git a/pywayang/config/config_reader.py b/pywayang/config/config_reader.py
new file mode 100644
index 00000000..c8f58732
--- /dev/null
+++ b/pywayang/config/config_reader.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import configparser
+import os
+
+
# pywayang_config.ini lives next to this module. Resolving it from __file__
# (rather than the cwd-relative "../config/...") makes the lookup work no
# matter which directory the program is started from. ConfigParser.read
# silently ignores missing files, so a wrong path would otherwise only surface
# later as a confusing NoSectionError.
_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "pywayang_config.ini"
)


def _read_types(section, config_path=None):
    """Return the option values of ``section`` in the PyWayang config.

    :param section: INI section name, e.g. ``"SOURCE_TYPES"``.
    :param config_path: optional path to an INI file; defaults to the
        ``pywayang_config.ini`` shipped next to this module.
    :returns: a view of the section's values (``dict.values()``), without the
        ``variable_to_access`` placeholder inherited from ``[DEFAULT]``.
    :raises configparser.NoSectionError: if the section is missing.
    """
    config = configparser.ConfigParser()
    config.read(_CONFIG_PATH if config_path is None else config_path)
    types = dict(config.items(section))
    # items() merges every [DEFAULT] key into each section; drop the placeholder.
    types.pop("variable_to_access", None)
    return types.values()


def get_boundary_types(config_path=None):
    """Return the operator types that split a plan into pipelines."""
    return _read_types("BOUNDARY_TYPES", config_path)


def get_source_types(config_path=None):
    """Return the operator types that are treated as plan sources."""
    return _read_types("SOURCE_TYPES", config_path)


def get_sink_types(config_path=None):
    """Return the operator types that are treated as plan sinks."""
    return _read_types("SINK_TYPES", config_path)
\ No newline at end of file
diff --git a/pywayang/config/pywayang_config.ini b/pywayang/config/pywayang_config.ini
new file mode 100644
index 00000000..78cc2b48
--- /dev/null
+++ b/pywayang/config/pywayang_config.ini
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[DEFAULT]
+variable_to_access = value
+
+[INPUT]
+txnname_mod = string1
+txnmemo_mod = string2
+
+[MODIFY]
+txnname_mod = string3
+txnmemo_mod = string4
+
+[BOUNDARY_TYPES]
+boundary_type_1 = union
+
+[SOURCE_TYPES]
+source_type_1 = source
+source_type_2 = text
+
+[SINK_TYPES]
+sink_type_1 = sink
+sink_type_2 = sonk
\ No newline at end of file
diff --git a/pywayang/graph/__init__.py b/pywayang/graph/__init__.py
new file mode 100644
index 00000000..17e2deb5
--- /dev/null
+++ b/pywayang/graph/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import graph.graph
+import graph.node
\ No newline at end of file
diff --git a/pywayang/graph/graph.py b/pywayang/graph/graph.py
new file mode 100644
index 00000000..be7a32f0
--- /dev/null
+++ b/pywayang/graph/graph.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.node import Node
+import logging
+
+
+# Adjacency Matrix used to analise the plan
+class Graph:
+ def __init__(self):
+ self.graph = {}
+ self.nodes_no = 0
+ self.nodes = []
+
+ # Fills the Graph
+ def populate(self, sinks):
+ for sink in iter(sinks):
+ self.process_operator(sink)
+
+ # Add current operator and set dependencies
+ def process_operator(self, operator):
+ self.add_node(operator.operator_type, operator.id, operator)
+
+ if len(operator.previous) > 0:
+ for parent in operator.previous:
+ if parent:
+ self.add_node(parent.operator_type, parent.id, parent)
+ self.add_link(operator.id, parent.id, 1)
+ self.process_operator(parent)
+
+ def add_node(self, name, id, operator):
+ if id in self.nodes:
+ return
+
+ self.nodes_no += 1
+ self.nodes.append(id)
+ new_node = Node(name, id, operator)
+
+ self.graph[id] = new_node
+
+ def add_link(self, id_child, id_parent, e):
+ if id_child in self.nodes:
+ if id_parent in self.nodes:
+ self.graph[id_child].add_predecessor(id_parent, e)
+ self.graph[id_parent].add_successor(id_child, e)
+
+ def print_adjlist(self):
+
+ for key in self.graph:
+ logging.debug("Node: ", self.graph[key].operator_type, " - ", key)
+ for key2 in self.graph[key].predecessors:
+ logging.debug("- Parent: ", self.graph[key2].operator_type, " - ", self.graph[key].predecessors[key2], " - ", key2)
+ for key2 in self.graph[key].successors:
+ logging.debug("- Child: ", self.graph[key2].operator_type, " - ", self.graph[key].successors[key2], " - ", key2)
+
+ def get_node(self, id):
+ return self.graph[id]
diff --git a/pywayang/graph/node.py b/pywayang/graph/node.py
new file mode 100644
index 00000000..d0d696fd
--- /dev/null
+++ b/pywayang/graph/node.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+
+
class Element(abc.ABC):
    """Anything that can be handed to a visitor (visitor pattern)."""

    @abc.abstractmethod
    def accept(self, visitor, udf, orientation, last_iter):
        """Dispatch this element to ``visitor``."""


# Describes an Operator in the Graph
class Node(Element):
    """Graph vertex wrapping one plan operator.

    ``predecessors`` and ``successors`` map neighbouring operator ids to the
    edge value given to add_predecessor / add_successor.
    """

    def __init__(self, operator_type, id, operator):
        self.operator_type, self.id = operator_type, id
        self.predecessors, self.successors = {}, {}
        self.python_exec = operator.python_exec
        # Temporal: keeps a reference to the wrapped plan operator.
        self.operator = operator

    def add_predecessor(self, id_parent, e):
        """Remember ``id_parent`` as a parent, with edge value ``e``."""
        self.predecessors.update({id_parent: e})

    def add_successor(self, id_child, e):
        """Remember ``id_child`` as a child, with edge value ``e``."""
        self.successors.update({id_child: e})

    def accept(self, visitor, udf, orientation, last_iter):
        """Let ``visitor`` apply ``udf`` to this node.

        Nodes are visited by Visitant objects, which use this hook to run a
        UDF across the Graph.
        """
        visitor.visit_node(self, udf, orientation, last_iter)
diff --git a/pywayang/graph/traversal.py b/pywayang/graph/traversal.py
new file mode 100644
index 00000000..e2dd8516
--- /dev/null
+++ b/pywayang/graph/traversal.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from graph.visitant import Visitant
+import logging
+
+
# Defines how a UDF will be applied over the Graph
class Traversal:
    """Run ``udf`` over ``graph`` starting from every operator in ``origin``.

    The direction is taken from the first origin operator. If it is a source,
    the walk follows successors; if it is a sink, it follows predecessors.
    ``origin`` must be non-empty (``origin[0]`` is read). Results end up in
    the collection returned by get_collected_data().
    """

    def __init__(self, graph, origin, udf):
        self.graph = graph
        self.origin = origin
        self.udf = udf
        self.app = Visitant(graph, [])

        head = origin[0]
        if head.source:
            self.orientation = "successors"
        elif head.sink:
            self.orientation = "predecessors"
        else:
            # Neither a source nor a sink: nothing is traversed and
            # self.orientation stays unset.
            logging.error("Origin point to traverse the plan wrongly defined")
            return

        for start in origin:
            logging.debug("operator origin: " + str(start.id))
            self.app.visit_node(
                node=graph.get_node(start.id),
                udf=self.udf,
                orientation=self.orientation,
                last_iter=None,
            )

    def get_collected_data(self):
        """Return the collection the UDF filled during the traversal."""
        return self.app.get_collection()
diff --git a/pywayang/graph/visitant.py b/pywayang/graph/visitant.py
new file mode 100644
index 00000000..3d2f874f
--- /dev/null
+++ b/pywayang/graph/visitant.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+import logging
+
+
class Visitor(metaclass=abc.ABCMeta):
    """Interface for objects that visit graph nodes."""

    @abc.abstractmethod
    def visit_node(self, node, udf, orientation, last_iter):
        """Process ``node`` and continue along ``orientation``."""


# Applies a UDF in current Node
class Visitant(Visitor):
    """Depth-first visitor that applies a UDF to every node reachable from a
    start node, following the node attribute named by ``orientation``
    (``"successors"`` or ``"predecessors"``).

    A node reachable through several paths is visited once per path, each
    time with the ``last_iter`` value produced by the node it came from.
    """

    def __init__(self, graph, results):
        # Shared collection the UDF may store final results in.
        self.collection = results
        self.graph = graph

    def visit_node(self, node, udf, orientation, last_iter):
        """Apply ``udf(node, last_iter, collection)`` and recurse into the
        neighbours of ``node``, passing them the UDF's return value as their
        ``last_iter``.
        """
        # Lazy %-style logging: the messages are only built when DEBUG is on.
        # Concatenation also raised TypeError for a non-str operator_type.
        logging.debug("Applying UDf%s", orientation)
        current_value = udf(node, last_iter, self.collection)
        next_ids = getattr(node, orientation)
        logging.debug("orientation result %s", next_ids)
        for next_id in next_ids:
            # NOTE(review): falsy ids (e.g. 0) are skipped here, as in the
            # original code; confirm operator ids never use such values.
            if not next_id:
                continue
            logging.debug("next_id: %s", next_id)
            next_node = self.graph.get_node(next_id)
            logging.debug("next_iter_node: %s %s",
                          next_node.operator_type, next_node.id)
            next_node.accept(visitor=self, udf=udf, orientation=orientation,
                             last_iter=current_value)

    def get_collection(self):
        """Return the collection shared with the UDF."""
        return self.collection
diff --git a/pywayang/orchestrator/__init__.py b/pywayang/orchestrator/__init__.py
new file mode 100644
index 00000000..ed7d0ac9
--- /dev/null
+++ b/pywayang/orchestrator/__init__.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import orchestrator.plan
+import orchestrator.dataquanta
+import graph.graph
diff --git a/pywayang/orchestrator/dataquanta.py b/pywayang/orchestrator/dataquanta.py
new file mode 100644
index 00000000..7d700eb5
--- /dev/null
+++ b/pywayang/orchestrator/dataquanta.py
@@ -0,0 +1,330 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.operator import Operator
+from graph.graph import Graph
+from graph.traversal import Traversal
+from protobuf.planwriter import MessageWriter
+import itertools
+import collections
+import logging
+from functools import reduce
+import operator
+
+
# Wraps a Source operation to create an iterable
class DataQuantaBuilder:
    """Entry point of a PyWayang plan: turns a source into a DataQuanta."""

    def __init__(self, descriptor):
        self.descriptor = descriptor

    @staticmethod
    def _read_lines(path):
        """Yield the lines of the text file at ``path``.

        The file is opened on first iteration and closed once all lines are
        consumed or the generator is closed. A plan that is only translated
        (and never executed in the driver) therefore does not leak a handle.
        """
        with open(path, "r") as handle:
            yield from handle

    def source(self, source):
        """Create a DataQuanta wrapping a ``source`` operator.

        :param source: a file path (read line by line, lazily) or any other
            iterable. The original ``source`` object is kept as the
            operator's ``udf``.
        :returns: the DataQuanta for the new source operator.
        """
        if isinstance(source, str):
            iterator = self._read_lines(source)
        else:
            iterator = iter(source)
        return DataQuanta(
            Operator(
                operator_type="source",
                udf=source,
                iterator=iterator,
                previous=[],
                python_exec=False
            ),
            descriptor=self.descriptor
        )
+
+
# Wraps an operation over an iterable. Every operational method adds one
# operator to the plan and returns a new DataQuanta, so calls can be chained.
class DataQuanta:
    """A step of a PyWayang functional plan.

    ``operator`` is the plan operator this DataQuanta wraps; ``descriptor``
    is the plan descriptor that records the plan's sources and sinks.
    """

    def __init__(self, operator=None, descriptor=None):
        self.operator = operator
        self.descriptor = descriptor
        # Register plan endpoints: to_wayang_plan builds the graph from the
        # sinks and traverses it from the sources.
        if self.operator.is_source():
            self.descriptor.add_source(self.operator)
        if self.operator.is_sink():
            self.descriptor.add_sink(self.operator)

    def _then(self, operator_type, udf, python_exec=True, previous=None):
        """Return a DataQuanta for a new ``operator_type`` operator fed by
        ``previous`` (defaults to this DataQuanta's operator)."""
        if previous is None:
            previous = [self.operator]
        return DataQuanta(
            Operator(
                operator_type=operator_type,
                udf=udf,
                previous=previous,
                python_exec=python_exec
            ),
            descriptor=self.descriptor
        )

    # Operational Functions
    def filter(self, udf):
        """Keep the elements for which ``udf`` returns a truthy value."""
        def func(iterator):
            return filter(udf, iterator)

        return self._then("filter", func)

    def flatmap(self, udf):
        """Apply ``udf`` to every element and flatten the returned iterables
        by one level."""
        def func(iterator):
            return itertools.chain.from_iterable(map(udf, iterator))

        return self._then("flatmap", func)

    def group_by(self, udf):
        """Group elements by their first item.

        NOTE(review): ``udf`` is currently ignored (see TODO). The input is
        not sorted either, so ``itertools.groupby`` only merges *adjacent*
        elements with equal keys.
        """
        def func(iterator):
            # TODO key should be given by "udf"
            return itertools.groupby(iterator, key=operator.itemgetter(0))

        return self._then("group_by", func)

    def map(self, udf):
        """Transform every element with ``udf``."""
        def func(iterator):
            return map(udf, iterator)

        return self._then("map", func)

    # Key specifies pivot dimensions
    # UDF specifies reducer function
    def reduce_by_key(self, keys, udf):
        """Reduce elements sharing the same ``keys`` with ``udf``.

        Each key is stored on the operator as parameter ``"dimension|<n>"``,
        with ``n`` counting from 1. The operator is not executed in Python
        (``python_exec=False``).
        """
        op = Operator(
            operator_type="reduce_by_key",
            udf=udf,
            previous=[self.operator],
            python_exec=False
        )

        # Debug output goes through logging instead of print() to stdout.
        logging.debug("reduce_by_key keys (%d): %s", len(keys), keys)
        for position, key in enumerate(keys, start=1):
            # TODO maybe would be better just leave the number as key
            op.set_parameter("dimension|" + str(position), key)

        return DataQuanta(op, descriptor=self.descriptor)

    def reduce(self, udf):
        """Fold all elements into one value with the binary function ``udf``."""
        def func(iterator):
            return reduce(udf, iterator)

        return self._then("reduce", func)

    def sink(self, path, end="\n"):
        """Terminate the plan by writing to ``path``.

        The operator's udf is the output path itself. ``end`` is only used by
        ``func`` below, the variant for executing the plan directly in the
        driver (see execute()); to use it, pass ``udf=func`` instead of
        ``udf=path``.
        """
        def func(iterator):
            with open(path, 'w') as f:
                for x in iterator:
                    f.write(str(x) + end)

        return self._then("sink", path, python_exec=False)

    def sort(self, udf):
        """Sort the elements using ``udf`` as the sort key."""
        def func(iterator):
            return sorted(iterator, key=udf)

        return self._then("sort", func)

    # This function allow the union to be performed by Python
    # Nevertheless, current configuration runs it over Java
    def union(self, other):
        """Concatenate this DataQuanta with ``other``."""
        def func(iterator):
            return itertools.chain(iterator, other.operator.getIterator())

        return self._then(
            "union",
            func,
            python_exec=False,
            previous=[self.operator, other.operator]
        )

    def __run(self, consumer):
        consumer(self.operator.getIterator())

    # Execution Functions
    def console(self, end="\n"):
        """Print every element produced by this DataQuanta."""
        def consume(iterator):
            for x in iterator:
                print(x, end=end)

        self.__run(consume)

    # Only for debugging purposes!
    # To execute the plan directly in the program driver
    def execute(self):
        """Run the plan in the driver by calling the sink's udf on its input.

        :raises RuntimeError: if this DataQuanta does not wrap a sink.
        """
        logging.warning("DEBUG Execution")
        logging.info("Reminder to swap SINK UDF value from path to func")
        # Validate before touching previous[0]; a source has no previous
        # operator and would otherwise fail with IndexError.
        if not self.operator.is_sink():
            logging.error("Plan must call execute from SINK type of operator")
            raise RuntimeError("Plan must call execute from SINK type of operator")
        logging.debug(self.operator.previous[0].operator_type)
        logging.debug(self.operator.operator_type)
        logging.debug(self.operator.udf)
        logging.debug(len(self.operator.previous))
        self.operator.udf(self.operator.previous[0].getIterator())

    # Converts Python Functional Plan to valid Wayang Plan
    def to_wayang_plan(self):
        """Split the plan into pipelines and send it to the Wayang REST API.

        Does nothing if the plan has no sink.
        """
        sinks = self.descriptor.get_sinks()
        if not sinks:
            return

        graph = Graph()
        graph.populate(sinks)

        # Uncomment to check the Graph built
        # graph.print_adjlist()

        # UDF consumed by Traversal. Splits the Python plan into a list of
        # pipelines; a boundary operator starts a new pipeline and a sink
        # closes the current one.
        # NOTE(review): the returned list is handed to every successor and
        # mutated in place, so in branching plans later branches see the
        # pipeline already cleared by earlier ones. Confirm this is intended.
        def define_pipelines(node1, current_pipeline, collection):
            def store_unique(pipe_to_insert):
                # The same pipeline may be reached via several paths; compare
                # pipelines as multisets of nodes.
                wanted = collections.Counter(pipe_to_insert)
                if any(collections.Counter(pipe) == wanted for pipe in collection):
                    return
                collection.append(pipe_to_insert)

            if not current_pipeline:
                current_pipeline = [node1]
            elif node1.operator.is_boundary():
                store_unique(current_pipeline.copy())
                current_pipeline.clear()
                current_pipeline.append(node1)
            else:
                current_pipeline.append(node1)

            if node1.operator.sink:
                store_unique(current_pipeline.copy())
                current_pipeline.clear()

            return current_pipeline

        # Works over the graph. The UDF always receives:
        #   a Node object,
        #   the result of the previous iteration,
        #   a collection to store final results in.
        trans = Traversal(
            graph=graph,
            origin=self.descriptor.get_sources(),
            udf=define_pipelines
        )

        # Stages are lists of Node objects, passed to a Wayang message writer.
        writer = MessageWriter()
        for number, stage in enumerate(trans.get_collected_data(), start=1):
            logging.info("///")
            logging.info("stage" + str(number))
            writer.process_pipeline(stage)

        writer.set_dependencies()

        # Uses a file to provide the plan
        # writer.write_message(self.descriptor)

        # Send the plan to Wayang REST api directly
        writer.send_message(self.descriptor)
diff --git a/pywayang/orchestrator/execdirectly.py b/pywayang/orchestrator/execdirectly.py
new file mode 100644
index 00000000..452ccab7
--- /dev/null
+++ b/pywayang/orchestrator/execdirectly.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
def plan_sort(descriptor):
    """DEMO plan: sort the lines of words.txt case-insensitively.

    :returns: the sink DataQuanta, ready to be executed or translated.
    """
    builder = DataQuantaBuilder(descriptor)
    return (
        builder.source("../test/words.txt")
        .sort(lambda elem: elem.lower())
        .sink("../test/output.txt", end="")
    )
+
+
def plan_sort_filter(descriptor):
    """DEMO plan: sort words.txt case-insensitively, then keep lines that
    start with "f".

    :returns: the sink DataQuanta.
    """
    builder = DataQuantaBuilder(descriptor)
    ordered = builder.source("../test/words.txt").sort(lambda elem: elem.lower())
    only_f = ordered.filter(lambda elem: str(elem).startswith("f"))
    return only_f.sink("../test/output.txt", end="")
+
+
def plan_filter_text(descriptor):
    """DEMO plan: keep the lines of words.txt that start with "f".

    :returns: the sink DataQuanta.
    """
    builder = DataQuantaBuilder(descriptor)
    return (
        builder.source("../test/words.txt")
        .filter(lambda elem: str(elem).startswith("f"))
        .sink("../test/output.txt", end="")
    )
+
+
def plan_filter(descriptor):
    """DEMO plan: keep the odd numbers of numbers.txt.

    :returns: the sink DataQuanta.
    """
    def is_odd(elem):
        return int(elem) % 2 != 0

    builder = DataQuantaBuilder(descriptor)
    numbers = builder.source("../test/numbers.txt")
    return numbers.filter(is_odd).sink("../test/output.txt", end="")
+
+
def plan_basic(descriptor):
    """DEMO plan: copy lines.txt straight to the output file.

    :returns: the sink DataQuanta.
    """
    lines = DataQuantaBuilder(descriptor).source("../test/lines.txt")
    return lines.sink("../test/output.txt", end="")
+
+
def plan_junction(descriptor):
    """DEMO plan joining three sources.

    Unions lines.txt, the lines of morelines.txt starting with "I" and the
    lines of lastlines.txt starting with "W", then sorts the result
    case-insensitively.

    :returns: the sink DataQuanta.
    """
    builder = DataQuantaBuilder(descriptor)

    first = builder.source("../test/lines.txt")
    second = builder.source("../test/morelines.txt").filter(
        lambda elem: str(elem).startswith("I"))
    third = builder.source("../test/lastlines.txt").filter(
        lambda elem: str(elem).startswith("W"))

    merged = first.union(second).union(third)
    return merged.sort(lambda elem: elem.lower()).sink("../test/output.txt", end="")
+
+
def plan_java_junction(descriptor):
    """DEMO plan: union lines.txt with morelines.txt, keep lines starting
    with "I" and sort them case-insensitively.

    :returns: the sink DataQuanta.
    """
    builder = DataQuantaBuilder(descriptor)
    left = builder.source("../test/lines.txt")
    right = builder.source("../test/morelines.txt")
    return (
        left.union(right)
        .filter(lambda elem: str(elem).startswith("I"))
        .sort(lambda elem: elem.lower())
        .sink("../test/output.txt", end="")
    )
+
+
def plan_tpch_q1(descriptor):
    """Partial TPC-H Q1 DEMO plan over lineitem.txt.

    Splits each line on "|" and keeps rows whose column 10 (presumably
    L_SHIPDATE in the TPC-H lineitem layout) is on or before 1998-09-02. It
    then projects the columns Q1 aggregates. The group-by / reduce-by steps
    of Q1 are not implemented yet.

    :returns: the sink DataQuanta of the plan.
    """
    # TODO create reduce by
    plan = DataQuantaBuilder(descriptor)

    # Parsed once here instead of once per row inside the filter.
    cutoff = datetime.datetime.strptime("1998-09-02", '%Y-%m-%d')

    # Column indices used below:
    # L_QUANTITY 4, L_EXTENDEDPRICE 5, discount 6, tax 7,
    # L_RETURNFLAG 8, L_LINESTATUS 9
    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d') <= cutoff) \
        .map(lambda elem:
             [elem[8], elem[9], elem[4], elem[5],
              float(elem[5]) * (1 - float(elem[6])),
              float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
              elem[4], elem[5],
              elem[6], 1]) \
        .sink("../test/output.txt", end="")

    # Previously returned the undefined name ``dq_source_b`` (NameError).
    return sink
+
+
def plan_full_java(descriptor):
    """DEMO plan: union lines.txt and morelines.txt and write the result.

    :returns: the sink DataQuanta.
    """
    builder = DataQuantaBuilder(descriptor)
    left = builder.source("../test/lines.txt")
    right = builder.source("../test/morelines.txt")
    combined = left.union(right)
    return combined.sink("../test/output.txt", end="")
+
+
if __name__ == '__main__':
    # General information about the Wayang plan built below is gathered in
    # this descriptor.
    descriptor = Descriptor()
    plan_dataquanta_sink = plan_tpch_q1(descriptor)
    # Debug-only: runs the plan in the driver instead of sending it to Wayang.
    plan_dataquanta_sink.execute()
diff --git a/pywayang/orchestrator/main.py b/pywayang/orchestrator/main.py
new file mode 100644
index 00000000..b634eeb9
--- /dev/null
+++ b/pywayang/orchestrator/main.py
@@ -0,0 +1,173 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+import datetime
+
+
def plan_sort(descriptor):
    """Build a demo plan that sorts the words of ``../test/words.txt``.

    The sort uses ``elem.lower()`` as its key function.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    words = DataQuantaBuilder(descriptor).source("../test/words.txt")
    ordered = words.sort(lambda elem: elem.lower())
    return ordered.sink("../test/output.txt", end="")
+
+
def plan_sort_filter(descriptor):
    """Build a demo plan that sorts ``../test/words.txt`` and keeps words starting with "f".

    Sorting happens before filtering; the sort key is ``elem.lower()``.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)
    ordered = builder.source("../test/words.txt").sort(lambda elem: elem.lower())
    f_words = ordered.filter(lambda elem: str(elem).startswith("f"))
    return f_words.sink("../test/output.txt", end="")
+
+
def plan_filter_text(descriptor):
    """Build a demo plan keeping the words of ``../test/words.txt`` that start with "f".

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    words = DataQuantaBuilder(descriptor).source("../test/words.txt")
    kept = words.filter(lambda elem: str(elem).startswith("f"))
    return kept.sink("../test/output.txt", end="")
+
+
def plan_filter(descriptor):
    """Build a demo plan keeping the odd numbers of ``../test/numbers.txt``.

    Each element is converted with ``int()`` before the parity test.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    numbers = DataQuantaBuilder(descriptor).source("../test/numbers.txt")
    odd_numbers = numbers.filter(lambda elem: int(elem) % 2 != 0)
    return odd_numbers.sink("../test/output.txt", end="")
+
+
def plan_basic(descriptor):
    """Build the simplest demo plan: copy ``../test/lines.txt`` to ``../test/output.txt``.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)
    return builder.source("../test/lines.txt").sink("../test/output.txt", end="")
+
+
def plan_junction(descriptor):
    """Build a demo plan that merges three text files with per-source filters.

    ``lines.txt`` is taken as is, ``morelines.txt`` keeps lines starting
    with "I", and ``lastlines.txt`` keeps lines starting with "W". The union
    of the three is sorted with key ``elem.lower()`` and written out.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)

    # Sources are created in this order: lines, morelines, lastlines.
    lines = builder.source("../test/lines.txt")
    i_lines = builder.source("../test/morelines.txt").filter(
        lambda elem: str(elem).startswith("I"))
    w_lines = builder.source("../test/lastlines.txt").filter(
        lambda elem: str(elem).startswith("W"))

    everything = lines.union(i_lines).union(w_lines)
    ordered = everything.sort(lambda elem: elem.lower())
    return ordered.sink("../test/output.txt", end="")
+
+
def plan_java_junction(descriptor):
    """Build a demo plan: union two text files, keep lines starting with "I",
    sort them with key ``elem.lower()`` and write them out.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)

    lines = builder.source("../test/lines.txt")
    more_lines = builder.source("../test/morelines.txt")

    i_lines = lines.union(more_lines).filter(lambda elem: str(elem).startswith("I"))
    ordered = i_lines.sort(lambda elem: elem.lower())
    return ordered.sink("../test/output.txt", end="")
+
+
def plan_tpch_q1(descriptor):
    """Build the current, reduced TPC-H Q1 demo plan.

    Reads ``../test/lineitem.txt``, splits every line on ``|`` and writes the
    resulting lists to ``../test/output.txt``. The date filter, projection and
    reduce-by-key steps of Q1 are not wired in yet; they are kept below as
    comments.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    # TODO create reduce by
    plan = DataQuantaBuilder(descriptor)

    # Intended reducer for ``reduce_by_key([0, 1], reducer)``: keeps the two
    # key columns (0 and 1) from ``obj1`` and combines columns 2-9 pairwise
    # with ``+``. Not used yet.
    def reducer(obj1, obj2):
        return obj1[0], obj1[1], obj1[2] + obj2[2], obj1[3] + obj2[3], obj1[4] + obj2[4], obj1[5] + obj2[5], \
            obj1[6] + obj2[6], obj1[7] + obj2[7], obj1[8] + obj2[8], obj1[9] + obj2[9]

    sink = plan.source("../test/lineitem.txt") \
        .map(lambda elem: elem.split("|")) \
        .sink("../test/output.txt", end="")

    # Remaining Q1 steps, to be chained after the split once supported:
    # .filter(lambda elem: datetime.datetime.strptime(elem[10], '%Y-%m-%d')
    #         <= datetime.datetime.strptime('1998-09-02', '%Y-%m-%d')) \
    # .map(lambda elem:
    #      [elem[8], elem[9], elem[4], elem[5],
    #       float(elem[5]) * (1 - float(elem[6])),
    #       float(elem[5]) * (1 - float(elem[6])) * (1 + float(elem[7])),
    #       elem[4], elem[5],
    #       elem[6], 1]) \
    # .reduce_by_key([0, 1], reducer) \
    # .sink("../test/output.txt", end="")

    return sink
+
+
def plan_full_java(descriptor):
    """Build a demo plan that unions ``lines.txt`` and ``morelines.txt`` and writes the result.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)
    lines = builder.source("../test/lines.txt")
    more_lines = builder.source("../test/morelines.txt")
    merged = lines.union(more_lines)
    return merged.sink("../test/output.txt", end="")
+
+
def plan_wordcount(descriptor):
    """Build a demo plan over ``../test/lineitem.txt``; despite its name it counts nothing.

    Keeps the lines whose first ``|``-separated field is shorter than 4
    characters, flat-maps each kept line into its ``|``-separated fields and
    writes them to ``../test/output.txt``.

    :param descriptor: Descriptor that collects the plan's sources and sinks.
    :return: the executable sink DataQuanta of the plan.
    """
    builder = DataQuantaBuilder(descriptor)
    line_items = builder.source("../test/lineitem.txt")
    short_keys = line_items.filter(lambda elem: len(str(elem).split("|")[0]) < 4)
    fields = short_keys.flatmap(lambda elem: str(elem).split("|"))
    return fields.sink("../test/output.txt", end="")
+
+
if __name__ == '__main__':
    # The Descriptor holds general information about the Wayang plan built here.
    # Plugins are registered in this order: spark first, then java.
    plan_descriptor = Descriptor()
    for plugin in (Descriptor.Plugin.spark, Descriptor.Plugin.java):
        plan_descriptor.add_plugin(plugin)

    wordcount_sink = plan_wordcount(plan_descriptor)
    # Alternatives kept for local experiments:
    # wordcount_sink.execute()
    # wordcount_sink.console()
    wordcount_sink.to_wayang_plan()
diff --git a/pywayang/orchestrator/operator.py b/pywayang/orchestrator/operator.py
new file mode 100644
index 00000000..ecaa6bdd
--- /dev/null
+++ b/pywayang/orchestrator/operator.py
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import pickle
+import cloudpickle
+from config.config_reader import get_source_types
+from config.config_reader import get_sink_types
+from config.config_reader import get_boundary_types
+import logging
+
# Highest pickle protocol supported by the running interpreter.
# NOTE(review): not referenced by the Operator class below; serialize_udf()
# calls cloudpickle.dumps() without a protocol argument.
pickle_protocol = pickle.HIGHEST_PROTOCOL
+
+
# Describes an Operation over an intermediate result
# Each operation could be processed by Python or Java platforms
class Operator:
    """A single operation of a PyWayang plan.

    An operator records its type, its UDF and its position in the plan graph
    (``previous``, ``predecessor`` and ``successor``). Whether it is a source,
    a sink or a boundary operator is decided by looking ``operator_type`` up
    in the type lists returned by ``config.config_reader``.
    """

    def __init__(
            self, operator_type=None, udf=None, previous=None,
            iterator=None, python_exec=False
    ):
        """Create an operator and link it after every operator in ``previous``.

        :param operator_type: operator type name, looked up in the config type lists.
        :param udf: the operator's function (stored as is).
        :param previous: iterable of upstream operators; ``None`` entries are skipped.
        :param iterator: input data; mandatory for source operator types.
        :param python_exec: whether the operator is executed by Python.
        :raises ValueError: if a source operator type is given no ``iterator``.
        """
        # Operator ID: identity of this Python object.
        self.id = id(self)

        # Operator Type
        self.operator_type = operator_type

        # Boundary operators are those whose type is listed in the config.
        self.boundary = operator_type in get_boundary_types()

        # UDF Function
        self.udf = udf

        # Source types must come with an Iterator
        self.iterator = iterator
        self.source = operator_type in get_source_types()
        if self.source and iterator is None:
            # A bare ``raise`` here had no active exception to re-raise and
            # surfaced as an unrelated RuntimeError.
            raise ValueError("Source Operator Type without an Iterator")

        # Sink Operators
        self.sink = operator_type in get_sink_types()

        # TODO Why managing previous and predecessors per separate?
        self.previous = previous

        self.successor = []
        self.predecessor = []

        self.parameters = {}

        # Set predecessors and successors from previous
        if self.previous:
            for prev in self.previous:
                if prev is not None:
                    prev.set_successor(self)
                    self.set_predecessor(prev)

        self.python_exec = python_exec

        # Lazy %-formatting produces the same text as before but, unlike the
        # previous string concatenation, also works for operator_type=None.
        logging.info(
            "Operator:%s, type:%s, PythonExecutable: %s, is boundary: %s, "
            "is source: %s, is sink: %s",
            self.getID(), self.operator_type, self.python_exec,
            self.is_boundary(), self.source, self.sink)

    def getID(self):
        """Return the operator id (the ``id()`` of this object)."""
        return self.id

    def is_source(self):
        """Return True if the operator type is a configured source type."""
        return self.source

    def is_sink(self):
        """Return True if the operator type is a configured sink type."""
        return self.sink

    def is_boundary(self):
        """Return True if the operator type is a configured boundary type."""
        return self.boundary

    def serialize_udf(self):
        """Replace ``self.udf`` in place with its cloudpickle serialization (bytes)."""
        self.udf = cloudpickle.dumps(self.udf)

    def getIterator(self):
        """Return the data produced by this operator.

        Sources return their ``iterator``; other operators apply their UDF to
        the result of the first ``previous`` operator, recursively.
        """
        if self.is_source():
            return self.iterator
        # TODO this should iterate through previous REDESIGN
        return self.udf(self.previous[0].getIterator())

    def set_parameter(self, key, value):
        """Store an extra parameter for this operator."""
        self.parameters[key] = value

    def set_successor(self, suc):
        """Add ``suc`` as a successor once; sinks never get successors."""
        if not self.is_sink() and suc not in self.successor:
            self.successor.append(suc)

    def set_predecessor(self, suc):
        """Add ``suc`` (a predecessor, despite the parameter name) once."""
        if suc not in self.predecessor:
            self.predecessor.append(suc)
diff --git a/pywayang/orchestrator/plan.py b/pywayang/orchestrator/plan.py
new file mode 100644
index 00000000..25610cc7
--- /dev/null
+++ b/pywayang/orchestrator/plan.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+from enum import Enum
+
class Descriptor:
    """Plan-wide information: registered sources, sinks and target plugins.

    Constructing a Descriptor also calls ``logging.basicConfig`` to log at
    DEBUG level into ``../config/execution.log`` (relative to the working
    directory).
    """

    class Plugin(Enum):
        # Execution platforms a plan can be registered for.
        java = 0
        spark = 1

    def __init__(self):
        self.sinks = []
        self.sources = []
        self.boundary_operators = None
        logging.basicConfig(filename='../config/execution.log', level=logging.DEBUG)
        self.plugins = []

    def get_boundary_operators(self):
        """Return the boundary operators (never assigned here, so ``None``)."""
        return self.boundary_operators

    def add_source(self, operator):
        """Register ``operator`` as a plan source."""
        self.sources += [operator]

    def get_sources(self):
        """Return the list of registered sources."""
        return self.sources

    def add_sink(self, operator):
        """Register ``operator`` as a plan sink."""
        self.sinks += [operator]

    def get_sinks(self):
        """Return the list of registered sinks."""
        return self.sinks

    def add_plugin(self, plugin):
        """Register a target ``Descriptor.Plugin``; registration order is kept."""
        self.plugins += [plugin]

    def get_plugins(self):
        """Return the registered plugins in registration order."""
        return self.plugins
diff --git a/pywayang/protobuf/__init__.py b/pywayang/protobuf/__init__.py
new file mode 100644
index 00000000..15a80ad9
--- /dev/null
+++ b/pywayang/protobuf/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2
diff --git a/pywayang/protobuf/old_planwriter.py b/pywayang/protobuf/old_planwriter.py
new file mode 100644
index 00000000..e8700f01
--- /dev/null
+++ b/pywayang/protobuf/old_planwriter.py
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import pickle
+import struct
+import base64
+
+
class OldMessageWriter:
    """Legacy writer that stores a single linear plan as a ``WayangPlan`` message.

    All the work happens in the constructor:
    - it follows the plan from the first source through the first successor
      of every operator;
    - it serializes the UDF of each non-sink operator it passes;
    - it writes the resulting message to a hard-coded path.
    If that file already exists, its content is parsed first and its plan and
    context fields are overwritten.
    """

    def __init__(self, descriptor):

        sink = descriptor.get_sinks()[0]
        source = descriptor.get_sources()[0]

        # Walk source -> ... -> sink along the first successor of each operator.
        op = source
        # NOTE(review): ``visited`` is never filled, so only the sink check
        # below actually filters anything.
        visited = []
        middle_operators = []
        while op.sink is not True and len(op.successor) > 0:
            pre = op.successor[0]
            if pre not in visited and pre.sink is not True:
                pre.serialize_udf()
                middle_operators.append(pre)
            op = pre

        finalpath = "/Users/rodrigopardomeza/wayang/incubator-wayang/protobuf/filter_message"
        planconf = pwb.WayangPlan()
        try:
            # ``with`` closes the file even if parsing raises.
            with open(finalpath, "rb") as f:
                planconf.ParseFromString(f.read())
        except IOError:
            print(finalpath + ": Could not open file. Creating a new one.")

        so = pwb.Source()
        so.id = source.id
        so.type = source.operator_type
        # The source operator's ``udf`` holds its input path.
        so.path = os.path.abspath(source.udf)

        operators = []
        for mid in middle_operators:
            proto_op = pwb.Operator()
            proto_op.id = mid.id
            proto_op.type = mid.operator_type
            # Bytes produced by serialize_udf() in the walk above.
            proto_op.udf = mid.udf
            operators.append(proto_op)

        si = pwb.Sink()
        si.id = sink.id
        si.type = sink.operator_type
        # The sink operator's ``udf`` holds its output path.
        si.path = os.path.abspath(sink.udf)

        plan = pwb.Plan()
        plan.source.CopyFrom(so)
        plan.sink.CopyFrom(si)
        plan.operators.extend(operators)
        plan.input = pwb.Plan.string
        plan.output = pwb.Plan.string

        ctx = pwb.Context()
        ctx.platforms.extend([pwb.Context.Platform.java])

        planconf.plan.CopyFrom(plan)
        planconf.context.CopyFrom(ctx)

        with open(finalpath, "wb") as f:
            f.write(planconf.SerializeToString())
+
class func_inteface:
    """Callable holder composing ``node.operator.udf`` on top of ``nested_udf``.

    ``func(iterable)`` returns ``node.operator.udf(nested_udf(iterable))``.
    """

    def __init__(self, node, nested_udf):
        self.node = node
        self.nested_udf = nested_udf

    def func(self, iterable):
        # Resolve the outer UDF first, matching the original evaluation order.
        outer_udf = self.node.operator.udf
        inner_result = self.nested_udf(iterable)
        return outer_udf(inner_result)
+
+
class MessageWriter:
    """Legacy, work-in-progress plan writer kept for reference.

    ``process_pipeline`` folds runs of consecutive Python-executable nodes of
    a stage into one composed function. Emitting that function as an
    operator is not implemented yet. ``old`` writes a linear plan exactly
    like ``OldMessageWriter``.
    """

    def __init__(self):
        # Per-instance buffers. As class attributes they were shared, and
        # kept growing, across every MessageWriter instance.
        self.sources = []
        self.operators = []
        self.sinks = []

    def add_source(self, operator_id, operator_type, path, predecessors, successors):
        """Append a source OperatorProto reading from ``path``."""
        source = pwb.OperatorProto()
        source.id = operator_id
        source.type = operator_type
        source.path = os.path.abspath(path)
        # A bytes field cannot be set to None; use the NUL placeholder that
        # planwriter.MessageWriter uses for functionless operators.
        source.udf = chr(0).encode('utf-8')
        # Repeated protobuf fields cannot be assigned, only extended.
        source.predecessors.extend(predecessors)
        source.successors.extend(successors)
        self.sources.append(source)

    def add_sink(self, operator_id, operator_type, path, predecessors, successors):
        """Append a sink OperatorProto writing to ``path``."""
        sink = pwb.OperatorProto()
        sink.id = operator_id
        sink.type = operator_type
        sink.path = os.path.abspath(path)
        sink.udf = chr(0).encode('utf-8')
        sink.predecessors.extend(predecessors)
        sink.successors.extend(successors)
        self.sinks.append(sink)

    def add_operator(self, operator_id, operator_type, udf, path, predecessors, successors):
        """Append a regular OperatorProto."""
        op = pwb.OperatorProto()
        op.id = operator_id
        op.type = operator_type
        op.udf = udf
        # ``path`` may be None for operators without files; store it as text.
        op.path = str(path)
        op.predecessors.extend(predecessors)
        op.successors.extend(successors)
        self.operators.append(op)

    def process_pipeline(self, stage):
        """Fold consecutive Python-executable nodes of ``stage`` into one function.

        Nodes are visited from last to first. A non-Python node closes the
        current group. Emitting each group as a "map_partition" operator is
        still a TODO, so groups are currently discarded.

        The previous version also ran every composed UDF on hard-coded sample
        data and printed the results. That crashed for UDFs that cannot handle
        those strings (e.g. ``int(elem)``), so it was removed.
        """
        nested_udf = None
        nested_id = ""
        for node in reversed(stage):
            if not node.python_exec:
                # TODO emit the finished group (nested_id, nested_udf) as a
                # "map_partition" operator, then add ``node`` itself.
                nested_udf = None
                nested_id = ""
            elif nested_udf is None:
                nested_udf = node.operator.udf
                nested_id = node.id
            else:
                # ``node`` runs before the already-nested functions.
                nested_udf = self.concatenate(nested_udf, node.operator.udf)
                nested_id = str(node.id) + "," + str(nested_id)
        # TODO emit a trailing group (nested_id, nested_udf) as well.

    def concatenate(self, function_a, function_b):
        """Return a function computing ``function_a(function_b(iterable))``."""
        def executable(iterable):
            return function_a(function_b(iterable))
        return executable

    def old(self, descriptor):
        """Write a linear plan exactly as ``OldMessageWriter`` does."""
        OldMessageWriter(descriptor)

    def pipeline_singleton(self):
        """Placeholder; not implemented yet."""
        # TODO implement
diff --git a/pywayang/protobuf/planwriter.py b/pywayang/protobuf/planwriter.py
new file mode 100644
index 00000000..b63dcbbc
--- /dev/null
+++ b/pywayang/protobuf/planwriter.py
@@ -0,0 +1,277 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import protobuf.pywayangplan_pb2 as pwb
+import os
+import cloudpickle
+import logging
+import pathlib
+import requests
+import base64
+
+
# Writes Wayang Plan from several stages
class MessageWriter:
    """Translate PyWayang stages into a ``WayangPlanProto`` message.

    Presumably used as: ``process_pipeline`` once per stage, then
    ``set_dependencies``, then ``write_message`` or ``send_message``.
    ``set_dependencies`` reads the bookkeeping that ``process_pipeline`` fills.
    """

    def __init__(self):
        # Per-instance state. These used to be class attributes, so every
        # MessageWriter shared, and kept appending to, the same containers.
        self.sources = []
        self.operators = []
        self.sinks = []
        # str(plan node id) -> OperatorProto representing that node. All nodes
        # folded into one map_partition share the same proto.
        self.operator_references = {}
        # OperatorProto id -> {"start": predecessor node ids,
        #                      "end": successor node ids}
        self.boundaries = {}

    # Creates and appends Source type of operator
    def add_source(self, operator_id, operator_type, path):
        """Append a source OperatorProto reading from ``path`` and return it."""
        source = pwb.OperatorProto()
        source.id = str(operator_id)
        source.type = operator_type
        source.path = os.path.abspath(path)
        # Sources carry no function; a single NUL byte fills the udf field.
        source.udf = chr(0).encode('utf-8')
        self.sources.append(source)
        return source

    # Creates and appends Sink type of operator
    def add_sink(self, operator_id, operator_type, path):
        """Append a sink OperatorProto writing to ``path`` and return it."""
        sink = pwb.OperatorProto()
        sink.id = str(operator_id)
        sink.type = operator_type
        sink.path = os.path.abspath(path)
        sink.udf = chr(0).encode('utf-8')
        self.sinks.append(sink)
        return sink

    # Creates and appends a Python operator
    # Python OP don't require parameters, UDF has the function ready to be executed directly
    def add_operator(self, operator_id, operator_type, udf):
        """Append an OperatorProto whose udf is ``udf`` pickled with cloudpickle."""
        op = pwb.OperatorProto()
        op.id = str(operator_id)
        op.type = operator_type
        op.udf = cloudpickle.dumps(udf)
        op.path = str(None)
        self.operators.append(op)
        return op

    # Creates and appends a Java operator
    def add_java_operator(self, operator_id, operator_type, udf, parameters):
        """Append an operator to be run by Java; ``parameters`` values are stored as str."""
        op = pwb.OperatorProto()
        op.id = str(operator_id)
        op.type = operator_type
        op.udf = cloudpickle.dumps(udf)
        op.path = str(None)
        for param, value in parameters.items():
            logging.debug("%s %s", param, value)
            op.parameters[param] = str(value)
        self.operators.append(op)
        return op

    # Receive a chain of operators, separate them in Wayang Operators
    # Compacts several Python executable operators in one Map Partition Wayang Operator
    def process_pipeline(self, stage):
        """Translate one stage into operator protos.

        Nodes are visited from last to first. Consecutive Python-executable
        nodes are composed into a single "map_partition" operator. Every
        other node becomes a source, sink or Java operator.
        """
        nested_udf = None
        nested_id = ""
        nested_predecessors = None
        nested_successors = None
        for node in reversed(stage):
            logging.debug("%s executable: %s id: %s", node.operator_type, node.python_exec, node.id)

            if not node.python_exec:
                if nested_udf is not None:
                    self._add_python_group(nested_id, nested_udf, nested_predecessors, nested_successors)
                    nested_udf = None
                    nested_id = ""
                    nested_predecessors = None
                    nested_successors = None
                self._add_wayang_node(node)
            else:
                if nested_udf is None:
                    nested_udf = node.operator.udf
                    nested_id = node.id
                    # The first node visited is the last one executed, so its
                    # successors are the successors of the whole group.
                    nested_successors = node.successors.keys()
                else:
                    # ``node`` runs before the already-nested functions.
                    nested_udf = self.concatenate(nested_udf, node.operator.udf)
                    nested_id = str(node.id) + "," + str(nested_id)

                # The node visited last runs first; its predecessors start the group.
                nested_predecessors = node.predecessors.keys()

        # Just in case in the future some pipelines start with Python operators
        if nested_udf is not None:
            self._add_python_group(nested_id, nested_udf, nested_predecessors, nested_successors)

    # Emits one map_partition operator for a group of Python nodes and indexes it.
    def _add_python_group(self, nested_id, nested_udf, predecessors, successors):
        # ``nested_id`` is an int for single-node groups; always work on str
        # so the keys match the OperatorProto ids used by set_dependencies().
        group_id = str(nested_id)
        op = self.add_operator(group_id, "map_partition", nested_udf)
        for node_id in group_id.split(","):
            self.operator_references[node_id] = op
        self.boundaries[group_id] = {"end": successors, "start": predecessors}
        return op

    # Emits the proto for a node that is not executed by Python and indexes it.
    def _add_wayang_node(self, node):
        node_id = str(node.id)
        operator = node.operator
        if operator.source:
            op = self.add_source(node.id, node.operator_type, operator.udf)
            self.boundaries[node_id] = {"end": node.successors.keys()}
        elif operator.sink:
            op = self.add_sink(node.id, node.operator_type, operator.udf)
            self.boundaries[node_id] = {"start": node.predecessors.keys()}
        else:
            # Regular operator to be processed in Java; it may carry extra
            # parameters for the Java side.
            op = self.add_java_operator(node.id, node.operator_type, operator.udf, operator.parameters)
            self.boundaries[node_id] = {
                "start": node.predecessors.keys(),
                "end": node.successors.keys(),
            }
        self.operator_references[node_id] = op
        return op

    # Takes 2 Functions and compact them in only one function
    @staticmethod
    def concatenate(function_a, function_b):
        """Return a function computing ``function_a(function_b(iterable))``."""
        def executable(iterable):
            return function_a(function_b(iterable))

        return executable

    # Maps plan-node ids to the ids of the OperatorProtos that represent them.
    def _resolve(self, node_ids):
        return [str(self.operator_references[str(node_id)].id) for node_id in node_ids]

    # Set dependencies over final Wayang Operators
    def set_dependencies(self):
        """Fill predecessors/successors of every proto from the recorded boundaries."""
        for source in self.sources:
            bounds = self.boundaries[source.id]
            if 'end' in bounds:
                source.successors.extend(self._resolve(bounds['end']))

        for sink in self.sinks:
            bounds = self.boundaries[sink.id]
            if 'start' in bounds:
                sink.predecessors.extend(self._resolve(bounds['start']))

        for op in self.operators:
            bounds = self.boundaries[op.id]
            if 'start' in bounds:
                op.predecessors.extend(self._resolve(bounds['start']))
            if 'end' in bounds:
                op.successors.extend(self._resolve(bounds['end']))

    # Copies the collected operators and the descriptor's plugins into ``plan_configuration``.
    def _fill_plan_configuration(self, plan_configuration, descriptor):
        plan = pwb.PlanProto()
        plan.sources.extend(self.sources)
        plan.operators.extend(self.operators)
        plan.sinks.extend(self.sinks)
        plan.input = pwb.PlanProto.string
        plan.output = pwb.PlanProto.string

        ctx = pwb.ContextProto()
        for plug in descriptor.plugins:
            ctx.platforms.append(plug.value)

        plan_configuration.plan.CopyFrom(plan)
        plan_configuration.context.CopyFrom(ctx)
        return plan_configuration

    # Writes the message to a local directory
    def write_message(self, descriptor, finalpath="../../protobuf/wayang_message"):
        """Write the plan message to ``finalpath``.

        If ``finalpath`` already holds a message, it is parsed first and its
        plan and context fields are overwritten.
        """
        plan_configuration = pwb.WayangPlanProto()

        try:
            with open(finalpath, "rb") as f:
                plan_configuration.ParseFromString(f.read())
        except IOError:
            logging.warning("File %s did not exist. System generated a new file", finalpath)

        self._fill_plan_configuration(plan_configuration, descriptor)

        with open(finalpath, "wb") as f:
            f.write(plan_configuration.SerializeToString())

    # Send message as bytes to the Wayang Rest API
    def send_message(self, descriptor, url="http://localhost:8080/plan/create"):
        """POST the base64-encoded plan message as form field ``message`` to ``url``."""
        plan_configuration = self._fill_plan_configuration(pwb.WayangPlanProto(), descriptor)
        logging.debug("plan!\n%s", plan_configuration)

        msg_bytes = plan_configuration.SerializeToString()
        msg_64 = base64.b64encode(msg_bytes)

        logging.debug(msg_bytes)
        data = {
            'message': msg_64
        }
        response = requests.post(url, data)
        logging.debug(response)
diff --git a/pywayang/test/demo_testing.py b/pywayang/test/demo_testing.py
new file mode 100644
index 00000000..c096a897
--- /dev/null
+++ b/pywayang/test/demo_testing.py
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+
class MyTestCase(unittest.TestCase):
    """Placeholder test case generated from an IDE template."""

    def test_something(self):
        # The template asserted ``True == False``, which always fails and
        # turned every run red. Skip explicitly until a real check exists.
        self.skipTest("placeholder test: no real assertion written yet")

    def test_upper(self):
        self.assertEqual('foo'.upper(), 'FOO')


if __name__ == '__main__':
    unittest.main()
diff --git a/pywayang/test/full_java_test.py b/pywayang/test/full_java_test.py
new file mode 100644
index 00000000..d17aedd0
--- /dev/null
+++ b/pywayang/test/full_java_test.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
class MyTestCase(unittest.TestCase):
    """Build plans targeting only the Java plugin and convert them with ``to_wayang_plan()``."""

    @staticmethod
    def _java_builder():
        """Return a DataQuantaBuilder whose descriptor registers only the Java plugin."""
        descriptor = Descriptor()
        descriptor.add_plugin(Descriptor.Plugin.java)
        return DataQuantaBuilder(descriptor)

    def test_most_basic(self):
        # source -> sink
        builder = self._java_builder()
        lines = builder.source("../test/lines.txt")
        lines.sink("../test/output.txt", end="").to_wayang_plan()

    def test_single_juncture(self):
        # two sources -> union -> sink
        builder = self._java_builder()
        first = builder.source("../test/lines.txt")
        second = builder.source("../test/morelines.txt")
        first.union(second).sink("../test/output.txt", end="").to_wayang_plan()

    def test_multiple_juncture(self):
        # three sources (two filtered) -> union -> union -> sort -> sink
        builder = self._java_builder()
        plain = builder.source("../test/lines.txt")
        starts_with_i = builder.source("../test/morelines.txt") \
            .filter(lambda elem: str(elem).startswith("I"))
        starts_with_w = builder.source("../test/lastlines.txt") \
            .filter(lambda elem: str(elem).startswith("W"))

        merged = plain.union(starts_with_i).union(starts_with_w)
        ordered = merged.sort(lambda elem: elem.lower())
        ordered.sink("../test/output.txt", end="").to_wayang_plan()


if __name__ == '__main__':
    unittest.main()
diff --git a/pywayang/test/full_spark_test.py b/pywayang/test/full_spark_test.py
new file mode 100644
index 00000000..9276ccc6
--- /dev/null
+++ b/pywayang/test/full_spark_test.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+from orchestrator.plan import Descriptor
+from orchestrator.dataquanta import DataQuantaBuilder
+
+
+def test_most_basic(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.spark)
+
+ plan = DataQuantaBuilder(descriptor)
+ sink_dataquanta = \
+ plan.source("../test/lines.txt") \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+def test_single_juncture(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.spark)
+
+ plan = DataQuantaBuilder(descriptor)
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt")
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+def test_multiple_juncture(self):
+ descriptor = Descriptor()
+ descriptor.add_plugin(Descriptor.Plugin.spark)
+
+ plan = DataQuantaBuilder(descriptor)
+ dq_source_a = plan.source("../test/lines.txt")
+ dq_source_b = plan.source("../test/morelines.txt") \
+ .filter(lambda elem: str(elem).startswith("I"))
+ dq_source_c = plan.source("../test/lastlines.txt") \
+ .filter(lambda elem: str(elem).startswith("W"))
+
+ sink_dataquanta = dq_source_a.union(dq_source_b) \
+ .union(dq_source_c) \
+ .sort(lambda elem: elem.lower()) \
+ .sink("../test/output.txt", end="")
+
+ sink_dataquanta.to_wayang_plan()
+
+
+if __name__ == '__main__':
+ unittest.main()