You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@wayang.apache.org by be...@apache.org on 2022/06/19 21:27:41 UTC

[incubator-wayang] branch WAYANG-211 created (now c113fa92)

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a change to branch WAYANG-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git


      at c113fa92 [WAYANG-211] python java implementation

This branch includes the following new commits:

     new d0d40e3e [LICENSE] add check-license.sh for the project, and specially for python
     new 8813ede3 [POM] update pom.xml
     new c113fa92 [WAYANG-211] python java implementation

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-wayang] 03/03: [WAYANG-211] python java implementation

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit c113fa92a0cc33e4016ee97fbcd2bf3f92ee2977
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Sun Jun 19 23:26:57 2022 +0200

    [WAYANG-211] python java implementation
    
    Signed-off-by: bertty <be...@apache.org>
---
 python/src/pywy/platforms/jvm/worker.py            | 374 +++++++++++++++++++++
 wayang-api/wayang-api-python/pom.xml               |   8 +
 .../wayang/api/python/executor/ProcessFeeder.java  | 184 ++++++++++
 .../api/python/executor/ProcessReceiver.java       |  53 +++
 .../api/python/executor/PythonProcessCaller.java   | 130 +++++++
 .../api/python/executor/PythonWorkerManager.java   |  70 ++++
 .../wayang/api/python/executor/ReaderIterator.java |  98 ++++++
 .../wayang/api/python/function/PythonCode.java     |  32 ++
 .../api/python/function/PythonFunctionWrapper.java |  46 +++
 .../wayang/api/python/function/PythonUDF.java      |  25 ++
 .../wayang-api-python-defaults.properties          |   3 +-
 .../server/spring/decoder/WayangPlanBuilder.java   | 239 +++++++++++++
 .../server/spring/general/WayangController.java    | 304 +++++++++++++++++
 13 files changed, 1565 insertions(+), 1 deletion(-)

