You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2022/06/19 22:23:05 UTC
[incubator-wayang] branch main updated: Wayang-211 add JVM-platform inside of Python-API (#238)
This is an automated email from the ASF dual-hosted git repository.
rpardomeza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/main by this push:
new d859a97d Wayang-211 add JVM-platform inside of Python-API (#238)
d859a97d is described below
commit d859a97d43a8c3c3c964150eaff8f3833e41ea75
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Jun 20 00:23:00 2022 +0200
Wayang-211 add JVM-platform inside of Python-API (#238)
* [LICENSE] add check-license.sh for the whole project, especially for Python
Signed-off-by: bertty <be...@apache.org>
* [POM] update pom.xml
Signed-off-by: bertty <be...@apache.org>
* [WAYANG-211] python java implementation
Signed-off-by: bertty <be...@apache.org>
---
bin/check-license.sh | 61 ++++
pom.xml | 3 +-
python/.rat-excludes | 1 +
python/bin/check-license.sh | 28 --
python/src/pywy/platforms/jvm/worker.py | 374 +++++++++++++++++++++
wayang-api/wayang-api-python/pom.xml | 8 +
.../wayang/api/python/executor/ProcessFeeder.java | 184 ++++++++++
.../api/python/executor/ProcessReceiver.java | 53 +++
.../api/python/executor/PythonProcessCaller.java | 130 +++++++
.../api/python/executor/PythonWorkerManager.java | 70 ++++
.../wayang/api/python/executor/ReaderIterator.java | 98 ++++++
.../wayang/api/python/function/PythonCode.java | 32 ++
.../api/python/function/PythonFunctionWrapper.java | 46 +++
.../wayang/api/python/function/PythonUDF.java | 25 ++
.../wayang-api-python-defaults.properties | 3 +-
.../server/spring/decoder/WayangPlanBuilder.java | 239 +++++++++++++
.../server/spring/general/WayangController.java | 304 +++++++++++++++++
17 files changed, 1629 insertions(+), 30 deletions(-)
diff --git a/bin/check-license.sh b/bin/check-license.sh
new file mode 100755
index 00000000..f38f89d5
--- /dev/null
+++ b/bin/check-license.sh
@@ -0,0 +1,61 @@
#!/bin/bash

################################################################################
##
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
##     http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
################################################################################

# Runs Apache RAT over the repository and prints the "== File:" lines of its
# report, i.e. the files RAT flags.
#
# Usage: bin/check-license.sh [exclusion-file]
#   exclusion-file  optional file with extra exclusion patterns, passed to
#                   RAT with -E (for example python/.rat-excludes)

# Repository root = parent of this script's directory. `&&` (not a pipe) is
# required so that pwd runs in the directory cd changed into.
BASE=$(cd "$(dirname "$0")/.." && pwd) || exit 1
VERSION_RAT="0.14"
RAT_HOME="${BASE}/.rat/apache-rat-${VERSION_RAT}"
RAT_JAR="${RAT_HOME}/apache-rat-${VERSION_RAT}.jar"
RAT_EXCLUSION=$1

# Create the download cache (it is a directory, hence -d).
if [[ ! -d "${BASE}/.rat" ]]; then
  mkdir -p "${BASE}/.rat" || exit 1
fi

# Download and unpack RAT once; later runs reuse the cached jar.
# archive.apache.org keeps every release, dlcdn only the current ones.
if [[ ! -f "${RAT_JAR}" ]]; then
  cd "${BASE}/.rat" || exit 1
  wget "https://archive.apache.org/dist/creadur/apache-rat-${VERSION_RAT}/apache-rat-${VERSION_RAT}-bin.tar.gz" || exit 1
  tar -xvf "apache-rat-${VERSION_RAT}-bin.tar.gz" || exit 1
fi

cd "${BASE}" || exit 1

# Parameters for RAT: scan the whole repository.
rat_opts=("-jar" "${RAT_JAR}" "-d" "${BASE}")

# Default exclusions for the project.
rat_opts+=("-e" "target/*")
rat_opts+=("-e" "^.*.input")
rat_opts+=("-e" "^.*.md")
rat_opts+=("-e" "^.*.iml")
rat_opts+=("-e" "^.*_pb2.py") # code generated by protocol buffer
rat_opts+=("-e" "Gemfile.lock")
rat_opts+=("-e" ".gitignore")
rat_opts+=("-e" ".gitmodules")
rat_opts+=("-e" ".rat-excludes")
if [[ -n "${RAT_EXCLUSION}" ]]; then
  # Quoted so that a path containing spaces stays a single argument.
  rat_opts+=("-E" "${RAT_EXCLUSION}")
fi

# Run RAT and keep only the per-file findings.
java "${rat_opts[@]}" 2>&1 | grep "== File:"
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a4dedf81..36ab3ede 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1109,7 +1109,8 @@
<configuration>
<detectLinks>false</detectLinks>
<detectJavaApiLink>true</detectJavaApiLink>
- <additionalparam>-Xdoclint:none</additionalparam>
+ <doclint>none</doclint>
+<!-- <additionalparam>-Xdoclint:none</additionalparam>-->
</configuration>
</plugin>
diff --git a/python/.rat-excludes b/python/.rat-excludes
index 8d1082aa..51b18ba7 100644
--- a/python/.rat-excludes
+++ b/python/.rat-excludes
@@ -1,3 +1,4 @@
+.gitignore
.rat-excludes
venv/*
10e*.input
diff --git a/python/bin/check-license.sh b/python/bin/check-license.sh
deleted file mode 100755
index 7af700f8..00000000
--- a/python/bin/check-license.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-
-################################################################################
-##
-## Licensed to the Apache Software Foundation (ASF) under one or more
-## contributor license agreements. See the NOTICE file distributed with
-## this work for additional information regarding copyright ownership.
-## The ASF licenses this file to You under the Apache License, Version 2.0
-## (the "License"); you may not use this file except in compliance with
-## the License. You may obtain a copy of the License at
-##
-## http://www.apache.org/licenses/LICENSE-2.0
-##
-## Unless required by applicable law or agreed to in writing, software
-## distributed under the License is distributed on an "AS IS" BASIS,
-## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-## See the License for the specific language governing permissions and
-## limitations under the License.
-##
-################################################################################
-
-# TODO download Apache rat if is needed
-
-BASE=$(cd "$(dirname "$0")/.." | pwd)
-cd ${BASE}
-RAT_HOME=/opt/apache-rat-0.13
-
-java -jar ${RAT_HOME}/apache-rat-0.13.jar -E .rat-excludes -d . | grep "== File:"
\ No newline at end of file
diff --git a/python/src/pywy/platforms/jvm/worker.py b/python/src/pywy/platforms/jvm/worker.py
new file mode 100644
index 00000000..9e1fe3bb
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/worker.py
@@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import socket
+import struct
+import pickle
+from itertools import chain
+
+import cloudpickle
+import base64
+import re
+import sys
+import time
+
class SpecialLengths:
    """Negative int32 values used as control markers in place of a length.

    A real record length is always >= 0, so a negative value read where a
    length is expected signals a control event instead of a payload.
    """

    END_OF_DATA_SECTION = -1  # no more records follow in this section
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5  # the record is a null value (decoded as None)
    START_ARROW_STREAM = -6
+
+
def read_int(stream):
    """Read one big-endian signed 32-bit integer from a binary stream.

    :param stream: file-like object opened in binary mode.
    :return: the decoded integer.
    :raises EOFError: if the stream ends before 4 bytes could be read.
    """
    data = stream.read(4)
    # A short read means the peer closed the connection mid-frame; report it
    # as end-of-stream rather than letting struct.unpack raise struct.error.
    if not data or len(data) < 4:
        raise EOFError
    return struct.unpack("!i", data)[0]
+
+
class UTF8Deserializer:
    """
    Deserializes length-prefixed UTF-8 strings as sent by the JVM side:
    an int32 byte length followed by ``String.getBytes(UTF_8)``.
    """

    def __init__(self, use_unicode=True):
        # When False, records are returned as raw ``bytes`` instead of ``str``.
        self.use_unicode = use_unicode

    def loads(self, stream):
        """Read a single record.

        :return: the decoded string (or bytes), or None for a NULL marker.
        :raises EOFError: at END_OF_DATA_SECTION or if the stream is truncated.
        """
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        s = stream.read(length)
        # A short read would otherwise yield a silently truncated record
        # (same rule as FramedSerializer._read_with_length).
        if len(s) < length:
            raise EOFError
        return s.decode("utf-8") if self.use_unicode else s

    def load_stream(self, stream):
        """Lazily yield records until the end-of-data marker or end of stream."""
        try:
            while True:
                yield self.loads(stream)
        except struct.error:
            return
        except EOFError:
            return

    def __repr__(self):
        return "UTF8Deserializer(%s)" % self.use_unicode
+
+
def write_int(p, outfile):
    """Write ``p`` to the binary stream ``outfile`` as a big-endian int32."""
    packed = struct.pack(">i", p)
    outfile.write(packed)
+
+
def write_with_length(obj, stream):
    """Write a string as an int32 byte length followed by its UTF-8 bytes.

    :param obj: the ``str`` to send.
    :param stream: binary output stream.
    :raises ValueError: if the encoded value does not fit an int32 length.
    """
    serialized = obj.encode("utf-8")
    # The length prefix is a signed int32, so 2**31 - 1 bytes is the maximum;
    # exactly 2**31 would make struct.pack fail with struct.error.
    if len(serialized) >= (1 << 31):
        raise ValueError("can not serialize object larger than 2G")
    write_int(len(serialized), stream)
    stream.write(serialized)
+
+
class Serializer:
    """Abstract base class of the stream serializers used by this worker.

    Equality means "interchangeable": two serializers compare equal when they
    share the same class and the same instance attributes, so data written by
    one can be read back by the other. Subclasses may override ``__eq__``.
    """

    def dump_stream(self, iterator, stream):
        """Write every object yielded by ``iterator`` to ``stream``."""
        raise NotImplementedError

    def load_stream(self, stream):
        """Return an iterator over the objects decoded from ``stream``."""
        raise NotImplementedError

    def dumps(self, obj):
        """Encode ``obj`` (a whole batch when batching is used) into bytes."""
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        """Return an iterator of batches read from ``stream``.

        A serializer that does not batch wraps each object in a one-element
        list.
        """
        def wrap(item):
            return [item]

        return map(wrap, self.load_stream(stream))

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "{}()".format(self.__class__.__name__)

    def __hash__(self):
        # Hash the textual representation: equal reprs give equal hashes.
        return hash(str(self))
+
class FramedSerializer(Serializer):

    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where ``length`` is a big-endian 32-bit integer and ``data`` is the
    ``length`` bytes produced by :meth:`dumps`.

    ``dump_stream`` writes no end-of-data marker; readers stop at
    END_OF_DATA_SECTION or at end of stream.
    """

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        if serialized is None:
            raise ValueError("serialized value should not be None")
        # The length prefix is a signed int32: 2**31 - 1 bytes is the maximum.
        if len(serialized) >= (1 << 31):
            raise ValueError("can not serialize object larger than 2G")
        write_int(len(serialized), stream)
        stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        if length == SpecialLengths.END_OF_DATA_SECTION:
            raise EOFError
        elif length == SpecialLengths.NULL:
            return None
        obj = stream.read(length)
        # Truncated frame: treat as end of stream rather than decode garbage.
        if len(obj) < length:
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError
+
+
class BatchedSerializer(Serializer):

    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with lists of objects.

    :param serializer: serializer applied to each batch (a list).
    :param batchSize: maximum objects per batch; UNLIMITED_BATCH_SIZE (-1)
        puts the whole input into one batch.
    """

    UNLIMITED_BATCH_SIZE = -1
    UNKNOWN_BATCH_SIZE = 0

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        """Group ``iterator`` into lists of at most ``batchSize`` items.

        No debug output here: the worker's stdout is redirected to the
        launching JVM's stdout. The Python 2 ``__getslice__`` fast path is not
        used because that hook does not exist on Python 3.
        """
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
            return
        items = []
        for item in iterator:
            items.append(item)
            if len(items) == self.batchSize:
                yield items
                items = []
        if items:
            yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __repr__(self):
        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
+
+
class PickleSerializer(FramedSerializer):

    """
    Frames objects encoded with Python's standard ``pickle`` module, using
    the module-level ``pickle_protocol``:

    https://docs.python.org/3/library/pickle.html

    Handles nearly any Python object, though more specialized serializers
    may be faster. Only load data from a trusted peer: unpickling can run
    arbitrary code.
    """

    def dumps(self, obj):
        return pickle.dumps(obj, protocol=pickle_protocol)

    def loads(self, obj, encoding="bytes"):
        decoded = pickle.loads(obj, encoding=encoding)
        return decoded
+
# Highest pickle protocol the running interpreter supports.
pickle_protocol = pickle.HIGHEST_PROTOCOL


class CloudPickleSerializer(FramedSerializer):
    """Framed serializer backed by ``cloudpickle`` instead of ``pickle``.

    Any serialization failure that is not already a ``pickle.PickleError``
    is re-raised as ``pickle.PicklingError`` with a descriptive message.
    """

    def dumps(self, obj):
        try:
            return cloudpickle.dumps(obj, pickle_protocol)
        except pickle.PickleError:
            raise
        except Exception as e:
            reason = str(e)
            # Presumably struct's int32 overflow message ("'i' format requires
            # ..."), i.e. the object is too big for an int32 length prefix.
            if "'i' format requires" not in reason:
                message = "Could not serialize object: %s: %s" % (e.__class__.__name__, reason)
            else:
                message = "Object too large to serialize: %s" % reason
            raise pickle.PicklingError(message)

    def loads(self, obj, encoding="bytes"):
        return cloudpickle.loads(obj, encoding=encoding)


# The plain pickle serializer is used on every Python version; selecting
# CloudPickleSerializer on Python >= 3.8 is currently disabled.
CPickleSerializer = PickleSerializer
+
def dump_stream(iterator, stream):
    """Write each ``str`` from ``iterator`` as a length-prefixed UTF-8 record,
    followed by the END_OF_DATA_SECTION marker.

    Debug prints were removed: the worker's stdout is shared with the JVM.

    TODO(review): values that are not ``str`` are silently skipped, which
    loses data; consider raising or supporting more types.
    """
    for obj in iterator:
        if type(obj) is str:
            write_with_length(obj, stream)
    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
+
+
def process(infile, outfile):
    """Apply the UDF sent by the JVM to the records that follow it.

    Reads from ``infile``:
    1. an int32 length and that many bytes of a pickled callable (the UDF);
    2. the input records as length-prefixed UTF-8 strings, terminated by
       END_OF_DATA_SECTION.

    The UDF is called once with a lazy iterator over the records. Its results
    are written to ``outfile`` as pickled batches of up to 100 items.

    :raises EOFError: if the stream ends before the whole UDF was received.
    """
    # TODO First we must receive the operator + UDF
    udf_length = read_int(infile)
    serialized_udf = infile.read(udf_length)
    if len(serialized_udf) < udf_length:
        raise EOFError("truncated UDF: expected %d bytes, got %d"
                       % (udf_length, len(serialized_udf)))
    # SECURITY NOTE(review): unpickling executes arbitrary code. This is only
    # tolerable because the peer is the local JVM that launched this worker;
    # never accept this payload from an untrusted source.
    func = pickle.loads(serialized_udf)

    # TODO Here we are temporarily assuming that the user is exclusively
    # sending UTF8. User has several types
    iterator = UTF8Deserializer().load_stream(infile)

    # NOTE(review): the JVM-side ReaderIterator appears to decode every frame
    # as a UTF-8 string and to stop at END_OF_DATA_SECTION, whereas this
    # writes pickled batches without an end marker — confirm the intended
    # output wire format.
    ser = BatchedSerializer(CPickleSerializer(), 100)
    ser.dump_stream(func(iterator), outfile)
+
+
def local_connect(port):
    """Connect to the JVM's server socket at 127.0.0.1:``port``.

    Each address returned by ``getaddrinfo`` is tried in order (IPv4 and
    IPv6 are both supported).

    :return: ``(sockfile, sock)``, where ``sockfile`` is a buffered binary
        read/write file object over ``sock``.
    :raises Exception: if no address could be connected; the message lists
        every failed attempt.
    """
    errors = []
    for af, socktype, proto, _, sa in socket.getaddrinfo(
            "127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            # Bound only the connection attempt...
            sock.settimeout(30)
            sock.connect(sa)
            # ...then go back to blocking mode; otherwise every later read
            # times out if the JVM pauses for 30 s while producing input.
            sock.settimeout(None)
            sockfile = sock.makefile("rwb", 65536)
            return sockfile, sock
        except socket.error as e:
            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, e))
            # socket.socket() itself may have failed, leaving sock as None.
            if sock is not None:
                sock.close()
    raise Exception("could not open socket: %s" % errors)
+
+
if __name__ == '__main__':
    print("Python version")
    print(sys.version)
    # Port of the JVM's server socket, set by the launching JVM.
    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
    sock_file, sock = local_connect(java_port)
    try:
        process(sock_file, sock_file)
        sock_file.flush()
    finally:
        # Always release the connection, even if the UDF raised.
        sock_file.close()
        sock.close()
    # sys.exit rather than the site-module ``exit`` helper, which is not
    # guaranteed to exist (e.g. under ``python -S``).
    sys.exit()
diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 5a3ad443..9ac2e2c1 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -35,4 +35,12 @@
<properties>
<java-module-name>org.apache.wayang.api</java-module-name>
</properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-core</artifactId>
+ <version>0.6.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
</project>
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
new file mode 100644
index 00000000..ee694322
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ProcessFeeder<Input, Output> {
+
+ private Socket socket;
+ private PythonUDF<Input, Output> udf;
+ private PythonCode serializedUDF;
+ private Iterable<Input> input;
+
+ //TODO add to a config file
+ int END_OF_DATA_SECTION = -1;
+ int NULL = -5;
+
+ public ProcessFeeder(
+ Socket socket,
+ PythonUDF<Input, Output> udf,
+ PythonCode serializedUDF,
+ Iterable<Input> input){
+
+ if(input == null) throw new WayangException("Nothing to process with Python API");
+
+ this.socket = socket;
+ this.udf = udf;
+ this.serializedUDF = serializedUDF;
+ this.input = input;
+
+ }
+
+ public void send(){
+
+ try{
+ //TODO use config buffer size
+ int BUFFER_SIZE = 8 * 1024;
+
+ BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
+ DataOutputStream dataOut = new DataOutputStream(stream);
+
+ writeUDF(serializedUDF, dataOut);
+ this.writeIteratorToStream(input.iterator(), dataOut);
+ dataOut.writeInt(END_OF_DATA_SECTION);
+ dataOut.flush();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){
+
+ //write(serializedUDF.toByteArray(), dataOut);
+ writeBytes(serializedUDF.toByteArray(), dataOut);
+ System.out.println("UDF written");
+
+ }
+
+ public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut)
+ throws IOException {
+
+ System.out.println("iterator being send");
+ int buffer = 0;
+ for (Iterator<Input> it = iter; it.hasNext(); ) {
+ Input elem = it.next();
+ //System.out.println(elem.toString());
+ write(elem, dataOut);
+ }
+ }
+
+ /*TODO Missing case PortableDataStream */
+ public void write(Object obj, DataOutputStream dataOut){
+ try {
+
+ if(obj == null)
+ dataOut.writeInt(this.NULL);
+
+ /**
+ * Byte Array cases
+ */
+ else if (obj instanceof Byte[] || obj instanceof byte[]) {
+ System.out.println("Writing Bytes");
+ writeBytes(obj, dataOut);
+ }
+ /**
+ * String case
+ * */
+ else if (obj instanceof String)
+ writeUTF((String) obj, dataOut);
+
+ /**
+ * Key, Value case
+ * */
+ else if (obj instanceof Map.Entry)
+ writeKeyValue((Map.Entry) obj, dataOut);
+
+ else{
+ throw new WayangException("Unexpected element type " + obj.getClass());
+ }
+
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void writeBytes(Object obj, DataOutputStream dataOut){
+
+ try{
+
+ if (obj instanceof Byte[]) {
+
+ int length = ((Byte[]) obj).length;
+
+ byte[] bytes = new byte[length];
+ int j=0;
+
+ // Unboxing Byte values. (Byte[] to byte[])
+ for(Byte b: ((Byte[]) obj))
+ bytes[j++] = b.byteValue();
+
+ dataOut.writeInt(length);
+ dataOut.write(bytes);
+
+ } else if (obj instanceof byte[]) {
+
+ dataOut.writeInt(((byte[]) obj).length);
+ dataOut.write(((byte[]) obj));
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void writeUTF(String str, DataOutputStream dataOut){
+
+ byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+ try {
+
+ dataOut.writeInt(bytes.length);
+ dataOut.write(bytes);
+ } catch (SocketException e){
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){
+
+ write(obj.getKey(), dataOut);
+ write(obj.getValue(), dataOut);
+ }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
new file mode 100644
index 00000000..7ef5f983
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+/*TODO cannot be always string, include definition for every operator
+* like: map(udf, inputtype, outputtype)*/
+public class ProcessReceiver<Output> {
+
+ private ReaderIterator<Output> iterator;
+
+ public ProcessReceiver(Socket socket){
+ try{
+ //TODO use config buffer size
+ int BUFFER_SIZE = 1 * 1024;
+
+ DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+ this.iterator = new ReaderIterator<>(stream);
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public Iterable<Output> getIterable(){
+ return () -> iterator;
+ }
+
+ public void print(){
+ iterator.forEachRemaining(x -> System.out.println(x.toString()));
+
+ }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
new file mode 100644
index 00000000..7eca5595
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.lang.ProcessBuilder.Redirect;
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.wayang.core.util.ReflectionUtils;
+
+public class PythonProcessCaller {
+
+ private Thread process;
+ private Socket socket;
+ private ServerSocket serverSocket;
+ private boolean ready;
+
+ //TODO How to get the config
+ private Configuration configuration;
+
+ public PythonProcessCaller(PythonCode serializedUDF){
+
+ //TODO create documentation to how to the configuration in the code
+ this.configuration = new Configuration();
+ this.configuration.load(ReflectionUtils.loadResource("wayang-api-python-defaults.properties"));
+ this.ready = false;
+ byte[] addr = new byte[4];
+ addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;
+
+ try {
+ /*TODO should NOT be assigned an specific port, set port as 0 (zero)*/
+ this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
+
+ Runnable run1 = () -> {
+ ProcessBuilder pb = new ProcessBuilder(
+ Arrays.asList(
+ "python3",
+ this.configuration.getStringProperty("wayang.api.python.worker")
+ )
+ );
+ Map<String, String> workerEnv = pb.environment();
+ workerEnv.put("PYTHON_WORKER_FACTORY_PORT",
+ String.valueOf(this.serverSocket.getLocalPort()));
+
+ // TODO See what is happening with ENV Python version
+ workerEnv.put(
+ "PYTHONPATH",
+ this.configuration.getStringProperty("wayang.api.python.path")
+ );
+
+ pb.redirectOutput(Redirect.INHERIT);
+ pb.redirectError(Redirect.INHERIT);
+ try {
+ pb.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ };
+ this.process = new Thread(run1);
+ this.process.start();
+
+ // Redirect worker stdout and stderr
+ //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
+
+ // Wait for it to connect to our socket
+ this.serverSocket.setSoTimeout(100000);
+
+ try {
+ this.socket = this.serverSocket.accept();
+ this.serverSocket.setSoTimeout(0);
+
+ if(socket.isConnected())
+ this.ready = true;
+
+ } catch (Exception e) {
+ System.out.println(e);
+ throw new WayangException("Python worker failed to connect back.", e);
+ }
+ } catch (Exception e){
+ System.out.println(e);
+ throw new WayangException("Python worker failed");
+ }
+ }
+
+ public Thread getProcess() {
+ return process;
+ }
+
+ public Socket getSocket() {
+ return socket;
+ }
+
+ public boolean isReady(){
+ return ready;
+ }
+
+ public void close(){
+ try {
+ this.process.interrupt();
+ this.socket.close();
+ this.serverSocket.close();
+ System.out.println("Everything closed");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
new file mode 100644
index 00000000..cef0c148
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+public class PythonWorkerManager<Input, Output> {
+
+ private PythonUDF<Input, Output> udf;
+ private PythonCode serializedUDF;
+ private Iterable<Input> inputIterator;
+
+ public PythonWorkerManager(
+ PythonUDF<Input, Output> udf,
+ PythonCode serializedUDF,
+ Iterable<Input> input
+ ){
+ this.udf = udf;
+ this.serializedUDF = serializedUDF;
+ this.inputIterator = input;
+ }
+
+ public Iterable<Output> execute(){
+ PythonProcessCaller worker = new PythonProcessCaller(this.serializedUDF);
+
+ if(worker.isReady()){
+ Runnable run1 = () -> {
+ ProcessFeeder<Input, Output> feed = new ProcessFeeder<>(
+ worker.getSocket(),
+ this.udf,
+ this.serializedUDF,
+ this.inputIterator
+ );
+ feed.send();
+ };
+ Thread lala = new Thread(run1);
+ lala.start();
+ ProcessReceiver<Output> r = new ProcessReceiver<>(worker.getSocket());
+
+ //r.print();
+ return r.getIterable();
+ //return (Iterable<Output>) this.inputIterator;
+
+ } else{
+
+ int port = worker.getSocket().getLocalPort();
+ worker.close();
+ throw new WayangException("Not possible to work with the Socket provided on port: " + port);
+ }
+
+ }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
new file mode 100644
index 00000000..cac90d8e
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator over the length-prefixed records a Python worker writes to a {@link DataInputStream}.
+ *
+ * <p>Each record is a 4-byte big-endian length (as read by {@link DataInputStream#readInt()})
+ * followed by that many bytes of UTF-8 text. A length of {@code -1} marks the end of the data
+ * section, and so does reaching the end of the stream. Other negative lengths carry no payload
+ * and are surfaced as {@code null} elements.
+ *
+ * <p>{@link #hasNext()} reads ahead by at most one record. It can be called any number of times
+ * without consuming data. The stream is never closed by this class.
+ *
+ * @param <Output> element type; records are decoded as {@link String} and cast unchecked, so any
+ *                 other type fails with a {@code ClassCastException} at the caller
+ */
+public class ReaderIterator <Output> implements Iterator<Output> {
+
+    /** Length marker that ends the data section. */
+    private static final int END_OF_DATA_SECTION = -1;
+
+    private final DataInputStream stream;
+    /** Record read ahead by {@link #hasNext()} and not yet handed out by {@link #next()}. */
+    private Output nextObj = null;
+    /** Whether {@link #nextObj} holds a pending record (needed because records may be null). */
+    private boolean buffered = false;
+    /** Set once the end marker or the end of the stream has been reached. */
+    private boolean eos = false;
+
+    public ReaderIterator(DataInputStream stream) {
+        this.stream = stream;
+    }
+
+    /**
+     * Reads one framed record, setting {@link #eos} when the data section is over.
+     *
+     * @return the decoded record, or {@code null} at the end of data or for a payload-less
+     *         negative length
+     * @throws java.io.UncheckedIOException if reading from the stream fails
+     */
+    @SuppressWarnings("unchecked") // payloads are Strings; see the class-level note on Output
+    private Output read() {
+        try {
+            int length = this.stream.readInt();
+            if (length >= 0) {
+                // A zero length is a valid, empty record: the empty string.
+                byte[] payload = new byte[length];
+                this.stream.readFully(payload);
+                return (Output) new String(payload, StandardCharsets.UTF_8);
+            }
+            if (length == END_OF_DATA_SECTION) {
+                this.eos = true;
+            }
+            // NOTE(review): other negative lengths (the original code mentions -5 for null
+            // values) are returned as null elements.
+            return null;
+        } catch (EOFException e) {
+            // The stream ended without an explicit end marker (possibly mid-record).
+            this.eos = true;
+            return null;
+        } catch (IOException e) {
+            throw new java.io.UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Returns whether another record is available, reading it ahead if necessary. Repeated calls
+     * without an intervening {@link #next()} do not consume further records.
+     */
+    @Override
+    public boolean hasNext() {
+        if (!this.buffered && !this.eos) {
+            this.nextObj = read();
+            this.buffered = !this.eos;
+        }
+        return this.buffered;
+    }
+
+    /**
+     * Returns the next record, reading it first if {@link #hasNext()} was not called.
+     *
+     * @throws NoSuchElementException if the data section has ended
+     */
+    @Override
+    public Output next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        Output obj = this.nextObj;
+        this.nextObj = null;
+        this.buffered = false;
+        return obj;
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
new file mode 100644
index 00000000..55e6a346
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.python.function;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the bytes of a serialized Python UDF.
+ *
+ * <p>The bytes are copied on the way in and on the way out. Neither the creator of the array
+ * nor any reader of {@link #toByteArray()} can change the stored code afterwards.
+ */
+public class PythonCode implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] code;
+
+    /**
+     * @param code serialized Python code; copied defensively, may be {@code null}
+     */
+    public PythonCode(byte[] code) {
+        this.code = code == null ? null : code.clone();
+    }
+
+    /**
+     * Returns the serialized code.
+     *
+     * @return a fresh copy of the stored bytes, or {@code null} if constructed with {@code null}
+     */
+    public byte[] toByteArray() {
+        return this.code == null ? null : this.code.clone();
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
new file mode 100644
index 00000000..d7300d26
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.api.python.executor.PythonWorkerManager;
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+/**
+ * Exposes a Python UDF as a {@link FunctionDescriptor.SerializableFunction} that maps a whole
+ * {@code Iterable} of inputs to an {@code Iterable} of outputs.
+ *
+ * <p>Every {@link #apply(Iterable)} call creates a new {@link PythonWorkerManager}, whose
+ * {@code execute()} starts its own Python worker.
+ *
+ * @param <Input>  type of the records handed to the UDF
+ * @param <Output> type of the records produced by the UDF
+ */
+public class PythonFunctionWrapper<Input, Output> implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> {
+
+    private final PythonUDF<Input, Output> myUDF;
+    private final PythonCode serializedUDF;
+
+    /**
+     * @param myUDF         UDF object forwarded to the worker manager
+     * @param serializedUDF serialized Python code executed by the worker
+     */
+    public PythonFunctionWrapper(PythonUDF<Input, Output> myUDF, PythonCode serializedUDF) {
+        this.myUDF = myUDF;
+        this.serializedUDF = serializedUDF;
+    }
+
+    @Override
+    public Iterable<Output> apply(Iterable<Input> input) {
+        return new PythonWorkerManager<Input, Output>(this.myUDF, this.serializedUDF, input).execute();
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
new file mode 100644
index 00000000..1f8dc5dc
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+/**
+ * Java-side type of a Python user-defined function.
+ *
+ * <p>It is a {@link FunctionDescriptor.SerializableFunction} from an {@code Iterable} of
+ * {@code Input} records to an {@code Iterable} of {@code Output} records, so it operates on a
+ * whole batch of records at once. It declares no methods of its own. An instance is passed to
+ * {@code PythonWorkerManager} alongside the serialized Python code ({@code PythonCode}).
+ *
+ * @param <Input>  type of the records consumed
+ * @param <Output> type of the records produced
+ */
+public interface PythonUDF<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
index ff3a99fe..5f9e8f4e 100644
--- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
+++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
@@ -15,4 +15,5 @@
# limitations under the License.
#
-# TODO: Add properties here
\ No newline at end of file
+wayang.api.python.worker = python/src/pywy/platforms/jvm/worker.py
+wayang.api.python.path = /opt/python3
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
new file mode 100644
index 00000000..c45cf2ea
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.decoder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.MapPartitionsOperator;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.basic.operators.UnionAllOperator;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.Base64;
+
+/**
+ * Decodes a protobuf {@link WayangPlanProto} into an executable {@link WayangPlan} and the
+ * {@link WayangContext} it should run on.
+ *
+ * <p>Operator type names understood by this builder: {@code TextFileSource},
+ * {@code TextFileSink}, {@code MapPartitionOperator} and {@code union}.
+ */
+public class WayangPlanBuilder {
+
+    private final WayangPlan wayangPlan;
+    private final WayangContext wayangContext;
+
+    /**
+     * Builds the plan from a stream over a serialized {@link WayangPlanProto}.
+     *
+     * @param planFile stream to parse; it is not closed by this constructor
+     * @throws WayangException if the stream cannot be read or parsed, or the plan is invalid
+     */
+    public WayangPlanBuilder(FileInputStream planFile) {
+        WayangPlanProto plan;
+        try {
+            plan = WayangPlanProto.parseFrom(planFile);
+        } catch (IOException e) {
+            // Fail fast instead of leaving the context and plan null for the caller to trip over.
+            throw new WayangException("Could not read the serialized Wayang plan", e);
+        }
+        this.wayangContext = buildContext(plan);
+        this.wayangPlan = buildPlan(plan);
+    }
+
+    /**
+     * Builds the plan from the Base64 text of a serialized {@link WayangPlanProto}.
+     *
+     * @param writtenPlan Base64-encoded plan
+     * @throws IllegalArgumentException if {@code writtenPlan} is not valid Base64
+     * @throws WayangException if the decoded bytes are not a valid plan, or the plan is invalid
+     */
+    public WayangPlanBuilder(String writtenPlan) {
+        byte[] message = Base64.getDecoder().decode(writtenPlan);
+        WayangPlanProto plan;
+        try {
+            plan = WayangPlanProto.parseFrom(message);
+        } catch (InvalidProtocolBufferException e) {
+            throw new WayangException("Could not parse the serialized Wayang plan", e);
+        }
+        this.wayangContext = buildContext(plan);
+        this.wayangPlan = buildPlan(plan);
+    }
+
+    /**
+     * Creates the execution context. Currently always registers the Spark basic plugin; the
+     * platform list carried in {@code plan} is ignored.
+     */
+    private WayangContext buildContext(WayangPlanProto plan) {
+        WayangContext ctx = new WayangContext();
+        // TODO: honour plan.getContext().getPlatformsList() (0 -> Java.basicPlugin(),
+        //       1 -> Spark.basicPlugin()) once platform selection is supported.
+        ctx.with(Spark.basicPlugin());
+        return ctx;
+    }
+
+    /**
+     * Turns the operators of {@code plan} into connected Wayang operators.
+     *
+     * <p>Works through a queue seeded with the sources. An operator is built only once all its
+     * predecessors exist; otherwise it is re-queued. Successors of each built operator are
+     * appended to the queue.
+     *
+     * @throws WayangException if some predecessors can never be resolved or there is no sink
+     */
+    private WayangPlan buildPlan(WayangPlanProto plan) {
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>(planProto.getSourcesList());
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        // Consecutive re-queued operators; once every queued operator was deferred in a row,
+        // nothing can be built anymore and looping further would never terminate.
+        int deferred = 0;
+
+        while (!protoList.isEmpty()) {
+            OperatorProto proto = protoList.pollFirst();
+
+            // Operators must not be processed twice; drop repeats instead of re-queuing them.
+            if (operators.containsKey(proto.getId())) {
+                continue;
+            }
+
+            boolean ready = proto.getType().equals("source") || allPredecessorsBuilt(proto, operators);
+            if (!ready) {
+                protoList.addLast(proto);
+                deferred++;
+                if (deferred >= protoList.size()) {
+                    throw new WayangException("Cannot resolve the predecessors of operators "
+                            + protoList.stream().map(OperatorProto::getId).collect(Collectors.toList()));
+                }
+                continue;
+            }
+            deferred = 0;
+
+            OperatorBase operator = createOperatorByType(proto);
+            operators.put(proto.getId(), operator);
+            connectToPredecessors(proto, operator, operators);
+
+            // Register each sink exactly once, even when it has zero or several predecessors.
+            if (proto.getType().toLowerCase(Locale.ROOT).contains("sink")) {
+                sinks.add(operator);
+            }
+
+            enqueueSuccessors(proto, planProto.getOperatorsList(), protoList);
+            enqueueSuccessors(proto, planProto.getSinksList(), protoList);
+        }
+
+        if (sinks.isEmpty()) {
+            throw new WayangException("The Wayang plan does not contain any sink operator");
+        }
+        return new WayangPlan(sinks.toArray(new OperatorBase[0]));
+    }
+
+    /** Returns whether every predecessor of {@code proto} has already been built. */
+    private static boolean allPredecessorsBuilt(OperatorProto proto, Map<String, OperatorBase> operators) {
+        return operators.keySet().containsAll(proto.getPredecessorsList());
+    }
+
+    /** Connects output 0 of each predecessor to consecutive inputs of {@code operator}. */
+    private static void connectToPredecessors(OperatorProto proto, OperatorBase operator,
+                                              Map<String, OperatorBase> operators) {
+        // TODO: connecting with predecessors requires more details in the connection slot;
+        //       this only works for topologies without replicated outputs.
+        int inputIndex = 0;
+        for (String predecessorId : proto.getPredecessorsList()) {
+            operators.get(predecessorId).connectTo(0, operator, inputIndex++);
+        }
+    }
+
+    /** Appends those {@code candidates} that are successors of {@code proto} and not yet queued. */
+    private static void enqueueSuccessors(OperatorProto proto, List<OperatorProto> candidates,
+                                          LinkedList<OperatorProto> protoList) {
+        for (OperatorProto candidate : candidates) {
+            if (proto.getSuccessorsList().contains(candidate.getId()) && !protoList.contains(candidate)) {
+                protoList.addLast(candidate);
+            }
+        }
+    }
+
+    /**
+     * Creates the Wayang operator described by {@code operator}.
+     *
+     * @throws WayangException if the type is unknown or its path cannot be turned into a URL
+     */
+    public OperatorBase createOperatorByType(OperatorProto operator) {
+        switch (operator.getType()) {
+            case "TextFileSource":
+                return new TextFileSource(toFileUrl(operator.getPath()));
+            case "TextFileSink":
+                return new TextFileSink<String>(
+                        toFileUrl(operator.getPath()),
+                        String.class
+                );
+            case "MapPartitionOperator":
+                return new MapPartitionsOperator<>(
+                        new MapPartitionsDescriptor<String, String>(
+                                new WrappedPythonFunction<String, String>(
+                                        l -> l,
+                                        operator.getUdf()
+                                ),
+                                String.class,
+                                String.class
+                        )
+                );
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+            default:
+                throw new WayangException("Operator Type not supported " + operator.getType());
+        }
+    }
+
+    /** Converts a local file-system path into a {@code file:} URL string. */
+    private static String toFileUrl(String path) {
+        try {
+            return new File(path).toURI().toURL().toString();
+        } catch (MalformedURLException e) {
+            throw new WayangException("Invalid file path: " + path, e);
+        }
+    }
+
+    public WayangContext getWayangContext() {
+        return wayangContext;
+    }
+
+    public WayangPlan getWayangPlan() {
+        return wayangPlan;
+    }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
new file mode 100644
index 00000000..d87044fc
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.general;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.springframework.web.multipart.MultipartFile;
+
+
+@RestController
+public class WayangController {
+
+ @GetMapping("/plan/create/fromfile")
+ public String planFromFile(
+ //@RequestParam("file") MultipartFile file
+ ){
+
+ try {
+ FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+ WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
+
+ /*TODO ADD id to executions*/
+ wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return "Builder works";
+ }
+
+ @PostMapping("/plan/create")
+ public String planFromMessage(
+ @RequestParam("message") String message
+ ){
+
+ WayangPlanBuilder wpb = new WayangPlanBuilder(message);
+
+ /*TODO ADD id to executions*/
+ wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+ return "";
+ }
+
+ @GetMapping("/")
+ public String all(){
+ System.out.println("detected!");
+
+ try {
+ FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+ WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
+
+ WayangContext wc = buildContext(plan);
+ WayangPlan wp = buildPlan(plan);
+
+ System.out.println("Plan!");
+ System.out.println(wp.toString());
+
+ wc.execute(wp);
+ return("Works!");
+
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return "Not working";
+ }
+
+ private WayangContext buildContext(WayangPlanProto plan){
+
+ WayangContext ctx = new WayangContext();
+// plan.getContext().getPlatformsList().forEach(platform -> {
+// if (platform.getNumber() == 0)
+// ctx.with(Java.basicPlugin());
+// else if (platform.getNumber() == 1)
+// ctx.with(Spark.basicPlugin());
+// });
+ ctx.with(Spark.basicPlugin());
+
+ return ctx;
+ }
+
+ private WayangPlan buildPlan(WayangPlanProto plan){
+
+ System.out.println(plan);
+
+ PlanProto planProto = plan.getPlan();
+ LinkedList<OperatorProto> protoList = new LinkedList<>();
+ planProto.getSourcesList().forEach(protoList::addLast);
+
+ Map<String, OperatorBase> operators = new HashMap<>();
+ List<OperatorBase> sinks = new ArrayList<>();
+ while(! protoList.isEmpty()) {
+
+ OperatorProto proto = protoList.pollFirst();
+
+ /* Checking if protoOperator can be connected to the current WayangPlan*/
+ boolean processIt;
+ if(proto.getType().equals("source")) processIt = true;
+
+ else {
+ /* Checking if ALL predecessors were already processed */
+ processIt = true;
+ for(String predecessor : proto.getPredecessorsList()){
+ if (!operators.containsKey(predecessor)) {
+ processIt = false;
+ break;
+ }
+ }
+ }
+
+ /* Operators should not be processed twice*/
+ if(operators.containsKey(proto.getId())) processIt = false;
+
+ if(processIt) {
+
+ /* Create and store Wayang operator */
+ OperatorBase operator = createOperatorByType(proto);
+ operators.put(proto.getId(), operator);
+
+ /*TODO Connect with predecessors requires more details in connection slot*/
+ int order = 0;
+ for (String pre_id : proto.getPredecessorsList()) {
+
+ OperatorBase predecessor = operators.get(pre_id);
+ /* Only works without replicate topology */
+ predecessor.connectTo(0, operator, order);
+ order++;
+
+ if(proto.getType().equals("sink")){
+ sinks.add(operator);
+ //if(!sinks.contains(operator)) {
+ // sinks.add(operator);
+ //}
+ }
+ }
+
+ /*List of OperatorProto successors
+ * They will be added to the protoList
+ * nevertheless they must be processed only if the parents are in operators list */
+ List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : listSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+ .stream()
+ .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+ .collect(Collectors.toList());
+ for (OperatorProto successor : sinkSuccessors){
+ if(!protoList.contains(successor)){
+ protoList.addLast(successor);
+ }
+ }
+
+ } else {
+
+ /* In case we cannot process it yet, It must be added again at the end*/
+ protoList.addLast(proto);
+ }
+ }
+
+ WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+ return wayangPlan;
+ }
+
+ public OperatorBase createOperatorByType(OperatorProto operator){
+
+ System.out.println("Typo: " + operator.getType());
+ switch(operator.getType()){
+ case "source":
+ try {
+ String source_path = operator.getPath();
+ URL url = new File(source_path).toURI().toURL();
+ return new TextFileSource(url.toString());
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "sink":
+ try {
+ String sink_path = operator.getPath();
+ URL url = new File(sink_path).toURI().toURL();
+ return new TextFileSink<String>(
+ url.toString(),
+ String.class
+ );
+
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "reduce_by_key":
+ try {
+ /* Function to be applied in Python workers */
+ ByteString function = operator.getUdf();
+
+ /* Has dimension or positions that compose GroupKey */
+ Map<String, String> parameters = operator.getParametersMap();
+
+ PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
+ operator.getParametersMap(),
+ operator.getUdf() ,
+ String.class,
+ String.class,
+ false
+ );
+
+ String sink_path = operator.getPath();
+ URL url = new File(sink_path).toURI().toURL();
+ return new TextFileSink<String>(
+ url.toString(),
+ String.class
+ );
+
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ break;
+ case "map_partition":
+ return new MapPartitionsOperator<>(
+ new MapPartitionsDescriptor<String, String>(
+ new WrappedPythonFunction<String, String>(
+ l -> l,
+ operator.getUdf()
+ ),
+ String.class,
+ String.class
+ )
+ );
+
+ case "union":
+ return new UnionAllOperator<String>(
+ String.class
+ );
+
+ }
+
+ throw new WayangException("Operator Type not supported");
+ }
+
+ public static URI createUri(String resourcePath) {
+ try {
+ return Thread.currentThread().getClass().getResource(resourcePath).toURI();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Illegal URI.", e);
+ }
+
+ }
+
+}