You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by rp...@apache.org on 2022/06/19 22:23:05 UTC

[incubator-wayang] branch main updated: Wayang-211 add JVM-platform inside of Python-API (#238)

This is an automated email from the ASF dual-hosted git repository.

rpardomeza pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


The following commit(s) were added to refs/heads/main by this push:
     new d859a97d Wayang-211 add JVM-platform inside of Python-API (#238)
d859a97d is described below

commit d859a97d43a8c3c3c964150eaff8f3833e41ea75
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Jun 20 00:23:00 2022 +0200

    Wayang-211 add JVM-platform inside of Python-API (#238)
    
    * [LICENSE] add check-license.sh for the project, and specially for python
    
    Signed-off-by: bertty <be...@apache.org>
    
    * [POM] update pom.xml
    
    Signed-off-by: bertty <be...@apache.org>
    
    * [WAYANG-211] python java implementation
    
    Signed-off-by: bertty <be...@apache.org>
---
 bin/check-license.sh                               |  61 ++++
 pom.xml                                            |   3 +-
 python/.rat-excludes                               |   1 +
 python/bin/check-license.sh                        |  28 --
 python/src/pywy/platforms/jvm/worker.py            | 374 +++++++++++++++++++++
 wayang-api/wayang-api-python/pom.xml               |   8 +
 .../wayang/api/python/executor/ProcessFeeder.java  | 184 ++++++++++
 .../api/python/executor/ProcessReceiver.java       |  53 +++
 .../api/python/executor/PythonProcessCaller.java   | 130 +++++++
 .../api/python/executor/PythonWorkerManager.java   |  70 ++++
 .../wayang/api/python/executor/ReaderIterator.java |  98 ++++++
 .../wayang/api/python/function/PythonCode.java     |  32 ++
 .../api/python/function/PythonFunctionWrapper.java |  46 +++
 .../wayang/api/python/function/PythonUDF.java      |  25 ++
 .../wayang-api-python-defaults.properties          |   3 +-
 .../server/spring/decoder/WayangPlanBuilder.java   | 239 +++++++++++++
 .../server/spring/general/WayangController.java    | 304 +++++++++++++++++
 17 files changed, 1629 insertions(+), 30 deletions(-)

diff --git a/bin/check-license.sh b/bin/check-license.sh
new file mode 100755
index 00000000..f38f89d5
--- /dev/null
+++ b/bin/check-license.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+################################################################################
+##
+##  Licensed to the Apache Software Foundation (ASF) under one or more
+##  contributor license agreements.  See the NOTICE file distributed with
+##  this work for additional information regarding copyright ownership.
+##  The ASF licenses this file to You under the Apache License, Version 2.0
+##  (the "License"); you may not use this file except in compliance with
+##  the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+##  Unless required by applicable law or agreed to in writing, software
+##  distributed under the License is distributed on an "AS IS" BASIS,
+##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+##  See the License for the specific language governing permissions and
+##  limitations under the License.
+##
+################################################################################
+
+# set the variables
+BASE=$(cd "$(dirname "$0")/.." && pwd)
+VERSION_RAT="0.14"
+RAT_HOME=${BASE}/.rat/apache-rat-${VERSION_RAT}
+RAT_JAR=${RAT_HOME}/apache-rat-${VERSION_RAT}.jar
+RAT_EXCLUSION=$1
+
+# Validate if the folder is created
+if [[ ! -d "${BASE}/.rat" ]]; then
+  mkdir -p "${BASE}/.rat"
+fi
+
+# download the elements required
+if [[ ! -f "$RAT_JAR" ]]; then
+  cd ${BASE}/.rat
+  wget https://dlcdn.apache.org/creadur/apache-rat-${VERSION_RAT}/apache-rat-${VERSION_RAT}-bin.tar.gz
+  tar -xvf apache-rat-${VERSION_RAT}-bin.tar.gz
+fi
+
+cd ${BASE}
+# Set the parameters for rat
+rat_opts=("-jar" "${RAT_JAR}" "-d" "${BASE}")
+
+#add the default exclusion for the project
+rat_opts+=("-e" "target/*")
+rat_opts+=("-e" "^.*.input")
+rat_opts+=("-e" "^.*.md")
+rat_opts+=("-e" "^.*.iml")
+rat_opts+=("-e" "^.*_pb2.py") # code generated by protocol buffer
+rat_opts+=("-e" "Gemfile.lock")
+rat_opts+=("-e" ".gitignore")
+rat_opts+=("-e" ".gitmodules")
+rat_opts+=("-e" ".rat-excludes")
+if [[ ! -z "$RAT_EXCLUSION" ]]; then
+   rat_opts+=("-E")
+   rat_opts+=($RAT_EXCLUSION)
+fi
+
+# execute the validation using rat
+java "${rat_opts[@]}"  2>&1 | grep "== File:"
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a4dedf81..36ab3ede 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1109,7 +1109,8 @@
                     <configuration>
                         <detectLinks>false</detectLinks>
                         <detectJavaApiLink>true</detectJavaApiLink>
-                        <additionalparam>-Xdoclint:none</additionalparam>
+                        <doclint>none</doclint>
+<!--                    <additionalparam>-Xdoclint:none</additionalparam>-->
                     </configuration>
                 </plugin>
 
diff --git a/python/.rat-excludes b/python/.rat-excludes
index 8d1082aa..51b18ba7 100644
--- a/python/.rat-excludes
+++ b/python/.rat-excludes
@@ -1,3 +1,4 @@
+.gitignore
 .rat-excludes
 venv/*
 10e*.input
diff --git a/python/bin/check-license.sh b/python/bin/check-license.sh
deleted file mode 100755
index 7af700f8..00000000
--- a/python/bin/check-license.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-
-################################################################################
-##
-##  Licensed to the Apache Software Foundation (ASF) under one or more
-##  contributor license agreements.  See the NOTICE file distributed with
-##  this work for additional information regarding copyright ownership.
-##  The ASF licenses this file to You under the Apache License, Version 2.0
-##  (the "License"); you may not use this file except in compliance with
-##  the License.  You may obtain a copy of the License at
-##
-##      http://www.apache.org/licenses/LICENSE-2.0
-##
-##  Unless required by applicable law or agreed to in writing, software
-##  distributed under the License is distributed on an "AS IS" BASIS,
-##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-##  See the License for the specific language governing permissions and
-##  limitations under the License.
-##
-################################################################################
-
-# TODO download Apache rat if is needed
-
-BASE=$(cd "$(dirname "$0")/.." | pwd)
-cd ${BASE}
-RAT_HOME=/opt/apache-rat-0.13
-
-java -jar ${RAT_HOME}/apache-rat-0.13.jar -E .rat-excludes -d . | grep "== File:"
\ No newline at end of file
diff --git a/python/src/pywy/platforms/jvm/worker.py b/python/src/pywy/platforms/jvm/worker.py
new file mode 100644
index 00000000..9e1fe3bb
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/worker.py
@@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import socket
+import struct
+import pickle
+from itertools import chain
+
+import cloudpickle
+import base64
+import re
+import sys
+import time
+
+class SpecialLengths(object):
+    END_OF_DATA_SECTION = -1
+    PYTHON_EXCEPTION_THROWN = -2
+    TIMING_DATA = -3
+    END_OF_STREAM = -4
+    NULL = -5
+    START_ARROW_STREAM = -6
+
+
+def read_int(stream):
+    length = stream.read(4)
+    if not length:
+        raise EOFError
+    res = struct.unpack("!i", length)[0]
+    return res
+
+
+class UTF8Deserializer:
+    """
+    Deserializes streams written by String.getBytes.
+    """
+
+    def __init__(self, use_unicode=True):
+        self.use_unicode = use_unicode
+
+    def loads(self, stream):
+        length = read_int(stream)
+        if length == SpecialLengths.END_OF_DATA_SECTION:
+            raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
+
+    def load_stream(self, stream):
+        try:
+            while True:
+                yield self.loads(stream)
+        except struct.error:
+            return
+        except EOFError:
+            return
+
+    def __repr__(self):
+        return "UTF8Deserializer(%s)" % self.use_unicode
+
+
+def write_int(p, outfile):
+    outfile.write(struct.pack("!i", p))
+
+
+def write_with_length(obj, stream):
+    serialized = obj.encode('utf-8')
+    if serialized is None:
+        raise ValueError("serialized value should not be None")
+    if len(serialized) > (1 << 31):
+        raise ValueError("can not serialize object larger than 2G")
+    write_int(len(serialized), stream)
+    stream.write(serialized)
+
+
+class Serializer:
+    def dump_stream(self, iterator, stream):
+        """
+        Serialize an iterator of objects to the output stream.
+        """
+        raise NotImplementedError
+
+    def load_stream(self, stream):
+        """
+        Return an iterator of deserialized objects from the input stream.
+        """
+        raise NotImplementedError
+
+    def dumps(self, obj):
+        """
+        Serialize an object into a byte array.
+        When batching is used, this will be called with an array of objects.
+        """
+        raise NotImplementedError
+
+    def _load_stream_without_unbatching(self, stream):
+        """
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
+        If the serializer does not operate on batches the default implementation returns an
+        iterator of single element lists.
+        """
+        return map(lambda x: [x], self.load_stream(stream))
+
+    # Note: our notion of "equality" is that output generated by
+    # equal serializers can be deserialized using the same serializer.
+
+    # This default implementation handles the simple cases;
+    # subclasses should override __eq__ as appropriate.
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __repr__(self):
+        return "%s()" % self.__class__.__name__
+
+    def __hash__(self):
+        return hash(str(self))
+
+class FramedSerializer(Serializer):
+
+    """
+    Serializer that writes objects as a stream of (length, data) pairs,
+    where `length` is a 32-bit integer and data is `length` bytes.
+    """
+
+    def dump_stream(self, iterator, stream):
+        for obj in iterator:
+            self._write_with_length(obj, stream)
+
+    def load_stream(self, stream):
+        while True:
+            try:
+                yield self._read_with_length(stream)
+            except EOFError:
+                return
+
+    def _write_with_length(self, obj, stream):
+        serialized = self.dumps(obj)
+        if serialized is None:
+            raise ValueError("serialized value should not be None")
+        if len(serialized) > (1 << 31):
+            raise ValueError("can not serialize object larger than 2G")
+        write_int(len(serialized), stream)
+        stream.write(serialized)
+
+    def _read_with_length(self, stream):
+        length = read_int(stream)
+        if length == SpecialLengths.END_OF_DATA_SECTION:
+            raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
+        obj = stream.read(length)
+        if len(obj) < length:
+            raise EOFError
+        return self.loads(obj)
+
+    def dumps(self, obj):
+        """
+        Serialize an object into a byte array.
+        When batching is used, this will be called with an array of objects.
+        """
+        raise NotImplementedError
+
+    def loads(self, obj):
+        """
+        Deserialize an object from a byte array.
+        """
+        raise NotImplementedError
+
+
+class BatchedSerializer(Serializer):
+
+    """
+    Serializes a stream of objects in batches by calling its wrapped
+    Serializer with streams of objects.
+    """
+
+    UNLIMITED_BATCH_SIZE = -1
+    UNKNOWN_BATCH_SIZE = 0
+
+    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
+        self.serializer = serializer
+        self.batchSize = batchSize
+
+    def _batched(self, iterator):
+        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
+            # Unlimited batch size: the whole input is emitted as one batch.
+            yield list(iterator)
+        elif hasattr(iterator, "__len__") and hasattr(iterator, "__getslice__"):
+            n = len(iterator)
+            for i in range(0, n, self.batchSize):
+                # NOTE(review): Python 3 built-ins do not define __getslice__, so
+                # this slicing fast path looks unreachable there - confirm.
+                yield iterator[i : i + self.batchSize]
+        else:
+            items = []
+            count = 0
+            for item in iterator:
+                items.append(item)
+                count += 1
+                if count == self.batchSize:
+                    yield items
+                    items = []
+                    count = 0
+            if items:
+                yield items
+
+    def dump_stream(self, iterator, stream):
+        self.serializer.dump_stream(self._batched(iterator), stream)
+
+    def load_stream(self, stream):
+        return chain.from_iterable(self._load_stream_without_unbatching(stream))
+
+    def _load_stream_without_unbatching(self, stream):
+        return self.serializer.load_stream(stream)
+
+    def __repr__(self):
+        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
+
+
+class PickleSerializer(FramedSerializer):
+
+    """
+    Serializes objects using Python's pickle serializer:
+
+        http://docs.python.org/2/library/pickle.html
+
+    This serializer supports nearly any Python object, but may
+    not be as fast as more specialized serializers.
+    """
+
+    def dumps(self, obj):
+        return pickle.dumps(obj, pickle_protocol)
+
+    def loads(self, obj, encoding="bytes"):
+        return pickle.loads(obj, encoding=encoding)
+
+pickle_protocol = pickle.HIGHEST_PROTOCOL
+class CloudPickleSerializer(FramedSerializer):
+    def dumps(self, obj):
+        try:
+            return cloudpickle.dumps(obj, pickle_protocol)
+        except pickle.PickleError:
+            raise
+        except Exception as e:
+            emsg = str(e)
+            if "'i' format requires" in emsg:
+                msg = "Object too large to serialize: %s" % emsg
+            else:
+                msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
+#            print_exec(sys.stderr)
+            raise pickle.PicklingError(msg)
+
+    def loads(self, obj, encoding="bytes"):
+        return cloudpickle.loads(obj, encoding=encoding)
+
+#if sys.version_info < (3, 8):
+CPickleSerializer = PickleSerializer
+#else:
+#    CPickleSerializer = CloudPickleSerializer
+
+def dump_stream(iterator, stream):
+
+    for obj in iterator:
+        if type(obj) is str:
+            print("here?2")
+            write_with_length(obj, stream)
+        ## elif type(obj) is list:
+        ##    write_with_length(obj, stream)
+    print("Termine")
+    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
+    print("Escribi Fin")
+
+
+def process(infile, outfile):
+    """udf64 = os.environ["UDF"]
+    print("udf64")
+    print(udf64)
+    #serialized_udf = binascii.a2b_base64(udf64)
+    #serialized_udf = base64.b64decode(udf64)
+    serialized_udf = bytearray(udf64, encoding='utf-16')
+    # NOT VALID TO BE UTF8  serialized_udf = bytes(udf64, 'UTF-8')
+    print("serialized_udf")
+    print(serialized_udf)
+    # input to be ast.literal_eval(serialized_udf)
+    func = pickle.loads(serialized_udf, encoding="bytes")
+    print ("func")
+    print (func)
+    print(func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
+    # func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])"""
+
+
+
+    # TODO First we must receive the operator + UDF
+    """udf = lambda elem: elem.lower()
+
+    def func(it):
+        return sorted(it, key=udf)"""
+    udf_length = read_int(infile)
+    print("udf_length")
+    print(udf_length)
+    serialized_udf = infile.read(udf_length)
+    print("serialized_udf")
+    print(serialized_udf)
+    #base64_message = base64.b64decode(serialized_udf + "===")
+    #print("base64_message")
+    #print(base64_message)
+    func = pickle.loads(serialized_udf)
+    #func = ori.lala(serialized_udf)
+    #print (func)
+    #for x in func([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]): print(x)
+
+
+    """print("example")
+    for x in func("2344|234|efrf|$#|ffrf"): print(x)"""
+    # TODO Here we are temporarily assuming that the user is exclusively sending UTF8. User has several types
+    iterator = UTF8Deserializer().load_stream(infile)
+    # out_iter = sorted(iterator, key=lambda elem: elem.lower())
+    # out_iter = batched(func(iterator))
+    ser = BatchedSerializer(CPickleSerializer(), 100)
+    ser.dump_stream(func(iterator), outfile)
+    #dump_stream(iterator=out_iter, stream=outfile)
+
+
+def local_connect(port):
+    sock = None
+    errors = []
+    # Try each address getaddrinfo returns for the loopback host, in order.
+    for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, _, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            # sock.settimeout(int(os.environ.get("SPARK_AUTH_SOCKET_TIMEOUT", 15)))
+            sock.settimeout(30)
+            sock.connect(sa)
+            # sockfile = sock.makefile("rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536)))
+            sockfile = sock.makefile("rwb", 65536)
+            # _do_server_auth(sockfile, auth_secret)
+            return (sockfile, sock)
+        except socket.error as e:
+            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, e))
+            # socket.socket() itself may have raised, leaving nothing to close.
+            if sock is not None:
+                sock.close()
+                sock = None
+    raise Exception("could not open socket: %s" % errors)
+
+
+if __name__ == '__main__':
+    print("Python version")
+    print (sys.version)
+    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
+    sock_file, sock = local_connect(java_port)
+    process(sock_file, sock_file)
+    sock_file.flush()
+    exit()
diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 5a3ad443..9ac2e2c1 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -35,4 +35,12 @@
     <properties>
         <java-module-name>org.apache.wayang.api</java-module-name>
     </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-core</artifactId>
+            <version>0.6.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
new file mode 100644
index 00000000..ee694322
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ProcessFeeder<Input, Output> {
+
+    private Socket socket;
+    private PythonUDF<Input, Output> udf;
+    private PythonCode serializedUDF;
+    private Iterable<Input> input;
+
+    //TODO add to a config file
+    int END_OF_DATA_SECTION = -1;
+    int NULL = -5;
+
+    public ProcessFeeder(
+            Socket socket,
+            PythonUDF<Input, Output> udf,
+            PythonCode serializedUDF,
+            Iterable<Input> input){
+
+        if(input == null) throw new WayangException("Nothing to process with Python API");
+
+        this.socket = socket;
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.input = input;
+
+    }
+
+    public void send(){
+
+        try{
+            //TODO use config buffer size
+            int BUFFER_SIZE = 8 * 1024;
+
+            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
+            DataOutputStream dataOut = new DataOutputStream(stream);
+
+            writeUDF(serializedUDF, dataOut);
+            this.writeIteratorToStream(input.iterator(), dataOut);
+            dataOut.writeInt(END_OF_DATA_SECTION);
+            dataOut.flush();
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){
+
+        //write(serializedUDF.toByteArray(), dataOut);
+        writeBytes(serializedUDF.toByteArray(), dataOut);
+        System.out.println("UDF written");
+
+    }
+
+    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut)
+        throws IOException {
+
+        System.out.println("iterator being send");
+        int buffer = 0;
+        for (Iterator<Input> it = iter; it.hasNext(); ) {
+            Input elem = it.next();
+            //System.out.println(elem.toString());
+            write(elem, dataOut);
+        }
+    }
+
+    /*TODO Missing case PortableDataStream */
+    public void write(Object obj, DataOutputStream dataOut){
+        try {
+
+            if(obj == null)
+                dataOut.writeInt(this.NULL);
+
+            /**
+             * Byte Array cases
+             */
+            else if (obj instanceof Byte[] || obj instanceof byte[]) {
+                System.out.println("Writing Bytes");
+                writeBytes(obj, dataOut);
+            }
+            /**
+             * String case
+             * */
+            else if (obj instanceof String)
+                writeUTF((String) obj, dataOut);
+
+            /**
+             * Key, Value case
+             * */
+            else if (obj instanceof Map.Entry)
+                writeKeyValue((Map.Entry) obj, dataOut);
+
+            else{
+                throw new WayangException("Unexpected element type " + obj.getClass());
+            }
+
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeBytes(Object obj, DataOutputStream dataOut){
+
+        try{
+
+            if (obj instanceof Byte[]) {
+
+                int length = ((Byte[]) obj).length;
+
+                byte[] bytes = new byte[length];
+                int j=0;
+
+                // Unboxing Byte values. (Byte[] to byte[])
+                for(Byte b: ((Byte[]) obj))
+                    bytes[j++] = b.byteValue();
+
+                dataOut.writeInt(length);
+                dataOut.write(bytes);
+
+            } else if (obj instanceof byte[]) {
+
+                dataOut.writeInt(((byte[]) obj).length);
+                dataOut.write(((byte[]) obj));
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeUTF(String str, DataOutputStream dataOut){
+
+        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+        try {
+
+            dataOut.writeInt(bytes.length);
+            dataOut.write(bytes);
+        } catch (SocketException e){
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){
+
+        write(obj.getKey(), dataOut);
+        write(obj.getValue(), dataOut);
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
new file mode 100644
index 00000000..7ef5f983
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+/*TODO cannot be always string, include definition for every operator
+*  like: map(udf, inputtype, outputtype)*/
+public class ProcessReceiver<Output> {
+
+    private ReaderIterator<Output> iterator;
+
+    public ProcessReceiver(Socket socket){
+        try{
+            //TODO use config buffer size
+            int BUFFER_SIZE = 1 * 1024;
+
+            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+            this.iterator = new ReaderIterator<>(stream);
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Iterable<Output> getIterable(){
+        return () -> iterator;
+    }
+
+    public void print(){
+        iterator.forEachRemaining(x -> System.out.println(x.toString()));
+
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
new file mode 100644
index 00000000..7eca5595
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.lang.ProcessBuilder.Redirect;
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.wayang.core.util.ReflectionUtils;
+
+public class PythonProcessCaller {
+
+    private Thread process;
+    private Socket socket;
+    private ServerSocket serverSocket;
+    private boolean ready;
+
+    //TODO How to get the config
+    private Configuration configuration;
+
+    public PythonProcessCaller(PythonCode serializedUDF){
+
+        //TODO create documentation to how to the configuration in the code
+        this.configuration = new Configuration();
+        this.configuration.load(ReflectionUtils.loadResource("wayang-api-python-defaults.properties"));
+        this.ready = false;
+        byte[] addr = new byte[4];
+        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;
+
+        try {
+            /* Port 0: the actual port is read back with getLocalPort() and handed to the worker below. */
+            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
+
+            Runnable run1 = () -> {
+                ProcessBuilder pb = new ProcessBuilder(
+                    Arrays.asList(
+                        "python3",
+                        this.configuration.getStringProperty("wayang.api.python.worker")
+                    )
+                );
+                Map<String, String> workerEnv = pb.environment();
+                workerEnv.put("PYTHON_WORKER_FACTORY_PORT",
+                    String.valueOf(this.serverSocket.getLocalPort()));
+
+                // TODO See what is happening with ENV Python version
+                workerEnv.put(
+                    "PYTHONPATH",
+                    this.configuration.getStringProperty("wayang.api.python.path")
+                );
+
+                pb.redirectOutput(Redirect.INHERIT);
+                pb.redirectError(Redirect.INHERIT);
+                try {
+                    pb.start();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            };
+            this.process = new Thread(run1);
+            this.process.start();
+
+            // Redirect worker stdout and stderr
+            //IDK redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
+
+            // Wait for it to connect to our socket
+            this.serverSocket.setSoTimeout(100000);
+
+            try {
+                this.socket = this.serverSocket.accept();
+                this.serverSocket.setSoTimeout(0);
+
+                if(socket.isConnected())
+                    this.ready = true;
+
+            } catch (Exception e) {
+                System.out.println(e);
+                throw new WayangException("Python worker failed to connect back.", e);
+            }
+        } catch (Exception e){
+            System.out.println(e);
+            throw new WayangException("Python worker failed", e);
+        }
+    }
+
+    public Thread getProcess() {
+        return process;
+    }
+
+    public Socket getSocket() {
+        return socket;
+    }
+
+    public boolean isReady(){
+        return ready;
+    }
+
+    public void close(){
+        try {
+            this.process.interrupt();
+            this.socket.close();
+            this.serverSocket.close();
+            System.out.println("Everything closed");
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
new file mode 100644
index 00000000..cef0c148
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+/**
+ * Runs a serialized Python UDF over an input: it starts a {@link PythonProcessCaller}, pushes
+ * the input through a {@link ProcessFeeder} on a separate thread and hands back the
+ * {@link Iterable} exposed by a {@link ProcessReceiver} bound to the same socket.
+ *
+ * @param <Input>  element type of the data given to the feeder
+ * @param <Output> element type of the data obtained from the receiver
+ */
+public class PythonWorkerManager<Input, Output> {
+
+    private PythonUDF<Input, Output> udf;
+    private PythonCode serializedUDF;
+    private Iterable<Input> inputIterator;
+
+    /**
+     * @param udf           Java-side representation of the UDF, forwarded to the feeder
+     * @param serializedUDF serialized Python code, used to start the worker and forwarded to the feeder
+     * @param input         data to be sent to the worker
+     */
+    public PythonWorkerManager(
+            PythonUDF<Input, Output> udf,
+            PythonCode serializedUDF,
+            Iterable<Input> input
+    ){
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.inputIterator = input;
+    }
+
+    /**
+     * Starts the worker, begins feeding it and returns the results as they become readable.
+     *
+     * @return the {@link Iterable} provided by the {@link ProcessReceiver}
+     * @throws WayangException if the worker did not become ready
+     */
+    public Iterable<Output> execute(){
+        final PythonProcessCaller caller = new PythonProcessCaller(this.serializedUDF);
+
+        // Bail out first when the worker is unusable; the port is read before closing the socket.
+        if(!caller.isReady()){
+            final int localPort = caller.getSocket().getLocalPort();
+            caller.close();
+            throw new WayangException("Not possible to work with the Socket provided on port: " + localPort);
+        }
+
+        // Sending runs on its own thread so that the caller can start consuming results right away.
+        final Thread feederThread = new Thread(() -> {
+            ProcessFeeder<Input, Output> feeder = new ProcessFeeder<>(
+                caller.getSocket(),
+                this.udf,
+                this.serializedUDF,
+                this.inputIterator
+            );
+            feeder.send();
+        });
+        feederThread.start();
+
+        final ProcessReceiver<Output> receiver = new ProcessReceiver<>(caller.getSocket());
+        return receiver.getIterable();
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
new file mode 100644
index 00000000..cac90d8e
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator over length-prefixed records read from a {@link DataInputStream}.
+ *
+ * Each record is a 4-byte length followed by that many bytes, decoded as a UTF-8 string and
+ * cast to {@code Output}. A length of {@code -1}, or the end of the stream, ends the iteration.
+ *
+ * The iterator keeps one record of look-ahead, so {@link #hasNext()} may be called any number
+ * of times without losing data and {@link #next()} may be called without a preceding
+ * {@link #hasNext()}.
+ *
+ * @param <Output> type the decoded strings are cast to
+ */
+public class ReaderIterator <Output> implements Iterator<Output> {
+
+    /** Length value that marks the end of the data section. */
+    private static final int END_OF_DATA_SECTION = -1;
+
+    /** Look-ahead record; only meaningful while {@link #buffered} is true. */
+    private Output nextObj = null;
+    /** Set once the end marker or the end of the stream has been seen. */
+    private boolean eos = false;
+    /** True while {@link #nextObj} holds a record that has not been returned yet. */
+    private boolean buffered = false;
+    private DataInputStream stream = null;
+
+    /**
+     * @param stream source of the length-prefixed records
+     */
+    public ReaderIterator(DataInputStream stream) {
+
+        this.stream = stream;
+        this.eos = false;
+        this.nextObj = null;
+    }
+
+    /**
+     * Reads one record from the stream.
+     *
+     * @return the decoded record; {@code null} when the end was reached (then {@code eos} is
+     *         set) or when a negative length other than the end marker was received
+     * @throws RuntimeException wrapping any {@link IOException} other than end-of-file
+     */
+    @SuppressWarnings("unchecked")
+    private Output read() {
+
+        try {
+            int length = this.stream.readInt();
+
+            if (length == END_OF_DATA_SECTION) {
+                this.eos = true;
+                return null;
+            }
+
+            if (length >= 0) {
+                // A zero-length record is a legitimate empty string, not a missing value.
+                byte[] obj = new byte[length];
+                this.stream.readFully(obj);
+                return (Output) new String(obj, StandardCharsets.UTF_8);
+            }
+
+            // Other negative markers are still surfaced as a null element, as before.
+            return null;
+        } catch (EOFException e){
+            this.eos = true;
+            return null;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+
+        // Only touch the stream when nothing is buffered; repeated calls must not consume records.
+        if (!this.buffered && !this.eos) {
+            this.nextObj = read();
+            this.buffered = !this.eos;
+        }
+
+        return this.buffered;
+    }
+
+    @Override
+    public Output next() {
+
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        Output obj = this.nextObj;
+        this.nextObj = null;
+        this.buffered = false;
+        return obj;
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
new file mode 100644
index 00000000..55e6a346
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.python.function;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for a piece of Python code kept as raw bytes.
+ */
+public class PythonCode implements Serializable {
+
+  private byte[] code;
+
+  /**
+   * @param code the bytes to hold; the array is stored as given, without copying
+   */
+  public PythonCode(byte[] code){
+    this.code = code;
+  }
+
+  /**
+   * @return the very array passed to the constructor
+   */
+  public byte[] toByteArray(){
+    final byte[] held = this.code;
+    return held;
+  }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
new file mode 100644
index 00000000..d7300d26
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.api.python.executor.PythonWorkerManager;
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+/**
+ * Serializable function that delegates its work to a {@link PythonWorkerManager}: every call
+ * to {@link #apply(Iterable)} creates a manager for the given input and returns what
+ * {@code execute()} produces.
+ *
+ * @param <Input>  element type of the incoming data
+ * @param <Output> element type of the resulting data
+ */
+public class PythonFunctionWrapper<Input, Output>  implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> {
+
+  private PythonUDF<Input, Output> myUDF;
+  private PythonCode serializedUDF;
+
+  /**
+   * @param myUDF         Java-side representation of the UDF
+   * @param serializedUDF serialized Python code of the UDF
+   */
+  public PythonFunctionWrapper(PythonUDF<Input, Output> myUDF, PythonCode serializedUDF){
+    this.myUDF = myUDF;
+    this.serializedUDF = serializedUDF;
+  }
+
+  /**
+   * @param input data to hand to the worker manager
+   * @return the result of {@link PythonWorkerManager#execute()}
+   */
+  @Override
+  public Iterable<Output> apply(Iterable<Input> input) {
+    final PythonWorkerManager<Input, Output> workerManager =
+        new PythonWorkerManager<>(this.myUDF, this.serializedUDF, input);
+    return workerManager.execute();
+  }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
new file mode 100644
index 00000000..1f8dc5dc
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+/**
+ * Type used for Python UDFs on the Java side. It extends
+ * {@code FunctionDescriptor.SerializableFunction} from an {@link Iterable} of {@code Input}
+ * to an {@link Iterable} of {@code Output} and declares no members of its own.
+ *
+ * @param <Input>  element type of the incoming iterable
+ * @param <Output> element type of the resulting iterable
+ */
+public interface PythonUDF<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
index ff3a99fe..5f9e8f4e 100644
--- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
+++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-# TODO: Add properties here
\ No newline at end of file
+wayang.api.python.worker = python/src/pywy/platforms/jvm/worker.py
+wayang.api.python.path = /opt/python3
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
new file mode 100644
index 00000000..c45cf2ea
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.decoder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.MapPartitionsOperator;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.basic.operators.UnionAllOperator;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.Base64;
+
+public class WayangPlanBuilder {
+
+    private WayangPlan wayangPlan;
+    private WayangContext wayangContext;
+
+    /**
+     * Builds the context and the plan from a protobuf-encoded {@code WayangPlanProto} read
+     * from the given stream.
+     *
+     * @param planFile stream to parse the plan from; it is not closed by this constructor
+     * @throws WayangException if the stream cannot be read or parsed
+     */
+    public WayangPlanBuilder(FileInputStream planFile){
+        try {
+
+            WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+
+        } catch (IOException e) {
+            // Fail here instead of leaving context and plan null for the caller to trip over.
+            throw new WayangException("Could not read the Wayang plan from the given file.", e);
+        }
+    }
+
+    /**
+     * Builds the context and the plan from a Base64 string that wraps a protobuf-encoded
+     * {@code WayangPlanProto}.
+     *
+     * @param writtenPlan Base64 text of the serialized plan
+     * @throws WayangException if the text is not valid Base64 or the decoded bytes cannot be parsed
+     */
+    public WayangPlanBuilder(String writtenPlan){
+
+        System.out.println(writtenPlan);
+
+        // The decoded byte array is no longer printed: printing an array only shows its identity hash.
+        byte[] message;
+        try {
+            message = Base64.getDecoder().decode(writtenPlan);
+        } catch (IllegalArgumentException e) {
+            throw new WayangException("The Wayang plan is not valid Base64.", e);
+        }
+
+        try {
+            WayangPlanProto plan = WayangPlanProto.parseFrom(message);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+        } catch (InvalidProtocolBufferException e) {
+            // Fail here instead of leaving context and plan null for the caller to trip over.
+            throw new WayangException("Could not parse the Wayang plan message.", e);
+        }
+
+    }
+
+    /**
+     * Creates the context the plan is executed with.
+     *
+     * The plan is not consulted yet: picking plugins from its platform list (Java for
+     * platform number 0, Spark for number 1) is disabled, and the Spark plugin is always used.
+     *
+     * @param plan the decoded plan message (currently unused)
+     * @return a new context with the Spark basic plugin registered
+     */
+    private WayangContext buildContext(WayangPlanProto plan){
+        final WayangContext context = new WayangContext();
+        context.with(Spark.basicPlugin());
+        return context;
+    }
+
+    /**
+     * Turns the operators described in the message into connected Wayang operators.
+     *
+     * Operators are taken from a work queue that starts with the sources. An operator is built
+     * once all its predecessors have been built; otherwise it is put back at the end of the
+     * queue. Building an operator queues its successors.
+     *
+     * @param plan the decoded plan message
+     * @return a plan rooted at the first sink that was built
+     * @throws WayangException if the queued operators can never be connected (for example a
+     *                         predecessor id that is not part of the plan) or if no sink was built
+     */
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>();
+        planProto.getSourcesList().forEach(protoList::addLast);
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        /* How many operators were put back in a row since the last one that could be built */
+        int deferredInARow = 0;
+        while(! protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Operators should not be processed twice: a repeated entry is dropped, not re-queued */
+            if(operators.containsKey(proto.getId())) continue;
+
+            /* Sources can always be built; anything else needs ALL its predecessors built first */
+            boolean processIt = proto.getType().equals("source")
+                    || operators.keySet().containsAll(proto.getPredecessorsList());
+
+            if(!processIt) {
+
+                /* In case we cannot process it yet, It must be added again at the end */
+                protoList.addLast(proto);
+                deferredInARow++;
+
+                /* Every queued operator was put back without progress: waiting longer cannot help */
+                if(deferredInARow >= protoList.size()) {
+                    throw new WayangException(
+                            "Cannot build the plan: operator " + proto.getId()
+                                    + " depends on predecessors that are never created");
+                }
+                continue;
+            }
+            deferredInARow = 0;
+
+            /* Create and store Wayang operator */
+            OperatorBase operator = createOperatorByType(proto);
+            operators.put(proto.getId(), operator);
+
+            /*TODO Connect with predecessors requires more details in connection slot*/
+            int order = 0;
+            for (String pre_id : proto.getPredecessorsList()) {
+
+                OperatorBase predecessor = operators.get(pre_id);
+                /* Only works without replicate topology */
+                predecessor.connectTo(0, operator, order);
+                order++;
+            }
+
+            /* Register a sink once, no matter how many predecessors feed it */
+            if(proto.getType().toLowerCase().contains("sink")){
+                sinks.add(operator);
+            }
+
+            /*List of OperatorProto successors
+             * They will be added to the protoList
+             * nevertheless they must be processed only if the parents are in operators list */
+            List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+                    .stream()
+                    .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                    .collect(Collectors.toList());
+            for (OperatorProto successor : listSuccessors){
+                if(!protoList.contains(successor)){
+                    protoList.addLast(successor);
+                }
+            }
+
+            List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+                    .stream()
+                    .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                    .collect(Collectors.toList());
+            for (OperatorProto successor : sinkSuccessors){
+                if(!protoList.contains(successor)){
+                    protoList.addLast(successor);
+                }
+            }
+        }
+
+        if(sinks.isEmpty()) {
+            throw new WayangException("Cannot build the plan: no sink operator was created");
+        }
+
+        /* NOTE(review): only the first sink is handed to the plan; confirm whether plans with
+         * several sinks are expected to be supported here */
+        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+        return wayangPlan;
+    }
+
+    /**
+     * Creates the Wayang operator that corresponds to the type named in the message.
+     *
+     * Supported types: {@code TextFileSource}, {@code TextFileSink},
+     * {@code MapPartitionOperator} and {@code union}.
+     *
+     * @param operator operator description taken from the plan message
+     * @return the newly created operator
+     * @throws WayangException if the type is not supported or the operator's path cannot be
+     *                         turned into a URL
+     */
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        switch(operator.getType()){
+            case "TextFileSource":
+                return new TextFileSource(toFileUrl(operator.getPath()));
+
+            case "TextFileSink":
+                return new TextFileSink<String>(
+                        toFileUrl(operator.getPath()),
+                        String.class
+                );
+
+            case "MapPartitionOperator":
+                return new MapPartitionsOperator<>(
+                        new MapPartitionsDescriptor<String, String>(
+                                new WrappedPythonFunction<String, String>(
+                                        l -> l,
+                                        operator.getUdf()
+                                ),
+                                String.class,
+                                String.class
+                        )
+                );
+
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+
+        }
+
+        throw new WayangException("Operator Type not supported "+operator.getType());
+    }
+
+    /**
+     * Converts a local file path into the text form of its {@code file:} URL.
+     *
+     * @throws WayangException if the path cannot be expressed as a URL; this used to be
+     *                         printed and then reported as an unsupported operator type
+     */
+    private static String toFileUrl(String path) {
+        try {
+            URL url = new File(path).toURI().toURL();
+            return url.toString();
+        } catch (MalformedURLException e) {
+            throw new WayangException("Cannot turn the path '" + path + "' into a URL.", e);
+        }
+    }
+
+    /**
+     * @return the context assigned by the constructor
+     */
+    public WayangContext getWayangContext() {
+        return this.wayangContext;
+    }
+
+    /**
+     * @return the plan assigned by the constructor
+     */
+    public WayangPlan getWayangPlan() {
+        return this.wayangPlan;
+    }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
new file mode 100644
index 00000000..d87044fc
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.general;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.springframework.web.multipart.MultipartFile;
+
+
+@RestController
+public class WayangController {
+
+    /**
+     * Builds and executes the plan stored in {@code protobuf/wayang_message} under the
+     * current working directory.
+     *
+     * @return {@code "Builder works"} once the plan has been executed
+     * @throws WayangException if the file cannot be opened or read; previously the error was
+     *                         only printed and the success text was returned anyway
+     */
+    @GetMapping("/plan/create/fromfile")
+    public String planFromFile(
+            //@RequestParam("file") MultipartFile file
+    ){
+
+        // try-with-resources: the stream used to stay open after the request.
+        try (FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message")) {
+            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
+
+            /*TODO ADD id to executions*/
+            wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+        } catch (IOException e) {
+            throw new WayangException("Could not load the Wayang plan from the protobuf file.", e);
+        }
+
+        return "Builder works";
+    }
+
+    /**
+     * Builds a plan from the {@code message} request parameter and executes it.
+     *
+     * @param message textual plan handed to {@link WayangPlanBuilder#WayangPlanBuilder(String)}
+     * @return an empty string
+     */
+    @PostMapping("/plan/create")
+    public String planFromMessage(
+            @RequestParam("message") String message
+    ){
+        final WayangPlanBuilder builder = new WayangPlanBuilder(message);
+        final WayangContext context = builder.getWayangContext();
+        final WayangPlan planToRun = builder.getWayangPlan();
+
+        /*TODO ADD id to executions*/
+        context.execute(planToRun);
+
+        return "";
+    }
+
+    /**
+     * Reads the plan stored in {@code protobuf/wayang_message} under the current working
+     * directory, builds it with this controller's own helpers and executes it.
+     *
+     * @return {@code "Works!"} after a successful execution, {@code "Not working"} if the
+     *         file could not be opened or read
+     */
+    @GetMapping("/")
+    public String all(){
+        System.out.println("detected!");
+
+        // try-with-resources: the stream used to stay open after the request.
+        try (FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message")) {
+            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
+
+            WayangContext wc = buildContext(plan);
+            WayangPlan wp = buildPlan(plan);
+
+            System.out.println("Plan!");
+            System.out.println(wp.toString());
+
+            wc.execute(wp);
+            return("Works!");
+
+        } catch (IOException e) {
+            // Also covers FileNotFoundException, which is an IOException.
+            e.printStackTrace();
+        }
+
+        return "Not working";
+    }
+
+    /**
+     * Creates the context the plan is executed with.
+     *
+     * The plan is not consulted yet: picking plugins from its platform list (Java for
+     * platform number 0, Spark for number 1) is disabled, and the Spark plugin is always used.
+     *
+     * @param plan the decoded plan message (currently unused)
+     * @return a new context with the Spark basic plugin registered
+     */
+    private WayangContext buildContext(WayangPlanProto plan){
+        final WayangContext context = new WayangContext();
+        context.with(Spark.basicPlugin());
+        return context;
+    }
+
+    /**
+     * Turns the operators described in the message into connected Wayang operators.
+     *
+     * Operators are taken from a work queue that starts with the sources. An operator is built
+     * once all its predecessors have been built; otherwise it is put back at the end of the
+     * queue. Building an operator queues its successors.
+     *
+     * @param plan the decoded plan message
+     * @return a plan rooted at the first sink that was built
+     * @throws WayangException if the queued operators can never be connected (for example a
+     *                         predecessor id that is not part of the plan) or if no sink was built
+     */
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>();
+        planProto.getSourcesList().forEach(protoList::addLast);
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        /* How many operators were put back in a row since the last one that could be built */
+        int deferredInARow = 0;
+        while(! protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Operators should not be processed twice: a repeated entry is dropped, not re-queued */
+            if(operators.containsKey(proto.getId())) continue;
+
+            /* Sources can always be built; anything else needs ALL its predecessors built first */
+            boolean processIt = proto.getType().equals("source")
+                    || operators.keySet().containsAll(proto.getPredecessorsList());
+
+            if(!processIt) {
+
+                /* In case we cannot process it yet, It must be added again at the end */
+                protoList.addLast(proto);
+                deferredInARow++;
+
+                /* Every queued operator was put back without progress: waiting longer cannot help */
+                if(deferredInARow >= protoList.size()) {
+                    throw new WayangException(
+                            "Cannot build the plan: operator " + proto.getId()
+                                    + " depends on predecessors that are never created");
+                }
+                continue;
+            }
+            deferredInARow = 0;
+
+            /* Create and store Wayang operator */
+            OperatorBase operator = createOperatorByType(proto);
+            operators.put(proto.getId(), operator);
+
+            /*TODO Connect with predecessors requires more details in connection slot*/
+            int order = 0;
+            for (String pre_id : proto.getPredecessorsList()) {
+
+                OperatorBase predecessor = operators.get(pre_id);
+                /* Only works without replicate topology */
+                predecessor.connectTo(0, operator, order);
+                order++;
+            }
+
+            /* Register a sink once, no matter how many predecessors feed it */
+            if(proto.getType().equals("sink")){
+                sinks.add(operator);
+            }
+
+            /*List of OperatorProto successors
+             * They will be added to the protoList
+             * nevertheless they must be processed only if the parents are in operators list */
+            List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+                    .stream()
+                    .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                    .collect(Collectors.toList());
+            for (OperatorProto successor : listSuccessors){
+                if(!protoList.contains(successor)){
+                    protoList.addLast(successor);
+                }
+            }
+
+            List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+                    .stream()
+                    .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                    .collect(Collectors.toList());
+            for (OperatorProto successor : sinkSuccessors){
+                if(!protoList.contains(successor)){
+                    protoList.addLast(successor);
+                }
+            }
+        }
+
+        if(sinks.isEmpty()) {
+            throw new WayangException("Cannot build the plan: no sink operator was created");
+        }
+
+        /* NOTE(review): only the first sink is handed to the plan; confirm whether plans with
+         * several sinks are expected to be supported here */
+        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+        return wayangPlan;
+    }
+
+    /**
+     * Creates the Wayang operator that corresponds to the type named in the message.
+     *
+     * Supported types: {@code source}, {@code sink}, {@code reduce_by_key},
+     * {@code map_partition} and {@code union}.
+     *
+     * @param operator operator description taken from the plan message
+     * @return the newly created operator
+     * @throws WayangException if the type is not supported or the operator's path cannot be
+     *                         turned into a URL
+     */
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Typo: " + operator.getType());
+        switch(operator.getType()){
+            case "source":
+                return new TextFileSource(toFileUrl(operator.getPath()));
+
+            case "sink":
+                return new TextFileSink<String>(
+                        toFileUrl(operator.getPath()),
+                        String.class
+                );
+
+            case "reduce_by_key":
+                /* The reduce operator is built from the operator's parameters map (looks like it
+                 * carries the dimensions/positions that make up the group key; confirm) and its
+                 * UDF. It used to be created and then thrown away in favour of a text file sink. */
+                PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
+                    operator.getParametersMap(),
+                    operator.getUdf() ,
+                    String.class,
+                    String.class,
+                        false
+                );
+                return op;
+
+            case "map_partition":
+                return new MapPartitionsOperator<>(
+                    new MapPartitionsDescriptor<String, String>(
+                        new WrappedPythonFunction<String, String>(
+                            l -> l,
+                            operator.getUdf()
+                        ),
+                        String.class,
+                        String.class
+                    )
+                );
+
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+
+        }
+
+        throw new WayangException("Operator Type not supported " + operator.getType());
+    }
+
+    /**
+     * Converts a local file path into the text form of its {@code file:} URL.
+     *
+     * @throws WayangException if the path cannot be expressed as a URL; this used to be
+     *                         printed and then reported as an unsupported operator type
+     */
+    private static String toFileUrl(String path) {
+        try {
+            URL url = new File(path).toURI().toURL();
+            return url.toString();
+        } catch (MalformedURLException e) {
+            throw new WayangException("Cannot turn the path '" + path + "' into a URL.", e);
+        }
+    }
+
+    /**
+     * Resolves a resource path to a {@link URI}.
+     *
+     * NOTE(review): the lookup goes through the class of the current thread object rather than
+     * through this controller's class or the context class loader; confirm that this finds
+     * the intended resources on the targeted Java version.
+     *
+     * @param resourcePath path handed to {@link Class#getResource(String)}
+     * @return the URI of the resource
+     * @throws IllegalArgumentException if the resource cannot be found or its URL is not a valid URI
+     */
+    public static URI createUri(String resourcePath) {
+        URL resource = Thread.currentThread().getClass().getResource(resourcePath);
+        if (resource == null) {
+            // getResource signals "not found" with null; report it instead of failing with an NPE.
+            throw new IllegalArgumentException("Resource not found: " + resourcePath);
+        }
+        try {
+            return resource.toURI();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Illegal URI.", e);
+        }
+
+    }
+
+}