diff --git a/python/src/pywy/platforms/jvm/worker.py b/python/src/pywy/platforms/jvm/worker.py
new file mode 100644
index 00000000..9e1fe3bb
--- /dev/null
+++ b/python/src/pywy/platforms/jvm/worker.py
@@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import socket
+import struct
+import pickle
+from itertools import chain
+
+import cloudpickle
+import base64
+import re
+import sys
+import time
+
+class SpecialLengths:
+    """Sentinel values sent in the 4-byte length slot of the worker protocol.
+
+    When one of these negative numbers appears where a payload length is
+    expected, it is a control marker rather than a size.
+    """
+
+    END_OF_DATA_SECTION = -1
+    PYTHON_EXCEPTION_THROWN = -2
+    TIMING_DATA = -3
+    END_OF_STREAM = -4
+    NULL = -5
+    START_ARROW_STREAM = -6
+
+
+def read_int(stream):
+    """Read one big-endian signed 32-bit integer from a binary stream.
+
+    Raises:
+        EOFError: if the stream is exhausted, including when it ends after
+            fewer than the 4 bytes that make up the integer.
+    """
+    data = stream.read(4)
+    if not data or len(data) < 4:
+        # A short read means the peer closed the connection mid-frame; report
+        # it as end-of-stream instead of letting struct.unpack raise an opaque
+        # struct.error that not every caller catches.
+        raise EOFError
+    return struct.unpack("!i", data)[0]
+
+
+class UTF8Deserializer:
+    """
+    Deserializes length-prefixed UTF-8 strings: a 4-byte big-endian length
+    followed by the bytes produced on the JVM side by String.getBytes.
+    """
+
+    def __init__(self, use_unicode=True):
+        self.use_unicode = use_unicode
+
+    def loads(self, stream):
+        """Read one framed string from *stream*.
+
+        Returns the decoded ``str`` (raw ``bytes`` when ``use_unicode`` is
+        false), or ``None`` for a NULL marker.
+
+        Raises:
+            EOFError: on END_OF_DATA_SECTION or when the payload is truncated.
+            ValueError: on any other negative length, which is not a size.
+        """
+        length = read_int(stream)
+        if length == SpecialLengths.END_OF_DATA_SECTION:
+            raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
+        elif length < 0:
+            # stream.read() with a negative size would consume the whole rest
+            # of the stream and return it as one bogus string.
+            raise ValueError("unexpected negative length %d in UTF-8 stream" % length)
+        s = stream.read(length)
+        if len(s) < length:
+            # Peer closed the connection in the middle of a payload.
+            raise EOFError
+        return s.decode("utf-8") if self.use_unicode else s
+
+    def load_stream(self, stream):
+        """Yield strings from *stream* until its end-of-data marker or EOF."""
+        try:
+            while True:
+                yield self.loads(stream)
+        except struct.error:
+            return
+        except EOFError:
+            return
+
+    def __repr__(self):
+        return "UTF8Deserializer(%s)" % self.use_unicode
+
+
+def write_int(p, outfile):
+    """Write *p* to *outfile* as a big-endian signed 32-bit integer."""
+    packed = struct.pack("!i", p)
+    outfile.write(packed)
+
+
+def write_with_length(obj, stream):
+    """Write the string *obj* to *stream* as a length-prefixed UTF-8 frame.
+
+    The frame is a 4-byte big-endian signed length followed by the encoded
+    bytes.
+
+    Raises:
+        ValueError: if the encoded payload does not fit a signed 32-bit length.
+    """
+    serialized = obj.encode("utf-8")
+    # The length prefix is a signed 32-bit int, so 2**31 - 1 bytes is the
+    # largest payload it can describe; 2**31 itself would make
+    # struct.pack("!i") fail inside write_int.
+    if len(serialized) > (1 << 31) - 1:
+        raise ValueError("can not serialize object larger than 2G")
+    write_int(len(serialized), stream)
+    stream.write(serialized)
+
+
+class Serializer:
+    """Abstract base for the stream serializers used on the worker socket.
+
+    Subclasses implement ``dump_stream``/``load_stream`` (and usually
+    ``dumps``); this base supplies equality, hashing and repr defaults.
+    """
+
+    def dump_stream(self, iterator, stream):
+        """Write every object produced by *iterator* to *stream*."""
+        raise NotImplementedError
+
+    def load_stream(self, stream):
+        """Yield the objects previously written to *stream*."""
+        raise NotImplementedError
+
+    def dumps(self, obj):
+        """Turn *obj* into bytes (a whole batch when batching is in use)."""
+        raise NotImplementedError
+
+    def _load_stream_without_unbatching(self, stream):
+        """Yield batches from *stream* without flattening them.
+
+        A serializer that does not batch has each object wrapped in a
+        single-element list.
+        """
+        return map(lambda item: [item], self.load_stream(stream))
+
+    # Two serializers are "equal" when output written by one can be read back
+    # by the other.  Comparing the concrete type and the instance state covers
+    # the simple cases; subclasses with richer state should override __eq__.
+
+    def __eq__(self, other):
+        same_type = isinstance(other, self.__class__)
+        return same_type and self.__dict__ == other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __repr__(self):
+        return "%s()" % type(self).__name__
+
+    def __hash__(self):
+        return hash(str(self))
+
+class FramedSerializer(Serializer):
+
+    """
+    Serializer that writes objects as a stream of (length, data) pairs,
+    where `length` is a 32-bit integer and data is `length` bytes.
+
+    Subclasses provide ``dumps``/``loads``; this class only does the framing.
+    """
+
+    def dump_stream(self, iterator, stream):
+        for obj in iterator:
+            self._write_with_length(obj, stream)
+
+    def load_stream(self, stream):
+        while True:
+            try:
+                yield self._read_with_length(stream)
+            except EOFError:
+                return
+
+    def _write_with_length(self, obj, stream):
+        serialized = self.dumps(obj)
+        if serialized is None:
+            raise ValueError("serialized value should not be None")
+        # The length prefix is a signed 32-bit int: 2**31 - 1 is the largest
+        # size it can carry; 2**31 itself would make write_int fail.
+        if len(serialized) > (1 << 31) - 1:
+            raise ValueError("can not serialize object larger than 2G")
+        write_int(len(serialized), stream)
+        stream.write(serialized)
+
+    def _read_with_length(self, stream):
+        length = read_int(stream)
+        if length == SpecialLengths.END_OF_DATA_SECTION:
+            raise EOFError
+        elif length == SpecialLengths.NULL:
+            return None
+        elif length < 0:
+            # Any other negative value is a control marker, not a size;
+            # stream.read(negative) would consume the rest of the stream.
+            raise ValueError("unexpected negative frame length %d" % length)
+        obj = stream.read(length)
+        if len(obj) < length:
+            raise EOFError
+        return self.loads(obj)
+
+    def dumps(self, obj):
+        """
+        Serialize an object into a byte array.
+        When batching is used, this will be called with an array of objects.
+        """
+        raise NotImplementedError
+
+    def loads(self, obj):
+        """
+        Deserialize an object from a byte array.
+        """
+        raise NotImplementedError
+
+
+class BatchedSerializer(Serializer):
+
+    """
+    Serializes a stream of objects in batches by calling its wrapped
+    Serializer with streams of objects.
+    """
+
+    UNLIMITED_BATCH_SIZE = -1
+    UNKNOWN_BATCH_SIZE = 0
+
+    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
+        self.serializer = serializer
+        self.batchSize = batchSize
+
+    def _batched(self, iterator):
+        """Group *iterator* into lists of at most ``batchSize`` items.
+
+        With UNLIMITED_BATCH_SIZE everything goes into a single list.  A batch
+        size that can never be reached (e.g. 0) likewise yields one trailing
+        batch holding every item.
+        """
+        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
+            yield list(iterator)
+            return
+        items = []
+        for item in iterator:
+            items.append(item)
+            if len(items) == self.batchSize:
+                yield items
+                items = []
+        if items:
+            yield items
+
+    def dump_stream(self, iterator, stream):
+        self.serializer.dump_stream(self._batched(iterator), stream)
+
+    def load_stream(self, stream):
+        return chain.from_iterable(self._load_stream_without_unbatching(stream))
+
+    def _load_stream_without_unbatching(self, stream):
+        return self.serializer.load_stream(stream)
+
+    def __repr__(self):
+        return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
+
+
+class PickleSerializer(FramedSerializer):
+    """
+    Frames objects serialized with the standard-library ``pickle`` module:
+
+        https://docs.python.org/3/library/pickle.html
+
+    Handles almost any picklable Python object, trading speed for
+    generality compared with purpose-built serializers.
+    """
+
+    def dumps(self, obj):
+        protocol = pickle_protocol
+        return pickle.dumps(obj, protocol)
+
+    def loads(self, obj, encoding="bytes"):
+        return pickle.loads(obj, encoding=encoding)
+
+# Protocol used by both pickle-based serializers in this module.
+pickle_protocol = pickle.HIGHEST_PROTOCOL
+
+
+class CloudPickleSerializer(FramedSerializer):
+    """Framed serializer that delegates to ``cloudpickle``."""
+
+    def dumps(self, obj):
+        try:
+            return cloudpickle.dumps(obj, pickle_protocol)
+        except pickle.PickleError:
+            raise
+        except Exception as e:
+            emsg = str(e)
+            # Presumably the struct.error raised for a value that does not fit
+            # a 32-bit 'i' field, i.e. an oversized object.
+            if "'i' format requires" in emsg:
+                msg = "Object too large to serialize: %s" % emsg
+            else:
+                msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
+            # Chain the original error so its traceback is not lost.
+            raise pickle.PicklingError(msg) from e
+
+    def loads(self, obj, encoding="bytes"):
+        return cloudpickle.loads(obj, encoding=encoding)
+
+# Serializer alias used by the worker.  An earlier variant picked
+# CloudPickleSerializer on Python >= 3.8; for now the plain PickleSerializer
+# is used on every interpreter version.
+CPickleSerializer = PickleSerializer
+
+def dump_stream(iterator, stream):
+    """Write each string from *iterator* to *stream* as a UTF-8 frame, then
+    terminate the section with END_OF_DATA_SECTION.
+
+    Elements that are not strings are skipped.
+    TODO: support other element types (e.g. lists); the JVM reader currently
+    decodes every frame as a UTF-8 string.
+    """
+    for obj in iterator:
+        if isinstance(obj, str):
+            write_with_length(obj, stream)
+    write_int(SpecialLengths.END_OF_DATA_SECTION, stream)
+
+
+def process(infile, outfile):
+    """Apply the UDF sent by the JVM to the records that follow it.
+
+    Exchange on the socket (matching the Java ProcessFeeder/ReaderIterator):
+
+    1. read the pickled UDF as a 4-byte length followed by that many bytes;
+    2. read length-prefixed UTF-8 strings (None for NULL markers) until
+       END_OF_DATA_SECTION;
+    3. write every string produced by ``func(iterator)`` back as a
+       length-prefixed UTF-8 frame, followed by END_OF_DATA_SECTION.
+
+    Raises:
+        EOFError: if the stream ends before the whole UDF was received.
+    """
+    # TODO: receive the operator together with the UDF.
+    udf_length = read_int(infile)
+    serialized_udf = infile.read(udf_length)
+    if len(serialized_udf) < udf_length:
+        raise EOFError("truncated UDF payload")
+    # SECURITY: pickle.loads executes arbitrary code.  This assumes the peer on
+    # PYTHON_WORKER_FACTORY_PORT is the trusted JVM that launched this worker;
+    # the connection is not authenticated.
+    func = pickle.loads(serialized_udf)
+
+    # TODO: input is temporarily assumed to be UTF-8 strings only.
+    iterator = UTF8Deserializer().load_stream(infile)
+    # The JVM ReaderIterator decodes each frame as a UTF-8 string and stops at
+    # END_OF_DATA_SECTION, so reply with UTF-8 frames rather than pickled
+    # batches (which it would misread as text and never see terminated).
+    dump_stream(func(iterator), outfile)
+
+
+def local_connect(port, connect_timeout=30):
+    """Connect to the JVM's server socket on 127.0.0.1 and wrap it in a file.
+
+    Every address returned by getaddrinfo is tried until one connects.
+
+    Args:
+        port: TCP port the JVM is listening on.
+        connect_timeout: seconds allowed for establishing the connection.  The
+            timeout is cleared once connected: it would otherwise apply to
+            every later read/write, aborting long-running UDFs or a slow
+            consumer on the JVM side.
+
+    Returns:
+        ``(sockfile, sock)``: a buffered read/write binary file over the
+        socket, and the socket itself.
+
+    Raises:
+        Exception: if no address could be connected.
+    """
+    errors = []
+    for af, socktype, proto, _, sa in socket.getaddrinfo(
+        "127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM
+    ):
+        sock = None
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(connect_timeout)
+            sock.connect(sa)
+            sock.settimeout(None)
+            sockfile = sock.makefile("rwb", 65536)
+            return (sockfile, sock)
+        except OSError as e:
+            errors.append("tried to connect to %s, but an error occurred: %s" % (sa, e))
+            # socket.socket() itself may have failed, leaving nothing to close.
+            if sock is not None:
+                sock.close()
+    raise Exception("could not open socket: %s" % errors)
+
+
+if __name__ == '__main__':
+    # Diagnostic output; the JVM starts this worker with stdout/stderr
+    # inherited, so it appears on the JVM's console.
+    print("Python version")
+    print(sys.version)
+    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
+    sock_file, sock = local_connect(java_port)
+    try:
+        process(sock_file, sock_file)
+        sock_file.flush()
+    finally:
+        # Close the file wrapper and the socket explicitly; closing the file
+        # alone does not close the underlying socket.
+        sock_file.close()
+        sock.close()
diff --git a/wayang-api/wayang-api-python/pom.xml b/wayang-api/wayang-api-python/pom.xml
index 5a3ad443..9ac2e2c1 100644
--- a/wayang-api/wayang-api-python/pom.xml
+++ b/wayang-api/wayang-api-python/pom.xml
@@ -35,4 +35,12 @@
     <properties>
         <java-module-name>org.apache.wayang.api</java-module-name>
     </properties>
+
+    <dependencies>
+        <!-- Track the reactor version instead of hard-coding a snapshot. -->
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
new file mode 100644
index 00000000..ee694322
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Streams a serialized Python UDF followed by its input elements to a Python
+ * worker over a socket.
+ *
+ * Layout (integers are 4-byte big-endian, via DataOutputStream): the UDF as a
+ * length-prefixed byte array, one frame per input element (see
+ * {@link #write(Object, DataOutputStream)}), then END_OF_DATA_SECTION.
+ */
+public class ProcessFeeder<Input, Output> {
+
+    private Socket socket;
+    private PythonUDF<Input, Output> udf;
+    private PythonCode serializedUDF;
+    private Iterable<Input> input;
+
+    //TODO add to a config file
+    // Sentinel lengths, mirrored by SpecialLengths in the Python worker.
+    int END_OF_DATA_SECTION = -1;
+    int NULL = -5;
+
+    public ProcessFeeder(
+            Socket socket,
+            PythonUDF<Input, Output> udf,
+            PythonCode serializedUDF,
+            Iterable<Input> input){
+
+        if(input == null) throw new WayangException("Nothing to process with Python API");
+
+        this.socket = socket;
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.input = input;
+    }
+
+    /**
+     * Writes the UDF, every input element and the end marker, then flushes.
+     * The stream is deliberately left open: closing it would close the socket
+     * that the result reader uses as well.
+     *
+     * @throws WayangException if writing to the worker fails
+     */
+    public void send(){
+        //TODO use config buffer size
+        int BUFFER_SIZE = 8 * 1024;
+
+        try{
+            BufferedOutputStream stream = new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE);
+            DataOutputStream dataOut = new DataOutputStream(stream);
+
+            writeUDF(serializedUDF, dataOut);
+            this.writeIteratorToStream(input.iterator(), dataOut);
+            dataOut.writeInt(END_OF_DATA_SECTION);
+            dataOut.flush();
+        } catch (IOException e) {
+            throw new WayangException("Could not send data to the Python worker", e);
+        }
+    }
+
+    /** Writes the serialized UDF as a length-prefixed byte array. */
+    public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){
+        writeBytes(serializedUDF.toByteArray(), dataOut);
+    }
+
+    /** Writes every remaining element of {@code iter}, one frame each. */
+    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut)
+        throws IOException {
+
+        while (iter.hasNext()) {
+            write(iter.next(), dataOut);
+        }
+    }
+
+    /**
+     * Writes one element: null as the NULL marker, byte arrays and strings as
+     * length-prefixed payloads, and a Map.Entry as its key followed by its value.
+     *
+     * @throws WayangException for unsupported element types or I/O failures
+     */
+    /*TODO Missing case PortableDataStream */
+    public void write(Object obj, DataOutputStream dataOut){
+        try {
+            if (obj == null) {
+                dataOut.writeInt(this.NULL);
+            } else if (obj instanceof Byte[] || obj instanceof byte[]) {
+                writeBytes(obj, dataOut);
+            } else if (obj instanceof String) {
+                writeUTF((String) obj, dataOut);
+            } else if (obj instanceof Map.Entry) {
+                writeKeyValue((Map.Entry) obj, dataOut);
+            } else {
+                throw new WayangException("Unexpected element type " + obj.getClass());
+            }
+        } catch (IOException e) {
+            // Fail fast instead of logging and carrying on with a broken stream.
+            throw new WayangException("Could not write element to the Python worker", e);
+        }
+    }
+
+    /** Writes a Byte[] or byte[] as a 4-byte length followed by the bytes. */
+    public void writeBytes(Object obj, DataOutputStream dataOut){
+        try{
+            if (obj instanceof Byte[]) {
+                Byte[] boxed = (Byte[]) obj;
+                byte[] bytes = new byte[boxed.length];
+                // Unboxing Byte values. (Byte[] to byte[])
+                for (int i = 0; i < boxed.length; i++)
+                    bytes[i] = boxed[i];
+
+                dataOut.writeInt(bytes.length);
+                dataOut.write(bytes);
+            } else if (obj instanceof byte[]) {
+                byte[] bytes = (byte[]) obj;
+                dataOut.writeInt(bytes.length);
+                dataOut.write(bytes);
+            }
+        } catch (IOException e) {
+            throw new WayangException("Could not write bytes to the Python worker", e);
+        }
+    }
+
+    /** Writes {@code str} as a 4-byte length followed by its UTF-8 bytes. */
+    public void writeUTF(String str, DataOutputStream dataOut){
+
+        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+        try {
+            dataOut.writeInt(bytes.length);
+            dataOut.write(bytes);
+        } catch (SocketException e){
+            // NOTE(review): socket errors are deliberately ignored here,
+            // presumably so a worker that stops reading early does not fail
+            // the job; confirm this is intended.
+        } catch (IOException e) {
+            throw new WayangException("Could not write string to the Python worker", e);
+        }
+    }
+
+    /** Writes the entry's key and then its value, each as its own frame. */
+    public void writeKeyValue(Map.Entry obj, DataOutputStream dataOut){
+
+        write(obj.getKey(), dataOut);
+        write(obj.getValue(), dataOut);
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
new file mode 100644
index 00000000..7ef5f983
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessReceiver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+/*TODO cannot be always string, include definition for every operator
+*  like: map(udf, inputtype, outputtype)*/
+/**
+ * Reads the results a Python worker writes back on the given socket.
+ */
+public class ProcessReceiver<Output> {
+
+    private ReaderIterator<Output> iterator;
+
+    public ProcessReceiver(Socket socket){
+        try{
+            //TODO use config buffer size
+            int BUFFER_SIZE = 1 * 1024;
+
+            DataInputStream stream = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
+            this.iterator = new ReaderIterator<>(stream);
+
+        } catch (IOException e) {
+            // Without an input stream there is nothing to iterate; failing here
+            // beats handing out an Iterable whose iterator is null.
+            throw new org.apache.wayang.core.api.exception.WayangException(
+                "Could not open the Python worker's output stream", e);
+        }
+    }
+
+    /**
+     * Returns a view over the worker's results. Every call to iterator() on
+     * the returned Iterable yields the same underlying iterator, so the
+     * results can be consumed only once.
+     */
+    public Iterable<Output> getIterable(){
+        return () -> iterator;
+    }
+
+    /** Drains the remaining results, printing each one to standard output. */
+    public void print(){
+        iterator.forEachRemaining(x -> System.out.println(x.toString()));
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
new file mode 100644
index 00000000..7eca5595
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonProcessCaller.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.lang.ProcessBuilder.Redirect;
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.exception.WayangException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.wayang.core.util.ReflectionUtils;
+
+/**
+ * Launches a Python worker process and waits for it to connect back to a
+ * loopback server socket.
+ *
+ * The worker script and PYTHONPATH come from
+ * wayang-api-python-defaults.properties; the listening port is passed to the
+ * worker in the PYTHON_WORKER_FACTORY_PORT environment variable.
+ */
+public class PythonProcessCaller {
+
+    private Thread process;
+    // Handle of the launched worker; set by the launcher thread, read by close().
+    private volatile Process pythonProcess;
+    private Socket socket;
+    private ServerSocket serverSocket;
+    private boolean ready;
+
+    //TODO How to get the config
+    private Configuration configuration;
+
+    public PythonProcessCaller(PythonCode serializedUDF){
+
+        //TODO create documentation to how to the configuration in the code
+        this.configuration = new Configuration();
+        this.configuration.load(ReflectionUtils.loadResource("wayang-api-python-defaults.properties"));
+        this.ready = false;
+        byte[] addr = new byte[4];
+        addr[0] = 127; addr[1] = 0; addr[2] = 0; addr[3] = 1;
+
+        try {
+            // Port 0 lets the OS choose a free port; backlog 1 since exactly
+            // one worker connects.
+            this.serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(addr));
+
+            Runnable run1 = () -> {
+                ProcessBuilder pb = new ProcessBuilder(
+                    Arrays.asList(
+                        "python3",
+                        this.configuration.getStringProperty("wayang.api.python.worker")
+                    )
+                );
+                Map<String, String> workerEnv = pb.environment();
+                workerEnv.put("PYTHON_WORKER_FACTORY_PORT",
+                    String.valueOf(this.serverSocket.getLocalPort()));
+
+                // TODO See what is happening with ENV Python version
+                workerEnv.put(
+                    "PYTHONPATH",
+                    this.configuration.getStringProperty("wayang.api.python.path")
+                );
+
+                // The worker's stdout/stderr go straight to this JVM's console.
+                pb.redirectOutput(Redirect.INHERIT);
+                pb.redirectError(Redirect.INHERIT);
+                try {
+                    // Keep the handle so close() can terminate the worker.
+                    this.pythonProcess = pb.start();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            };
+            this.process = new Thread(run1);
+            this.process.start();
+
+            // Wait for it to connect to our socket
+            this.serverSocket.setSoTimeout(100000);
+
+            try {
+                this.socket = this.serverSocket.accept();
+                // The single expected worker is connected; stop listening.
+                this.serverSocket.close();
+
+                if(socket.isConnected())
+                    this.ready = true;
+
+            } catch (Exception e) {
+                throw new WayangException("Python worker failed to connect back.", e);
+            }
+        } catch (WayangException e) {
+            this.close();
+            throw e;
+        } catch (Exception e){
+            this.close();
+            throw new WayangException("Python worker failed", e);
+        }
+    }
+
+    public Thread getProcess() {
+        return process;
+    }
+
+    public Socket getSocket() {
+        return socket;
+    }
+
+    public boolean isReady(){
+        return ready;
+    }
+
+    /**
+     * Terminates the worker process (if it was started) and closes both
+     * sockets. Each resource is released independently, so one failure does
+     * not leave the others open. Safe to call on a partially built instance.
+     */
+    public void close(){
+        if (this.process != null)
+            this.process.interrupt();
+        Process worker = this.pythonProcess;
+        if (worker != null)
+            worker.destroy();
+        closeQuietly(this.socket);
+        closeQuietly(this.serverSocket);
+    }
+
+    /** Closes {@code resource} if non-null, reporting (not throwing) failures. */
+    private static void closeQuietly(AutoCloseable resource) {
+        if (resource == null)
+            return;
+        try {
+            resource.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
new file mode 100644
index 00000000..cef0c148
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/PythonWorkerManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import org.apache.wayang.api.python.function.PythonCode;
+import org.apache.wayang.api.python.function.PythonUDF;
+import org.apache.wayang.core.api.exception.WayangException;
+
+/**
+ * Runs a Python UDF over an input collection: starts a worker, feeds it the
+ * input from a background thread and exposes the worker's results.
+ */
+public class PythonWorkerManager<Input, Output> {
+
+    private PythonUDF<Input, Output> udf;
+    private PythonCode serializedUDF;
+    private Iterable<Input> inputIterator;
+
+    public PythonWorkerManager(
+            PythonUDF<Input, Output> udf,
+            PythonCode serializedUDF,
+            Iterable<Input> input
+    ){
+        this.udf = udf;
+        this.serializedUDF = serializedUDF;
+        this.inputIterator = input;
+    }
+
+    /**
+     * Starts the worker and returns an Iterable over its results. The input
+     * is written on a separate thread while the caller reads results from the
+     * same socket.
+     *
+     * @throws WayangException if the worker socket is not usable
+     */
+    public Iterable<Output> execute(){
+        final PythonProcessCaller worker = new PythonProcessCaller(this.serializedUDF);
+
+        if (!worker.isReady()) {
+            final int port = worker.getSocket().getLocalPort();
+            worker.close();
+            throw new WayangException("Not possible to work with the Socket provided on port: " + port);
+        }
+
+        final Thread feeder = new Thread(() ->
+            new ProcessFeeder<Input, Output>(
+                worker.getSocket(),
+                this.udf,
+                this.serializedUDF,
+                this.inputIterator
+            ).send()
+        );
+        feeder.start();
+
+        final ProcessReceiver<Output> receiver = new ProcessReceiver<>(worker.getSocket());
+        return receiver.getIterable();
+    }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
new file mode 100644
index 00000000..cac90d8e
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ReaderIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.executor;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator over length-prefixed UTF-8 records read from a {@link DataInputStream}.
+ * Each record is a 4-byte length (as read by {@link DataInputStream#readInt()}) followed by that
+ * many bytes. A length of {@code -1}, or the end of the stream, terminates the iteration.
+ */
+public class ReaderIterator <Output> implements Iterator<Output> {
+
+    /** Length prefix that marks the end of the data section. */
+    private static final int END_OF_DATA_SECTION = -1;
+
+    /** Element read ahead by {@link #hasNext()} and handed out by {@link #next()}. */
+    private Output nextObj = null;
+    /** Set once the end-of-data marker or the end of the stream has been reached. */
+    private boolean eos = false;
+    /** Whether {@link #nextObj} was already read from the stream and not yet returned. */
+    private boolean fetched = false;
+    private DataInputStream stream = null;
+
+    /**
+     * @param stream source of length-prefixed UTF-8 records
+     */
+    public ReaderIterator(DataInputStream stream) {
+        this.stream = stream;
+    }
+
+    /**
+     * Reads one record from the stream.
+     *
+     * @return the decoded record; {@code null} when the end of data is reached (then {@link #eos}
+     *         is set) or when the length prefix is any other non-positive value
+     * @throws RuntimeException wrapping any {@link IOException} other than end-of-file
+     */
+    @SuppressWarnings("unchecked")
+    private Output read() {
+        try {
+            int length = this.stream.readInt();
+
+            if (length > 0) {
+                byte[] obj = new byte[length];
+                this.stream.readFully(obj);
+                // NOTE(review): records are always decoded as String, so Output is assumed to be String.
+                return (Output) new String(obj, StandardCharsets.UTF_8);
+            }
+            if (length == END_OF_DATA_SECTION) {
+                this.eos = true;
+            }
+            // NOTE(review): other non-positive lengths (0, or a null marker such as -5) yield null.
+            return null;
+        } catch (EOFException e) {
+            // The stream ended without an explicit end-of-data marker.
+            this.eos = true;
+            return null;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads ahead at most one record, so repeated calls do not consume further elements.
+     *
+     * @return {@code true} if {@link #next()} will return an element
+     */
+    @Override
+    public boolean hasNext() {
+        if (!this.fetched && !this.eos) {
+            this.nextObj = read();
+            this.fetched = true;
+        }
+        return !this.eos;
+    }
+
+    /**
+     * @return the next record, reading it from the stream if {@link #hasNext()} was not called
+     * @throws NoSuchElementException once the end of data has been reached
+     */
+    @Override
+    public Output next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        Output obj = this.nextObj;
+        this.nextObj = null;
+        this.fetched = false;
+        return obj;
+    }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
new file mode 100644
index 00000000..55e6a346
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonCode.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.api.python.function;
+
+import java.io.Serializable;
+
+/**
+ * Serializable holder for the raw bytes of a serialized Python function.
+ */
+public class PythonCode implements Serializable {
+
+  /** Bytes exactly as passed to the constructor; the array is not copied. */
+  private final byte[] code;
+
+  public PythonCode(byte[] code) {
+    this.code = code;
+  }
+
+  /**
+   * @return the wrapped array itself (not a copy), so it is shared with this instance
+   */
+  public byte[] toByteArray() {
+    return this.code;
+  }
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
new file mode 100644
index 00000000..d7300d26
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonFunctionWrapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.api.python.executor.PythonWorkerManager;
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+/**
+ * Exposes a Python UDF as a {@link FunctionDescriptor.SerializableFunction} over whole iterables.
+ * Every {@link #apply(Iterable)} call delegates to a fresh {@link PythonWorkerManager}.
+ */
+public class PythonFunctionWrapper<Input, Output>  implements FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>> {
+
+  /** UDF forwarded to each worker manager. */
+  private final PythonUDF<Input, Output> myUDF;
+  /** Serialized Python code forwarded to each worker manager. */
+  private final PythonCode serializedUDF;
+
+  public PythonFunctionWrapper(PythonUDF<Input, Output> myUDF, PythonCode serializedUDF){
+    this.myUDF = myUDF;
+    this.serializedUDF = serializedUDF;
+  }
+
+  /**
+   * Runs the serialized UDF on {@code input}.
+   *
+   * @param input elements to process
+   * @return the iterable produced by {@link PythonWorkerManager#execute()}
+   */
+  @Override
+  public Iterable<Output> apply(Iterable<Input> input) {
+    return new PythonWorkerManager<Input, Output>(this.myUDF, this.serializedUDF, input).execute();
+  }
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
new file mode 100644
index 00000000..1f8dc5dc
--- /dev/null
+++ b/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/function/PythonUDF.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.python.function;
+
+import org.apache.wayang.core.function.FunctionDescriptor;
+
+public interface PythonUDF<Input, Output> extends FunctionDescriptor.SerializableFunction<Iterable<Input>, Iterable<Output>>{
+
+}
diff --git a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
index ff3a99fe..5f9e8f4e 100644
--- a/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
+++ b/wayang-api/wayang-api-python/src/main/resources/wayang-api-python-defaults.properties
@@ -15,4 +15,5 @@
 # limitations under the License.
 #
 
-# TODO: Add properties here
\ No newline at end of file
+wayang.api.python.worker = python/src/pywy/platforms/jvm/worker.py
+wayang.api.python.path = /opt/python3
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
new file mode 100644
index 00000000..c45cf2ea
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/decoder/WayangPlanBuilder.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.decoder;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.basic.operators.MapPartitionsOperator;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.basic.operators.UnionAllOperator;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.Base64;
+
+/**
+ * Decodes a {@link WayangPlanProto} and turns it into a {@link WayangContext} and a {@link WayangPlan}.
+ */
+public class WayangPlanBuilder {
+
+    private WayangPlan wayangPlan;
+    private WayangContext wayangContext;
+
+    /**
+     * Builds context and plan from a binary {@link WayangPlanProto} read from {@code planFile}.
+     * The stream is not closed here.
+     *
+     * @param planFile stream containing the serialized plan
+     * @throws WayangException if the stream cannot be read or parsed, or the plan cannot be built
+     */
+    public WayangPlanBuilder(FileInputStream planFile){
+        try {
+            WayangPlanProto plan = WayangPlanProto.parseFrom(planFile);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+        } catch (IOException e) {
+            throw new WayangException("Could not read the WayangPlan from the provided file", e);
+        }
+    }
+
+    /**
+     * Builds context and plan from a Base64-encoded binary {@link WayangPlanProto}.
+     *
+     * @param writtenPlan Base64 text of the serialized plan
+     * @throws IllegalArgumentException if {@code writtenPlan} is not valid Base64
+     * @throws WayangException if the decoded bytes are not a valid plan, or the plan cannot be built
+     */
+    public WayangPlanBuilder(String writtenPlan){
+
+        System.out.println(writtenPlan);
+        byte[] message = Base64.getDecoder().decode(writtenPlan);
+
+        try {
+            WayangPlanProto plan = WayangPlanProto.parseFrom(message);
+
+            this.wayangContext = buildContext(plan);
+            this.wayangPlan = buildPlan(plan);
+        } catch (InvalidProtocolBufferException e) {
+            throw new WayangException("Could not parse the WayangPlan message", e);
+        }
+    }
+
+    /**
+     * Creates the execution context. Only the Spark plugin is registered; the platform list carried
+     * in the plan is currently ignored (see the commented-out code).
+     */
+    private WayangContext buildContext(WayangPlanProto plan){
+
+        WayangContext ctx = new WayangContext();
+//        plan.getContext().getPlatformsList().forEach(platform -> {
+//            if (platform.getNumber() == 0)
+//                ctx.with(Java.basicPlugin());
+//            else if (platform.getNumber() == 1)
+//                ctx.with(Spark.basicPlugin());
+//        });
+        ctx.with(Spark.basicPlugin());
+
+        return ctx;
+    }
+
+    /**
+     * Creates the Wayang operators in dependency order, starting from the sources, and connects each
+     * one to its predecessors.
+     *
+     * @return a plan rooted at the first sink found
+     * @throws WayangException if some operators can never be built because a predecessor is missing,
+     *         or if the plan has no sink
+     */
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>(planProto.getSourcesList());
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        /* Consecutive re-queues since an operator was last built; used to detect unresolvable plans */
+        int deferredSinceProgress = 0;
+
+        while (!protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Operators must not be processed twice: drop duplicates instead of re-queueing them */
+            if (operators.containsKey(proto.getId())) {
+                continue;
+            }
+
+            /* Sources can always be built; any other operator needs ALL predecessors built first */
+            boolean processIt = proto.getType().equals("source")
+                    || operators.keySet().containsAll(proto.getPredecessorsList());
+
+            if (!processIt) {
+                /* In case we cannot process it yet, it must be added again at the end */
+                protoList.addLast(proto);
+                deferredSinceProgress++;
+                /* Every queued operator was re-queued without any new operator being built: stuck */
+                if (deferredSinceProgress >= protoList.size()) {
+                    throw new WayangException(
+                            "Cannot build WayangPlan: unresolved predecessors for operators "
+                                    + protoList.stream().map(OperatorProto::getId).collect(Collectors.toList()));
+                }
+                continue;
+            }
+            deferredSinceProgress = 0;
+
+            /* Create and store Wayang operator */
+            OperatorBase operator = createOperatorByType(proto);
+            operators.put(proto.getId(), operator);
+
+            /*TODO Connect with predecessors requires more details in connection slot*/
+            int order = 0;
+            for (String pre_id : proto.getPredecessorsList()) {
+                /* Only works without replicate topology */
+                operators.get(pre_id).connectTo(0, operator, order);
+                order++;
+            }
+
+            /* Register each sink once, even when it has several predecessors */
+            if (proto.getType().toLowerCase().contains("sink")) {
+                sinks.add(operator);
+            }
+
+            /* Queue the successors; they are only built once all their predecessors exist */
+            enqueueSuccessors(proto, planProto.getOperatorsList(), protoList);
+            enqueueSuccessors(proto, planProto.getSinksList(), protoList);
+        }
+
+        if (sinks.isEmpty()) {
+            throw new WayangException("Cannot build WayangPlan: the plan does not contain any sink operator");
+        }
+        /* NOTE(review): only the first sink is handed to WayangPlan; further sinks are ignored */
+        return new WayangPlan(sinks.get(0));
+    }
+
+    /** Appends to {@code queue} each candidate listed as a successor of {@code proto} that is not queued yet. */
+    private static void enqueueSuccessors(OperatorProto proto,
+                                          List<OperatorProto> candidates,
+                                          LinkedList<OperatorProto> queue) {
+        for (OperatorProto candidate : candidates) {
+            if (proto.getSuccessorsList().contains(candidate.getId()) && !queue.contains(candidate)) {
+                queue.addLast(candidate);
+            }
+        }
+    }
+
+    /**
+     * Maps an operator description to a Wayang operator.
+     *
+     * @throws WayangException if the type is unknown or a file path cannot be turned into a URL
+     */
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        switch(operator.getType()){
+            case "TextFileSource":
+                return new TextFileSource(toFileUrl(operator.getPath()));
+            case "TextFileSink":
+                return new TextFileSink<String>(
+                        toFileUrl(operator.getPath()),
+                        String.class
+                );
+            case "MapPartitionOperator":
+                return new MapPartitionsOperator<>(
+                        new MapPartitionsDescriptor<String, String>(
+                                new WrappedPythonFunction<String, String>(
+                                        l -> l,
+                                        operator.getUdf()
+                                ),
+                                String.class,
+                                String.class
+                        )
+                );
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+            default:
+                break;
+        }
+
+        throw new WayangException("Operator Type not supported "+operator.getType());
+    }
+
+    /** Converts a local file path into the {@code file:} URL string given to the text file operators. */
+    private static String toFileUrl(String path) {
+        try {
+            return new File(path).toURI().toURL().toString();
+        } catch (MalformedURLException e) {
+            throw new WayangException("Invalid file path: " + path, e);
+        }
+    }
+
+    public WayangContext getWayangContext() {
+        return wayangContext;
+    }
+
+    public WayangPlan getWayangPlan() {
+        return wayangPlan;
+    }
+}
diff --git a/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
new file mode 100644
index 00000000..d87044fc
--- /dev/null
+++ b/wayang-api/wayang-api-rest/src/main/java/org/apache/wayang/api/rest/server/spring/general/WayangController.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api.rest.server.spring.general;
+
+import com.google.protobuf.ByteString;
+import org.apache.wayang.api.python.function.WrappedPythonFunction;
+import org.apache.wayang.api.rest.server.spring.decoder.WayangPlanBuilder;
+import org.apache.wayang.basic.operators.*;
+import org.apache.wayang.commons.serializable.OperatorProto;
+import org.apache.wayang.commons.serializable.PlanProto;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.function.MapPartitionsDescriptor;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.wayang.core.plan.wayangplan.WayangPlan;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+
+import org.apache.wayang.commons.serializable.WayangPlanProto;
+import org.springframework.web.multipart.MultipartFile;
+
+
+@RestController
+public class WayangController {
+
+    @GetMapping("/plan/create/fromfile")
+    public String planFromFile(
+            //@RequestParam("file") MultipartFile file
+    ){
+
+        try {
+            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+            WayangPlanBuilder wpb = new WayangPlanBuilder(inputStream);
+
+            /*TODO ADD id to executions*/
+            wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return "Builder works";
+    }
+
+    @PostMapping("/plan/create")
+    public String planFromMessage(
+            @RequestParam("message") String message
+    ){
+
+        WayangPlanBuilder wpb = new WayangPlanBuilder(message);
+
+        /*TODO ADD id to executions*/
+        wpb.getWayangContext().execute(wpb.getWayangPlan());
+
+        return "";
+    }
+
+    @GetMapping("/")
+    public String all(){
+        System.out.println("detected!");
+
+        try {
+            FileInputStream inputStream = new FileInputStream(Paths.get(".").toRealPath() + "/protobuf/wayang_message");
+            WayangPlanProto plan = WayangPlanProto.parseFrom(inputStream);
+
+            WayangContext wc = buildContext(plan);
+            WayangPlan wp = buildPlan(plan);
+
+            System.out.println("Plan!");
+            System.out.println(wp.toString());
+
+            wc.execute(wp);
+            return("Works!");
+
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        return "Not working";
+    }
+
+    private WayangContext buildContext(WayangPlanProto plan){
+
+        WayangContext ctx = new WayangContext();
+//        plan.getContext().getPlatformsList().forEach(platform -> {
+//            if (platform.getNumber() == 0)
+//                ctx.with(Java.basicPlugin());
+//            else if (platform.getNumber() == 1)
+//                ctx.with(Spark.basicPlugin());
+//        });
+        ctx.with(Spark.basicPlugin());
+
+        return ctx;
+    }
+
+    private WayangPlan buildPlan(WayangPlanProto plan){
+
+        System.out.println(plan);
+
+        PlanProto planProto = plan.getPlan();
+        LinkedList<OperatorProto> protoList = new LinkedList<>();
+        planProto.getSourcesList().forEach(protoList::addLast);
+
+        Map<String, OperatorBase> operators = new HashMap<>();
+        List<OperatorBase> sinks = new ArrayList<>();
+        while(! protoList.isEmpty()) {
+
+            OperatorProto proto = protoList.pollFirst();
+
+            /* Checking if protoOperator can be connected to the current WayangPlan*/
+            boolean processIt;
+            if(proto.getType().equals("source")) processIt = true;
+
+            else {
+                /* Checking if ALL predecessors were already processed */
+                processIt = true;
+                for(String predecessor : proto.getPredecessorsList()){
+                    if (!operators.containsKey(predecessor)) {
+                        processIt = false;
+                        break;
+                    }
+                }
+            }
+
+            /* Operators should not be processed twice*/
+            if(operators.containsKey(proto.getId())) processIt = false;
+
+            if(processIt) {
+
+                /* Create and store Wayang operator */
+                OperatorBase operator = createOperatorByType(proto);
+                operators.put(proto.getId(), operator);
+
+                /*TODO Connect with predecessors requires more details in connection slot*/
+                int order = 0;
+                for (String pre_id : proto.getPredecessorsList()) {
+
+                    OperatorBase predecessor = operators.get(pre_id);
+                    /* Only works without replicate topology */
+                    predecessor.connectTo(0, operator, order);
+                    order++;
+
+                    if(proto.getType().equals("sink")){
+                        sinks.add(operator);
+                        //if(!sinks.contains(operator)) {
+                        //    sinks.add(operator);
+                        //}
+                    }
+                }
+
+                /*List of OperatorProto successors
+                 * They will be added to the protoList
+                 * nevertheless they must be processed only if the parents are in operators list */
+                List<OperatorProto> listSuccessors = planProto.getOperatorsList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : listSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+                List<OperatorProto> sinkSuccessors = planProto.getSinksList()
+                        .stream()
+                        .filter(t -> proto.getSuccessorsList().contains(t.getId()))
+                        .collect(Collectors.toList());
+                for (OperatorProto successor : sinkSuccessors){
+                    if(!protoList.contains(successor)){
+                        protoList.addLast(successor);
+                    }
+                }
+
+            } else {
+
+                /* In case we cannot process it yet, It must be added again at the end*/
+                protoList.addLast(proto);
+            }
+        }
+
+        WayangPlan wayangPlan = new WayangPlan(sinks.get(0));
+        return wayangPlan;
+    }
+
+    public OperatorBase createOperatorByType(OperatorProto operator){
+
+        System.out.println("Typo: " + operator.getType());
+        switch(operator.getType()){
+            case "source":
+                try {
+                    String source_path = operator.getPath();
+                    URL url = new File(source_path).toURI().toURL();
+                    return new TextFileSource(url.toString());
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "sink":
+                try {
+                    String sink_path = operator.getPath();
+                    URL url = new File(sink_path).toURI().toURL();
+                    return new TextFileSink<String>(
+                            url.toString(),
+                            String.class
+                    );
+
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "reduce_by_key":
+                try {
+                    /* Function to be applied in Python workers */
+                    ByteString function = operator.getUdf();
+
+                    /* Has dimension or positions that compose GroupKey */
+                    Map<String, String> parameters = operator.getParametersMap();
+
+                    PyWayangReduceByOperator<String, String> op = new PyWayangReduceByOperator(
+                        operator.getParametersMap(),
+                        operator.getUdf() ,
+                        String.class,
+                        String.class,
+                            false
+                    );
+
+                    String sink_path = operator.getPath();
+                    URL url = new File(sink_path).toURI().toURL();
+                    return new TextFileSink<String>(
+                            url.toString(),
+                            String.class
+                    );
+
+                } catch (MalformedURLException e) {
+                    e.printStackTrace();
+                }
+                break;
+            case "map_partition":
+                return new MapPartitionsOperator<>(
+                    new MapPartitionsDescriptor<String, String>(
+                        new WrappedPythonFunction<String, String>(
+                            l -> l,
+                            operator.getUdf()
+                        ),
+                        String.class,
+                        String.class
+                    )
+                );
+
+            case "union":
+                return new UnionAllOperator<String>(
+                        String.class
+                );
+
+        }
+
+        throw new WayangException("Operator Type not supported");
+    }
+
+    public static URI createUri(String resourcePath) {
+        try {
+            return Thread.currentThread().getClass().getResource(resourcePath).toURI();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Illegal URI.", e);
+        }
+
+    }
+
+}


[incubator-wayang] 02/03: [POM] update pom.xml

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 8813ede3319c013a4e570f7a488fd363e234f5f9
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Tue Jun 14 16:16:14 2022 +0200

    [POM] update pom.xml
    
    Signed-off-by: bertty <be...@apache.org>
---
 pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index a4dedf81..36ab3ede 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1109,7 +1109,8 @@
                     <configuration>
                         <detectLinks>false</detectLinks>
                         <detectJavaApiLink>true</detectJavaApiLink>
-                        <additionalparam>-Xdoclint:none</additionalparam>
+                        <doclint>none</doclint>
+<!--                    <additionalparam>-Xdoclint:none</additionalparam>-->
                     </configuration>
                 </plugin>
 


[incubator-wayang] 01/03: [LICENSE] add check-license.sh for the project, and specially for python

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-211
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit d0d40e3e00040eb022dbfef7721354c53a136d30
Author: Bertty Contreras-Rojas <be...@databloom.ai>
AuthorDate: Mon Jun 13 18:17:58 2022 +0200

    [LICENSE] add check-license.sh for the project, and specially for python
    
    Signed-off-by: bertty <be...@apache.org>
---
 bin/check-license.sh        | 61 +++++++++++++++++++++++++++++++++++++++++++++
 python/.rat-excludes        |  1 +
 python/bin/check-license.sh | 28 ---------------------
 3 files changed, 62 insertions(+), 28 deletions(-)

diff --git a/bin/check-license.sh b/bin/check-license.sh
new file mode 100755
index 00000000..f38f89d5
--- /dev/null
+++ b/bin/check-license.sh
@@ -0,0 +1,61 @@
+#!/bin/bash
+
+################################################################################
+##
+##  Licensed to the Apache Software Foundation (ASF) under one or more
+##  contributor license agreements.  See the NOTICE file distributed with
+##  this work for additional information regarding copyright ownership.
+##  The ASF licenses this file to You under the Apache License, Version 2.0
+##  (the "License"); you may not use this file except in compliance with
+##  the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+##  Unless required by applicable law or agreed to in writing, software
+##  distributed under the License is distributed on an "AS IS" BASIS,
+##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+##  See the License for the specific language governing permissions and
+##  limitations under the License.
+##
+################################################################################
+
+# resolve the project root (parent of this script's folder; '&&' so pwd runs after cd)
+BASE=$(cd "$(dirname "$0")/.." && pwd)
+VERSION_RAT="0.14"
+RAT_HOME=${BASE}/.rat/apache-rat-${VERSION_RAT}
+RAT_JAR=${RAT_HOME}/apache-rat-${VERSION_RAT}.jar
+RAT_EXCLUSION=$1
+
+# create the folder that caches the rat distribution, if it is missing
+if [[ ! -d "${BASE}/.rat" ]]; then
+  mkdir -p "${BASE}/.rat"
+fi
+
+# download rat once (archive.apache.org keeps old releases, dlcdn only current ones)
+if [[ ! -f "$RAT_JAR" ]]; then
+  cd "${BASE}/.rat" || exit 1
+  wget https://archive.apache.org/dist/creadur/apache-rat-${VERSION_RAT}/apache-rat-${VERSION_RAT}-bin.tar.gz || exit 1
+  tar -xzf apache-rat-${VERSION_RAT}-bin.tar.gz || exit 1
+fi
+
+cd "${BASE}" || exit 1
+# Set the parameters for rat
+rat_opts=("-jar" "${RAT_JAR}" "-d" "${BASE}")
+
+# add the default exclusions for the project
+rat_opts+=("-e" "target/*")
+rat_opts+=("-e" "^.*.input")
+rat_opts+=("-e" "^.*.md")
+rat_opts+=("-e" "^.*.iml")
+rat_opts+=("-e" "^.*_pb2.py") # code generated by protocol buffer
+rat_opts+=("-e" "Gemfile.lock")
+rat_opts+=("-e" ".gitignore")
+rat_opts+=("-e" ".gitmodules")
+rat_opts+=("-e" ".rat-excludes")
+if [[ -n "$RAT_EXCLUSION" ]]; then
+   # NOTE(review): confirm this rat version honours -E when -e is also passed
+   rat_opts+=("-E" "${RAT_EXCLUSION}")
+fi
+
+# execute the validation using rat, keeping only the per-file report lines
+java "${rat_opts[@]}" 2>&1 | grep "== File:"
\ No newline at end of file
diff --git a/python/.rat-excludes b/python/.rat-excludes
index 8d1082aa..51b18ba7 100644
--- a/python/.rat-excludes
+++ b/python/.rat-excludes
@@ -1,3 +1,4 @@
+.gitignore
 .rat-excludes
 venv/*
 10e*.input
diff --git a/python/bin/check-license.sh b/python/bin/check-license.sh
deleted file mode 100755
index 7af700f8..00000000
--- a/python/bin/check-license.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-
-################################################################################
-##
-##  Licensed to the Apache Software Foundation (ASF) under one or more
-##  contributor license agreements.  See the NOTICE file distributed with
-##  this work for additional information regarding copyright ownership.
-##  The ASF licenses this file to You under the Apache License, Version 2.0
-##  (the "License"); you may not use this file except in compliance with
-##  the License.  You may obtain a copy of the License at
-##
-##      http://www.apache.org/licenses/LICENSE-2.0
-##
-##  Unless required by applicable law or agreed to in writing, software
-##  distributed under the License is distributed on an "AS IS" BASIS,
-##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-##  See the License for the specific language governing permissions and
-##  limitations under the License.
-##
-################################################################################
-
-# TODO download Apache rat if is needed
-
-BASE=$(cd "$(dirname "$0")/.." | pwd)
-cd ${BASE}
-RAT_HOME=/opt/apache-rat-0.13
-
-java -jar ${RAT_HOME}/apache-rat-0.13.jar -E .rat-excludes -d . | grep "== File:"
\ No newline at end of file