You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2020/07/27 16:58:11 UTC

[asterixdb] branch master updated: [ASTERIXDB-2176][RT] Improved Python IPC

This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b95996  [ASTERIXDB-2176][RT] Improved Python IPC
7b95996 is described below

commit 7b959961116754fb80b754e63dba5d20ef8797ae
Author: Ian Maxon <im...@apache.org>
AuthorDate: Thu Jul 23 22:15:40 2020 -0700

    [ASTERIXDB-2176][RT] Improved Python IPC
    
    - use msgpack serialization over tcp loopback
    - convert directly from ADM binary format to msg
    
    Change-Id: I5cbbc367944b489aee651ea050e74990dcf65521
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6883
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
---
 asterixdb/asterix-app/pom.xml                      |   6 +-
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |   2 +-
 .../asterix-app/src/main/resources/entrypoint.py   | 229 +++++++++++--
 .../src/test/resources/TweetSent/roundtrip.py      |  24 ++
 .../py_function_error.0.ddl.sqlpp                  |  22 +-
 .../py_function_error.1.lib.sqlpp                  |  17 +-
 .../py_function_error.2.ddl.sqlpp                  |  17 +-
 .../py_function_error.3.query.sqlpp                |  23 +-
 .../py_nested_access/py_nested_access.0.ddl.sqlpp  |  88 +++++
 .../py_nested_access/py_nested_access.1.lib.sqlpp  |  17 +-
 .../py_nested_access.10.query.sqlpp                |  26 +-
 .../py_nested_access.11.query.sqlpp                |  26 +-
 .../py_nested_access.12.query.sqlpp                |  26 +-
 .../py_nested_access.13.query.sqlpp                |  26 +-
 .../py_nested_access/py_nested_access.2.ddl.sqlpp  |  17 +-
 .../py_nested_access.3.update.sqlpp                |  20 +-
 .../py_nested_access.4.query.sqlpp                 |  25 +-
 .../py_nested_access.5.query.sqlpp                 |  28 +-
 .../py_nested_access.6.query.sqlpp                 |  26 +-
 .../py_nested_access.7.query.sqlpp                 |  26 +-
 .../py_nested_access.8.query.sqlpp                 |  26 +-
 .../py_nested_access.9.query.sqlpp                 |  26 +-
 .../py_bigobj_roundtrip.1.regexjson                |   1 +
 .../py_function_error/py_function_error.1.json     |   1 +
 .../access-nested-fields.10.regexjson              |   4 +
 .../access-nested-fields.11.regexjson              |   4 +
 .../access-nested-fields.12.regexjson              |   4 +
 .../py_nested_access/access-nested-fields.3.adm    |   4 +
 .../access-nested-fields.4.regexjson               |   4 +
 .../access-nested-fields.5.regexjson               |   4 +
 .../access-nested-fields.6.regexjson               |   4 +
 .../access-nested-fields.7.regexjson               |   4 +
 .../access-nested-fields.8.regexjson               |   4 +
 .../access-nested-fields.9.regexjson               |   4 +
 .../resources/runtimets/testsuite_it_python.xml    |  14 +-
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../asterix/common/library/ILibraryManager.java    |   6 +
 .../external/ipc/ExternalFunctionResultRouter.java | 127 ++++++++
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 asterixdb/asterix-external-data/pom.xml            |  12 +-
 .../apache/asterix/external/ipc/MessageType.java   |  37 ++-
 .../asterix/external/ipc/PythonIPCProto.java       | 144 +++++++++
 .../asterix/external/ipc/PythonMessageBuilder.java | 132 ++++++++
 .../external/library/ExternalLibraryManager.java   |  23 +-
 .../library/ExternalScalarFunctionDescriptor.java  |   2 +-
 .../library/ExternalScalarFunctionEvaluator.java   |   8 +-
 .../ExternalScalarFunctionEvaluatorFactory.java    |   7 +-
 .../ExternalScalarPythonFunctionEvaluator.java     | 220 ++++++-------
 .../library/msgpack/MessagePackerFromADM.java      | 359 +++++++++++++++++++++
 .../library/msgpack/MessageUnpackerToADM.java      | 288 +++++++++++++++++
 .../LibraryDeployPrepareOperatorDescriptor.java    |  12 +-
 asterixdb/pom.xml                                  |  11 +-
 .../hyracks/control/common/ipc/CCNCFunctions.java  |  11 +-
 .../org/apache/hyracks/ipc/api/IIPCHandle.java     |   2 +
 .../ipc/api/IPayloadSerializerDeserializer.java    |   2 +-
 .../org/apache/hyracks/ipc/impl/IPCHandle.java     |   8 +
 ...lizationBasedPayloadSerializerDeserializer.java |   2 +-
 .../java/org/apache/hyracks/ipc/impl/Message.java  |  74 +++--
 .../hyracks/ipc/impl/ReconnectingIPCHandle.java    |   5 +
 .../apache/hyracks/util/string/UTF8StringUtil.java |   5 +
 60 files changed, 1824 insertions(+), 474 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index eecdc73..27c0215 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -208,7 +208,7 @@
             </configuration>
           </execution>
           <execution>
-            <id>shiv-pyro-shim</id>
+            <id>shiv-msgpack-shim</id>
             <phase>${pyro-shim.stage}</phase>
             <goals>
               <goal>exec</goal>
@@ -218,8 +218,8 @@
               <workingDirectory>${project.build.directory}</workingDirectory>
               <arguments>
                 <argument>-o </argument>
-                <argument>${project.build.directory}${file.separator}classes${file.separator}pyro4.pyz</argument>
-                <argument>pyro4</argument>
+                <argument>${project.build.directory}${file.separator}classes${file.separator}msgpack.pyz</argument>
+                <argument>msgpack</argument>
               </arguments>
               <environmentVariables>
                 <VIRTUALENV>${project.build.directory}</VIRTUALENV>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5c15e68..e804d60 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -257,7 +257,7 @@ public class NCAppRuntimeContext implements INcApplicationContext {
         FileReference appDir =
                 ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
         libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir);
-        libraryManager.initStorage(resetStorageData);
+        libraryManager.initialize(resetStorageData);
 
         /*
          * The order of registration is important. The buffer cache must registered before recovery and transaction
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
old mode 100644
new mode 100755
index cd3298e..bdb68e2
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -15,46 +15,233 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import math,sys
-sys.path.insert(0,'./site-packages/')
-import Pyro4
+import sys
+sys.path.insert(0, './site-packages/')
+sys.path.insert(len(sys.path)-1, './ipc/site-packages')
+from struct import *
+import signal
+import msgpack
+import socket
 from importlib import import_module
 from pathlib import Path
+from enum import IntEnum
+from io import BytesIO
+
+PROTO_VERSION = 1
+HEADER_SZ = 8+8+1
+REAL_HEADER_SZ = 4+8+8+1
+
+
+class MessageType(IntEnum):
+    HELO = 0
+    QUIT = 1
+    INIT = 2
+    INIT_RSP = 3
+    CALL = 4
+    CALL_RSP = 5
+    ERROR = 6
+
+
+class MessageFlags(IntEnum):
+    NORMAL = 0
+    INITIAL_REQ = 1
+    INITIAL_ACK = 2
+    ERROR = 3
+
 
-@Pyro4.expose
 class Wrapper(object):
     wrapped_module = None
     wrapped_class = None
     wrapped_fn = None
+    packer = msgpack.Packer(autoreset=False)
+    unpacker = msgpack.Unpacker()
+    response_buf = BytesIO()
+    stdin_buf = BytesIO()
+    wrapped_fns = {}
+    alive = True
 
-    def __init__(self, module_name, class_name, fn_name):
+    def init(self, module_name, class_name, fn_name):
         self.wrapped_module = import_module(module_name)
         # do not allow modules to be called that are not part of the uploaded module
+        wrapped_fn = None
         if not self.check_module_path(self.wrapped_module):
             wrapped_module = None
-            return None
+            raise ImportError("Module was not found in library")
         if class_name is not None:
-            self.wrapped_class = getattr(import_module(module_name),class_name)()
+            self.wrapped_class = getattr(
+                import_module(module_name), class_name)()
         if self.wrapped_class is not None:
-            self.wrapped_fn = getattr(self.wrapped_class,fn_name)
+            wrapped_fn = getattr(self.wrapped_class, fn_name)
         else:
-            self.wrapped_fn = locals()[fn_name]
-
-    def nextTuple(self, *args):
-        return self.wrapped_fn(args)
+            wrapped_fn = locals()[fn_name]
+        if wrapped_fn is None:
+            raise ImportError("Could not find class or function in specified module")
+        self.wrapped_fns[self.rmid] = wrapped_fn
 
-    def ping(self):
-        return "pong"
+    def nextTuple(self, *args, key=None):
+        return self.wrapped_fns[key](*args)
 
-    def check_module_path(self,module):
+    def check_module_path(self, module):
         cwd = Path('.').resolve()
         module_path = Path(module.__file__).resolve()
         return cwd in module_path.parents
 
+    def read_header(self, readbuf):
+        self.sz, self.mid, self.rmid, self.flag = unpack(
+            "!iqqb", readbuf[0:21])
+        return True
+
+    def write_header(self, response_buf, dlen):
+        total_len = dlen + HEADER_SZ
+        header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
+        self.response_buf.write(header)
+        return total_len+4
+
+    def get_ver_hlen(self, hlen):
+        return hlen + (PROTO_VERSION << 4)
+
+    def get_hlen(self):
+        return self.ver_hlen - (PROTO_VERSION << 4)
+
+    def init_remote_ipc(self):
+        self.response_buf.seek(0)
+        self.flag = MessageFlags.INITIAL_REQ
+        dlen = len(self.unpacked_msg[1])
+        resp_len = self.write_header(self.response_buf, dlen)
+        self.response_buf.write(self.unpacked_msg[1])
+        self.resp = self.response_buf.getbuffer()[0:resp_len]
+        self.send_msg()
+        self.packer.reset()
+
+    def helo(self):
+        #need to ack the connection back before sending actual HELO
+        self.init_remote_ipc()
+
+        self.flag = MessageFlags.NORMAL
+        self.response_buf.seek(0)
+        self.packer.pack(int(MessageType.HELO))
+        self.packer.pack("HELO")
+        dlen = 5 #tag(1) + body(4)
+        resp_len = self.write_header(self.response_buf, dlen)
+        self.response_buf.write(self.packer.bytes())
+        self.resp = self.response_buf.getbuffer()[0:resp_len]
+        self.send_msg()
+        self.packer.reset()
+        return True
+
+    def handle_init(self):
+        self.flag = MessageFlags.NORMAL
+        self.response_buf.seek(0)
+        args = self.unpacked_msg[1]
+        module = args[0]
+        if len(args) == 3:
+            clazz = args[1]
+            fn = args[2]
+        else:
+            clazz = None
+            fn = args[1]
+        self.init(module, clazz, fn)
+        self.packer.pack(int(MessageType.INIT_RSP))
+        dlen = 1  # just the tag.
+        resp_len = self.write_header(self.response_buf, dlen)
+        self.response_buf.write(self.packer.bytes())
+        self.resp = self.response_buf.getbuffer()[0:resp_len]
+        self.send_msg()
+        self.packer.reset()
+        return True
+
+    def quit(self):
+        self.alive = False
+        return True
+
+    def handle_call(self):
+        self.flag = MessageFlags.NORMAL
+        args = self.unpacked_msg[1]
+        result = None
+        if args is None:
+            result = self.nextTuple(key=self.rmid)
+        else:
+            result = self.nextTuple(args, key=self.rmid)
+        self.packer.reset()
+        self.response_buf.seek(0)
+        body = msgpack.packb(result)
+        dlen = len(body)+1  # 1 for tag
+        resp_len = self.write_header(self.response_buf, dlen)
+        self.packer.pack(int(MessageType.CALL_RSP))
+        self.response_buf.write(self.packer.bytes())
+        self.response_buf.write(body)
+        self.resp = self.response_buf.getbuffer()[0:resp_len]
+        self.send_msg()
+        self.packer.reset()
+        return True
+
+    def handle_error(self,e):
+        self.flag = MessageFlags.NORMAL
+        result = type(e).__name__ + ": " + str(e)
+        self.packer.reset()
+        self.response_buf.seek(0)
+        body = msgpack.packb(result)
+        dlen = len(body)+1  # 1 for tag
+        resp_len = self.write_header(self.response_buf, dlen)
+        self.packer.pack(int(MessageType.ERROR))
+        self.response_buf.write(self.packer.bytes())
+        self.response_buf.write(body)
+        self.resp = self.response_buf.getbuffer()[0:resp_len]
+        self.send_msg()
+        self.packer.reset()
+        return True
+
+    type_handler = {
+        MessageType.HELO: helo,
+        MessageType.QUIT: quit,
+        MessageType.INIT: handle_init,
+        MessageType.CALL: handle_call
+    }
+
+    def connect_sock(self, addr, port):
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            self.sock.connect((addr, int(port)))
+        except socket.error as msg:
+            print(sys.stderr, msg)
+
+    def disconnect_sock(self, *args):
+        self.sock.shutdown(socket.SHUT_RDWR)
+        self.sock.close()
+
+    def recv_msg(self):
+        completed = False
+        while not completed and self.alive:
+            readbuf = sys.stdin.buffer.read1(4096)
+            try:
+                if(len(readbuf) < REAL_HEADER_SZ):
+                    while(len(readbuf) < REAL_HEADER_SZ):
+                        readbuf += sys.stdin.buffer.read1(4096)
+                self.read_header(readbuf)
+                if(self.sz > len(readbuf)):
+                    while(len(readbuf) < self.sz):
+                        readbuf += sys.stdin.buffer.read1(4096)
+                self.unpacker.feed(readbuf[21:])
+                self.unpacked_msg = list(self.unpacker)
+                self.type = MessageType(self.unpacked_msg[0])
+                completed = self.type_handler[self.type](self)
+            except BaseException as e:
+                completed = self.handle_error(e)
+
+    def send_msg(self):
+        self.sock.sendall(self.resp)
+        self.resp = None
+        return
+
+    def recv_loop(self):
+        while self.alive:
+            self.recv_msg()
+        self.disconnect_sock()
+
 
-port = int(sys.argv[1])
-wrap = Wrapper(sys.argv[2],sys.argv[3],sys.argv[4])
-d = Pyro4.Daemon(host="127.0.0.1",port=port)
-d.register(wrap,"nextTuple")
-print(Pyro4.config.dump())
-d.requestLoop()
+addr = str(sys.argv[1])
+port = str(sys.argv[2])
+wrap = Wrapper()
+wrap.connect_sock(addr, port)
+signal.signal(signal.SIGTERM, wrap.disconnect_sock)
+wrap.recv_loop()
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
new file mode 100644
index 0000000..37350be
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class Tests(object):
+
+    def roundtrip(self, *args):
+        return args
+
+    def warning(self):
+        raise ArithmeticError("oof")
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp
index d66f233..2076054 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp
@@ -16,20 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public Object getAttachment();
+drop  dataverse test if exists;
+create  dataverse test;
 
-    public boolean isConnected();
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp
index d66f233..0f0a05b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp
@@ -16,20 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
 
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
-
-    public boolean isConnected();
-}
+install test testlib admin admin target/TweetSent.pyz
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
index d66f233..5b8f8bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
@@ -16,20 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
 
-import java.net.InetSocketAddress;
+use test;
 
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
-
-    public boolean isConnected();
-}
+create function warning() language python as "testlib","roundtrip:Tests";
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp
index d66f233..1fe7c59 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp
@@ -16,20 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
+// param max-warnings:json=1
 
-    public Object getAttachment();
+use test;
 
-    public boolean isConnected();
-}
+warning();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp
new file mode 100644
index 0000000..c074534
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+
+create type test.S as
+{
+  id : bigint
+};
+
+create type test.GS as
+ closed {
+  id : bigint,
+  Genus : string,
+  lower : S
+};
+
+create type test.FGS as
+{
+  id : bigint,
+  Family : string
+};
+
+create type test.OFGS as
+ closed {
+  id : bigint,
+  `Order` : string,
+  lower : FGS
+};
+
+create type test.COFGS as
+ closed {
+  id : bigint,
+  Class : string,
+  lower : OFGS
+};
+
+create type test.PCOFGS as
+ closed {
+  id : bigint,
+  Phylum : string,
+  lower : COFGS
+};
+
+create type test.KPCOFGS as
+{
+  id : bigint,
+  Kingdom : string
+};
+
+create type test.Classification as
+ closed {
+  id : bigint,
+  fullClassification : KPCOFGS
+};
+
+create type test.Animal as
+{
+  id : bigint
+};
+
+create  dataset Animals(Animal) primary key id;
+
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp
index d66f233..0f0a05b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp
@@ -16,20 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
 
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
-
-    public boolean isConnected();
-}
+install test testlib admin admin target/TweetSent.pyz
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
index d66f233..c48dda5 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
index d66f233..30ec1da 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
index d66f233..1f7925f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a records nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
index d66f233..13b4c28 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0]
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp
index d66f233..655036c 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp
@@ -16,20 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
 
-import java.net.InetSocketAddress;
+use test;
 
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
-
-    public boolean isConnected();
-}
+create function roundtrip(s) language python as "testlib","roundtrip:Tests";
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp
index d66f233..3ca7f76 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp
@@ -16,20 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+use test;
 
-    public void setAttachment(Object attachment);
 
-    public Object getAttachment();
+load  dataset Animals using localfs ((`path`=`asterix_nc1://data/classifications/animals.adm`),(`format`=`adm`));
 
-    public boolean isConnected();
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
index d66f233..1e9a088 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -16,20 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
+order by result
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
index d66f233..eedf56b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
@@ -16,20 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
+use test;
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower
+order by result.id )][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
index d66f233..4912ad6 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
index d66f233..546c174 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
index d66f233..62af850 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
similarity index 65%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
index d66f233..6c594f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
-
-import java.net.InetSocketAddress;
-
-import org.apache.hyracks.ipc.exceptions.IPCException;
-
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
-
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
+/*
+* Description  : Access a record's nested records at each level.
+* Expected Res : Success
+* Date         : 04 Jun 2015
+*/
 
-    public void setAttachment(Object attachment);
+use test;
 
-    public Object getAttachment();
 
-    public boolean isConnected();
-}
+select value [(
+select element result
+from  Animals as test
+with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson
new file mode 120000
index 0000000..7f8c8ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson
@@ -0,0 +1 @@
+../../big-object/big_object_insert/big_object_insert.1.adm
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json
new file mode 100644
index 0000000..ec747fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json
@@ -0,0 +1 @@
+null
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson
new file mode 100644
index 0000000..2cddb0f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } },
+{ "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } },
+{ "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } },
+{ "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson
new file mode 100644
index 0000000..a7ce00a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "fullClassification": { "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } } },
+{ "id": 2, "fullClassification": { "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } } },
+{ "id": 3, "fullClassification": { "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } } },
+{ "id": 4, "fullClassification": { "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson
new file mode 100644
index 0000000..d09c537
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "class": { "id": 1, "fullClassification": { "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } } } },
+{ "id": 2, "class": { "id": 2, "fullClassification": { "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } } } },
+{ "id": 3, "class": { "id": 3, "fullClassification": { "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } } } },
+{ "id": 4, "class": { "id": 4, "fullClassification": { "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm
new file mode 100644
index 0000000..e34004a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm
@@ -0,0 +1,4 @@
+"Gulo"
+"Hyracoidea"
+"Jamaicensis"
+"Johnstoni"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson
new file mode 100644
index 0000000..5d8ddf4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Species": "Gulo" },
+{ "id": 2, "Species": "Johnstoni" },
+{ "id": 3, "Species": "Hyracoidea" },
+{ "id": 4, "Species": "Jamaicensis" }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson
new file mode 100644
index 0000000..f10de1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } },
+{ "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } },
+{ "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } },
+{ "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson
new file mode 100644
index 0000000..4e6d8ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } },
+{ "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } },
+{ "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } },
+{ "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson
new file mode 100644
index 0000000..60d21a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } },
+{ "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } },
+{ "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } },
+{ "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson
new file mode 100644
index 0000000..183e94c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } },
+{ "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } },
+{ "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } },
+{ "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson
new file mode 100644
index 0000000..347b5b6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } },
+{ "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } },
+{ "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } },
+{ "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 6d03162..e4669da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -22,7 +22,6 @@
              ResultOffsetPath="results"
              QueryOffsetPath="queries_sqlpp"
              QueryFileExtension=".sqlpp">
-
   <test-group name="external-library-python">
     <test-case FilePath="external-library">
       <compilation-unit name="mysentiment">
@@ -32,7 +31,18 @@
     <test-case FilePath="external-library">
       <compilation-unit name="python-fn-escape">
         <output-dir compare="Text">python-fn-escape</output-dir>
-        <expected-error>'NoneType' object is not callable</expected-error>
+        <expected-error>ImportError: Module was not found in library</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="py_nested_access">
+        <output-dir compare="Clean-JSON">py_nested_access</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library" check-warnings="true">
+      <compilation-unit name="py_function_error">
+        <output-dir compare="Clean-JSON">py_function_error</output-dir>
+        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: ArithmeticError: oof</expected-warn>
       </compilation-unit>
     </test-case>
   </test-group>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 73dfbd4..87a11cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -88,6 +88,7 @@ public class ErrorCode {
     public static final int UNSUPPORTED_JRE = 100;
 
     public static final int EXTERNAL_UDF_RESULT_TYPE_ERROR = 200;
+    public static final int EXTERNAL_UDF_EXCEPTION = 201;
 
     // Compilation errors
     public static final int PARSE_ERROR = 1001;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index 83f1d71..047018b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -20,8 +20,10 @@
 package org.apache.asterix.common.library;
 
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public interface ILibraryManager {
 
@@ -36,4 +38,8 @@ public interface ILibraryManager {
     void dropLibraryPath(FileReference fileRef) throws HyracksDataException;
 
     byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException;
+
+    ExternalFunctionResultRouter getRouter();
+
+    IPCSystem getIPCI();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
new file mode 100644
index 0000000..8d56eb7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.Message;
+
+/**
+ * Hands replies that arrive over the IPC channel to the caller that registered a route for them.
+ * <p>
+ * A caller registers a receive buffer with {@link #insertRoute(ByteBuffer)}. Replies are matched to routes by
+ * their {@code rmid}: the payload is copied into the route's buffer and that buffer's monitor is notified.
+ * Failures are parked in an inbox keyed by the same id until the caller collects them.
+ */
+public class ExternalFunctionResultRouter implements IIPCI {
+
+    AtomicLong maxId = new AtomicLong(0);
+    ConcurrentHashMap<Long, MutableObject<ByteBuffer>> activeClients = new ConcurrentHashMap<>();
+    ConcurrentHashMap<Long, Exception> exceptionInbox = new ConcurrentHashMap<>();
+    private static final int MAX_BUF_SIZE = 32 * 1024 * 1024; //32MB
+
+    /**
+     * Copies the trailing {@code handle.getAttachmentLen()} bytes of {@code payload} into the buffer registered
+     * for {@code rmid}, then wakes up a thread waiting on that buffer.
+     * <p>
+     * The destination limit is set to {@code length + 1}, so the destination must hold one byte more than the
+     * reply. If it does not, a larger buffer (next power of two) is allocated and stored in the route; replies
+     * that would need more than {@link #MAX_BUF_SIZE} are reported through
+     * {@link #onError(IIPCHandle, long, long, Exception)} instead.
+     * <p>
+     * NOTE(review): when the buffer is replaced, the notification goes to the new buffer's monitor. A caller
+     * that still waits on the buffer it originally registered will not observe the reply - confirm how callers
+     * are expected to pick up the replacement.
+     */
+    @Override
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
+        MutableObject<ByteBuffer> route = activeClients.get(rmid);
+        if (route == null) {
+            // unknown or already removed route: nobody is waiting for this reply
+            return;
+        }
+        int length = handle.getAttachmentLen();
+        ByteBuffer buf = (ByteBuffer) payload;
+        // the reply occupies the last 'length' bytes before the payload's current position; the payload's
+        // own position is deliberately left untouched on every path
+        int start = buf.position() - length;
+        ByteBuffer copyTo = route.getValue();
+        if (copyTo.capacity() <= length) {
+            // room for length + 1 bytes is required because of the limit set below
+            int nextSize = length < MAX_BUF_SIZE ? closestPow2(length + 1) : Integer.MAX_VALUE;
+            if (nextSize > MAX_BUF_SIZE) {
+                onError(handle, mid, rmid, HyracksException.create(ErrorCode.RECORD_IS_TOO_LARGE));
+                return;
+            }
+            copyTo = ByteBuffer.allocate(nextSize);
+            route.setValue(copyTo);
+        }
+        System.arraycopy(buf.array(), buf.arrayOffset() + start, copyTo.array(), copyTo.arrayOffset(), length);
+        synchronized (copyTo) {
+            copyTo.position(0);
+            copyTo.limit(length + 1);
+            copyTo.notify();
+        }
+    }
+
+    /**
+     * Stores {@code exception} for the route {@code rmid} and wakes up a thread waiting on the route's buffer.
+     * Errors for routes that are not (or no longer) registered are dropped, so they cannot pile up in the inbox.
+     */
+    @Override
+    public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+        MutableObject<ByteBuffer> holder = activeClients.get(rmid);
+        if (holder == null) {
+            return;
+        }
+        exceptionInbox.put(rmid, exception);
+        ByteBuffer route = holder.getValue();
+        synchronized (route) {
+            route.notify();
+        }
+    }
+
+    /**
+     * Registers {@code buf} as the destination of a new route.
+     *
+     * @return the id of the new route
+     */
+    public Long insertRoute(ByteBuffer buf) {
+        Long id = maxId.incrementAndGet();
+        activeClients.put(id, new MutableObject<>(buf));
+        return id;
+    }
+
+    /**
+     * Removes and returns the exception stored for route {@code id}.
+     *
+     * @return the stored exception, or {@code null} if there is none
+     */
+    public Exception getException(Long id) {
+        return exceptionInbox.remove(id);
+    }
+
+    /**
+     * @return {@code true} if an exception is currently stored for route {@code id}
+     */
+    public boolean hasException(long id) {
+        // the previous "== null" test was inverted and reported an exception exactly when there was none
+        return exceptionInbox.containsKey(id);
+    }
+
+    /**
+     * Unregisters route {@code id} and discards any exception still stored for it.
+     */
+    public void removeRoute(Long id) {
+        activeClients.remove(id);
+        exceptionInbox.remove(id);
+    }
+
+    /**
+     * Returns the smallest power of two that is greater than or equal to {@code n}.
+     * <p>
+     * Computed with integer arithmetic; the former {@code Math.log}/{@code Math.pow} version was subject to
+     * floating-point rounding and could return the next power up for inputs that already are powers of two.
+     *
+     * @return 0 for {@code n <= 0}, {@link Integer#MAX_VALUE} if the result does not fit in an {@code int}
+     *         (both as before), otherwise the power of two
+     */
+    public static int closestPow2(int n) {
+        if (n <= 0) {
+            return 0;
+        }
+        if (n > (1 << 30)) {
+            return Integer.MAX_VALUE;
+        }
+        int highest = Integer.highestOneBit(n);
+        return highest == n ? n : highest << 1;
+    }
+
+    /**
+     * Payload (de)serializer for the IPC system that leaves reply payloads as raw {@link ByteBuffer}s and
+     * serializes everything to a single zero byte.
+     */
+    public static class NoOpNoSerJustDe implements IPayloadSerializerDeserializer {
+
+        private static final byte[] noop = new byte[] { (byte) 0 };
+
+        /**
+         * Returns {@code buffer} itself for every message except {@link Message#INITIAL_REQ}.
+         * <p>
+         * NOTE(review): INITIAL_REQ payloads go through Java deserialization; make sure the peer that can send
+         * them is trusted.
+         */
+        @Override
+        public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
+            if (flag == Message.INITIAL_REQ) {
+                return new JavaSerializationBasedPayloadSerializerDeserializer().deserializeObject(buffer, length,
+                        flag);
+            }
+            return buffer;
+        }
+
+        /** Exceptions are not deserialized by this implementation; always returns {@code null}. */
+        @Override
+        public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+            return null;
+        }
+
+        /** Ignores {@code object} and returns a shared one-byte placeholder. */
+        @Override
+        public byte[] serializeObject(Object object) throws Exception {
+            return noop;
+        }
+
+        /** Ignores {@code object} and returns a shared one-byte placeholder. */
+        @Override
+        public byte[] serializeException(Exception object) throws Exception {
+            return noop;
+        }
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index a7d49d9..438d719 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -90,6 +90,7 @@
 100 = Unsupported JRE: %1$s
 
 200 = External UDF cannot produce expected result. Please check the UDF configuration
+201 = External UDF returned exception. Returned exception was: %1$s
 
 # Compile-time check errors
 1001 = Syntax error: %1$s
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 7760101..3847576 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -445,14 +445,6 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
-        <groupId>net.razorvine</groupId>
-        <artifactId>pyrolite</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>net.razorvine</groupId>
-      <artifactId>serpent</artifactId>
-    </dependency>
-    <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>http-client-spi</artifactId>
     </dependency>
@@ -472,5 +464,9 @@
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>auth</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.msgpack</groupId>
+        <artifactId>msgpack-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java
similarity index 61%
copy from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java
index d66f233..506d9d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,7 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * http://www.apache.org/licenses/LICENSE-2.0
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,20 +14,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.api;
 
-import java.net.InetSocketAddress;
+package org.apache.asterix.external.ipc;
 
-import org.apache.hyracks.ipc.exceptions.IPCException;
+public enum MessageType {
+    HELO,
+    QUIT,
+    INIT,
+    INIT_RSP,
+    CALL,
+    CALL_RSP,
+    ERROR;
 
-public interface IIPCHandle {
-    public InetSocketAddress getRemoteAddress();
+    // values() snapshot taken once in the static initializer below; fromByte indexes into it by ordinal
+    static MessageType[] messageTypes;
+    static {
+        messageTypes = values();
+    }
 
-    public long send(long requestId, Object payload, Exception exception) throws IPCException;
-
-    public void setAttachment(Object attachment);
-
-    public Object getAttachment();
-
-    public boolean isConnected();
+    /**
+     * Maps a wire byte to the message type with that ordinal.
+     *
+     * @param b the type byte read from a message
+     * @return the matching type, or {@code null} if {@code b} is negative or beyond the last ordinal
+     */
+    public static MessageType fromByte(byte b) {
+        // bytes are signed: without the lower bound a value >= 0x80 would index the array negatively
+        if (b < 0 || b >= messageTypes.length) {
+            return null;
+        }
+        return messageTypes[b];
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
new file mode 100644
index 0000000..feb52cf
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.impl.Message;
+import org.msgpack.core.MessagePack;
+
+/**
+ * Caller side of the request/reply exchange with the Python entrypoint: builds a message with
+ * {@link PythonMessageBuilder}, writes it to the socket and waits until the
+ * {@link ExternalFunctionResultRouter} has delivered the reply into {@link #recvBuffer}.
+ * <p>
+ * {@link #start()} must be called before any message is sent, since it obtains the route id that is written
+ * into every message header.
+ * <p>
+ * NOTE(review): the router may swap the route's buffer for a larger one when a reply does not fit. This class
+ * keeps waiting on, and returning, the buffer it registered in {@link #start()} - confirm how oversized replies
+ * are meant to reach the caller.
+ */
+public class PythonIPCProto {
+
+    public PythonMessageBuilder messageBuilder;
+    OutputStream sockOut;
+    ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+    ByteBuffer recvBuffer = ByteBuffer.allocate(4096);
+    ExternalFunctionResultRouter router;
+    IPCSystem ipcSys;
+    Message outMsg;
+    Long key;
+
+    public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, IPCSystem ipcSys)
+            throws IOException {
+        this.sockOut = sockOut;
+        messageBuilder = new PythonMessageBuilder();
+        this.router = router;
+        this.ipcSys = ipcSys;
+        this.outMsg = new Message(null);
+    }
+
+    /** Registers {@link #recvBuffer} with the router and remembers the route id. */
+    public void start() {
+        this.key = router.insertRoute(recvBuffer);
+    }
+
+    /**
+     * Sends the HELO message and waits for the matching reply.
+     *
+     * @throws IllegalStateException if the reply is not of type HELO
+     * @throws AsterixException if the reply is an error or the wait is interrupted
+     */
+    public void helo() throws IOException, AsterixException {
+        resetBuffers();
+        messageBuilder.hello();
+        sendMsg();
+        receiveMsg();
+        if (getResponseType() != MessageType.HELO) {
+            throw new IllegalStateException("Illegal reply received, expected HELO");
+        }
+    }
+
+    /**
+     * Sends the INIT message carrying the module, class and function name and waits for the reply.
+     *
+     * @throws IllegalStateException if the reply is not of type INIT_RSP
+     * @throws AsterixException if the reply is an error or the wait is interrupted
+     */
+    public void init(String module, String clazz, String fn) throws IOException, AsterixException {
+        resetBuffers();
+        messageBuilder.init(module, clazz, fn);
+        sendMsg();
+        receiveMsg();
+        if (getResponseType() != MessageType.INIT_RSP) {
+            throw new IllegalStateException("Illegal reply received, expected INIT_RSP");
+        }
+    }
+
+    /**
+     * Sends a CALL message and waits for the result.
+     *
+     * @param args buffer whose backing array holds the packed arguments in {@code [0, args.position())}
+     * @param numArgs number of arguments packed in {@code args}
+     * @return the receive buffer, positioned just after the reply's type byte
+     * @throws IllegalStateException if the reply is not of type CALL_RSP
+     */
+    public ByteBuffer call(ByteBuffer args, int numArgs) throws Exception {
+        resetBuffers();
+        messageBuilder.call(args.array(), args.position(), numArgs);
+        sendMsg();
+        receiveMsg();
+        if (getResponseType() != MessageType.CALL_RSP) {
+            throw new IllegalStateException("Illegal reply received, expected CALL_RSP, recvd: " + getResponseType());
+        }
+        return recvBuffer;
+    }
+
+    /**
+     * Builds the QUIT message and unregisters this client's route.
+     * <p>
+     * NOTE(review): the QUIT message is only packed into the builder's buffer; nothing here writes it to the
+     * socket - confirm whether that is intentional.
+     */
+    public void quit() throws IOException {
+        try {
+            // start from an empty buffer so packing cannot overflow behind a previous, large message
+            messageBuilder.buf.clear();
+            messageBuilder.quit();
+        } finally {
+            // always release the route, even if building the message failed
+            router.removeRoute(key);
+        }
+    }
+
+    /**
+     * Blocks until the router has either delivered a reply into {@link #recvBuffer} (its limit becomes non-zero)
+     * or stored an exception for this route, then reads the reply's type byte.
+     *
+     * @throws AsterixException with {@code EXTERNAL_UDF_EXCEPTION} if the router reported an exception, the
+     *         reply is of type ERROR, or the waiting thread is interrupted
+     */
+    public void receiveMsg() throws IOException, AsterixException {
+        Exception except = null;
+        try {
+            synchronized (recvBuffer) {
+                // an error leaves the limit at 0, so the inbox has to be polled as well or this would spin
+                // forever; getException removes the entry, so it is fetched at most once
+                while (recvBuffer.limit() == 0 && (except = router.getException(key)) == null) {
+                    recvBuffer.wait(100);
+                }
+            }
+            if (except == null) {
+                except = router.getException(key);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
+        }
+        if (except != null) {
+            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, except);
+        }
+        messageBuilder.readHead(recvBuffer);
+        if (messageBuilder.type == MessageType.ERROR) {
+            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                    MessagePack.newDefaultUnpacker(recvBuffer).unpackString());
+        }
+    }
+
+    /**
+     * Writes the header (total length, -1, route id, NORMAL flag) followed by the bytes currently packed in
+     * the message builder's buffer, then flushes the socket stream.
+     */
+    public void sendMsg() throws IOException {
+        headerBuffer.clear();
+        headerBuffer.putInt(HEADER_SIZE + messageBuilder.buf.position());
+        headerBuffer.putLong(-1);
+        headerBuffer.putLong(key);
+        headerBuffer.put(Message.NORMAL);
+        sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
+        sockOut.write(messageBuilder.buf.array(), 0, messageBuilder.buf.position());
+        sockOut.flush();
+    }
+
+    /** @return the type of the most recently built or received message */
+    public MessageType getResponseType() {
+        return messageBuilder.type;
+    }
+
+    /**
+     * Prepares for a new exchange: marks the receive buffer as empty (limit 0 is what receiveMsg waits on)
+     * and rewinds the outgoing message buffer.
+     */
+    private void resetBuffers() {
+        recvBuffer.clear();
+        recvBuffer.limit(0);
+        messageBuilder.buf.clear();
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
new file mode 100644
index 0000000..506e80d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import static org.apache.hyracks.api.util.JavaSerializationUtils.getSerializationProvider;
+import static org.msgpack.core.MessagePack.Code.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Packs the messages sent to the Python entrypoint into {@link #buf} and reads the type byte of replies.
+ * <p>
+ * Every message starts with its {@link MessageType} ordinal, written by {@link #packHeader()}, followed by a
+ * type specific body. Except for {@link #call(byte[], int, int)}, the methods append at the buffer's current
+ * position and expect the caller to have cleared it.
+ */
+public class PythonMessageBuilder {
+    private static final int MAX_BUF_SIZE = 21 * 1024 * 1024; //21MB.
+    private static final Logger LOGGER = LogManager.getLogger();
+    MessageType type;
+    long dataLength;
+    ByteBuffer buf;
+    String[] initAry = new String[3];
+
+    public PythonMessageBuilder() {
+        this.type = null;
+        dataLength = -1;
+        this.buf = ByteBuffer.allocate(4096);
+    }
+
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+
+    /** Writes the current message type's ordinal at the buffer's position. */
+    public void packHeader() {
+        MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
+    }
+
+    /** @return the number of bytes {@code s} occupies when encoded as UTF-8 */
+    private int getStringLength(String s) {
+        // String.length() counts UTF-16 code units and under-counts multibyte characters
+        return s.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+    }
+
+    /** Consumes one byte from {@code buf} and stores the message type it denotes (null if unknown). */
+    public void readHead(ByteBuffer buf) {
+        byte typ = buf.get();
+        type = MessageType.fromByte(typ);
+    }
+
+    /**
+     * Packs a HELO message: the header followed by a BIN32 blob holding a Java-serialized loopback socket
+     * address.
+     */
+    public void hello() throws IOException {
+        this.type = MessageType.HELO;
+        byte[] serAddr = serialize(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1));
+        dataLength = serAddr.length + 5;
+        packHeader();
+        //TODO:make this cleaner
+        buf.put(BIN32);
+        buf.putInt(serAddr.length);
+        buf.put(serAddr);
+    }
+
+    /** Packs a QUIT message: the header followed by the string "QUIT". */
+    public void quit() {
+        this.type = MessageType.QUIT;
+        dataLength = getStringLength("QUIT");
+        packHeader();
+        MessagePackerFromADM.packFixStr(buf, "QUIT");
+    }
+
+    /** Packs an INIT message: the header followed by a three element array of module, class and function. */
+    public void init(String module, String clazz, String fn) {
+        this.type = MessageType.INIT;
+        initAry[0] = module;
+        initAry[1] = clazz;
+        initAry[2] = fn;
+        dataLength = Arrays.stream(initAry).mapToInt(this::getStringLength).sum() + 2;
+        packHeader();
+        MessagePackerFromADM.packFixArrayHeader(buf, (byte) initAry.length);
+        for (String s : initAry) {
+            MessagePackerFromADM.packStr(buf, s);
+        }
+    }
+
+    /**
+     * Packs a CALL message, replacing whatever the buffer held before. With no arguments the body is a single
+     * NIL; otherwise it is an ARRAY32 header with {@code numArgs} followed by the first {@code lim} bytes of
+     * {@code args}.
+     *
+     * @param args array holding the already packed arguments
+     * @param lim number of valid bytes at the start of {@code args}
+     * @param numArgs number of arguments contained in those bytes
+     * @throws IllegalArgumentException if the message would need a buffer larger than the maximum
+     */
+    public void call(byte[] args, int lim, int numArgs) {
+        // ARRAY32 marker + 4-byte count, the type byte, and the arguments; this assumes packHeader() writes
+        // exactly one byte, as the dataLength arithmetic below implies. Sizing by args.length (the capacity of
+        // the caller's array) instead would overflow whenever fewer than six bytes were left over.
+        int required = 5 + 1 + lim;
+        if (required > buf.capacity()) {
+            int growTo = ExternalFunctionResultRouter.closestPow2(required);
+            if (growTo > MAX_BUF_SIZE) {
+                //TODO: something more graceful
+                throw new IllegalArgumentException("Reached maximum buffer size");
+            }
+            buf = ByteBuffer.allocate(growTo);
+        }
+        buf.clear();
+        this.type = MessageType.CALL;
+        dataLength = required;
+        packHeader();
+        //TODO: make this switch between fixarray/array16/array32
+        if (numArgs == 0) {
+            buf.put(NIL);
+        } else {
+            buf.put(ARRAY32);
+            buf.putInt(numArgs);
+            buf.put(args, 0, lim);
+        }
+    }
+
+    //this is used to send a serialized java inetaddress to the entrypoint so it can send it back
+    //to the IPC subsystem, which needs it. don't use this for anything else.
+    private byte[] serialize(Object object) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = getSerializationProvider().newObjectOutputStream(baos)) {
+            oos.writeObject(object);
+            oos.flush();
+        }
+        return baos.toByteArray();
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index b0a4dfd..d7a446b 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -23,6 +23,8 @@ import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTR
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -34,6 +36,7 @@ import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.library.LibraryDescriptor;
 import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -43,6 +46,8 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
 import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -81,6 +86,8 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle
     private final FileReference trashDir;
     private final Path trashDirPath;
     private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>();
+    private IPCSystem pythonIPC;
+    private final ExternalFunctionResultRouter router;
 
     public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) {
         this.ncs = ncs;
@@ -91,10 +98,14 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle
         trashDir = baseDir.getChild(TRASH_DIR_NAME);
         trashDirPath = trashDir.getFile().toPath().normalize();
         objectMapper = createObjectMapper();
+        router = new ExternalFunctionResultRouter();
     }
 
-    public void initStorage(boolean resetStorageData) throws HyracksDataException {
+    public void initialize(boolean resetStorageData) throws HyracksDataException {
         try {
+            pythonIPC = new IPCSystem(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0),
+                    PlainSocketChannelFactory.INSTANCE, router, new ExternalFunctionResultRouter.NoOpNoSerJustDe());
+            pythonIPC.start();
             Path baseDirPath = baseDir.getFile().toPath();
             if (Files.isDirectory(baseDirPath)) {
                 if (resetStorageData) {
@@ -291,6 +302,16 @@ public final class ExternalLibraryManager implements ILibraryManager, ILifeCycle
         return om;
     }
 
+    /** Returns the result router created in this manager's constructor. */
+    @Override
+    public ExternalFunctionResultRouter getRouter() {
+        return router;
+    }
+
+    /** Returns the loopback IPC system; it is assigned in initialize(), so this is null before that call. */
+    @Override
+    public IPCSystem getIPCI() {
+        return pythonIPC;
+    }
+
     private static final class DeleteDirectoryWork extends AbstractWork {
 
         private final Path path;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
index c033cea..61ab3ea 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
@@ -47,7 +47,7 @@ public class ExternalScalarFunctionDescriptor extends AbstractScalarFunctionDyna
 
     @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
-        return new ExternalScalarFunctionEvaluatorFactory(finfo, args, argTypes);
+        return new ExternalScalarFunctionEvaluatorFactory(finfo, args, argTypes, sourceLoc);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
index 3ff706d..5ae1995 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
@@ -21,12 +21,14 @@ package org.apache.asterix.external.library;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public abstract class ExternalScalarFunctionEvaluator implements IScalarEvaluator {
 
@@ -34,6 +36,8 @@ public abstract class ExternalScalarFunctionEvaluator implements IScalarEvaluato
     protected final IScalarEvaluator[] argEvals;
     protected final IAType[] argTypes;
     protected final ILibraryManager libraryManager;
+    protected final ExternalFunctionResultRouter router;
+    protected final IPCSystem ipcSys;
 
     public ExternalScalarFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
             IAType[] argTypes, IEvaluatorContext context) throws HyracksDataException {
@@ -45,5 +49,7 @@ public abstract class ExternalScalarFunctionEvaluator implements IScalarEvaluato
         }
         libraryManager =
                 ((INcApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
+        router = libraryManager.getRouter();
+        ipcSys = libraryManager.getIPCI();
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
index ec757a1..de75f7a 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 public class ExternalScalarFunctionEvaluatorFactory implements IScalarEvaluatorFactory {
 
@@ -32,12 +33,14 @@ public class ExternalScalarFunctionEvaluatorFactory implements IScalarEvaluatorF
     private final IExternalFunctionInfo finfo;
     private final IScalarEvaluatorFactory[] args;
     private final IAType[] argTypes;
+    private final SourceLocation sourceLoc;
 
     public ExternalScalarFunctionEvaluatorFactory(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
-            IAType[] argTypes) {
+            IAType[] argTypes, SourceLocation sourceLoc) {
         this.finfo = finfo;
         this.args = args;
         this.argTypes = argTypes;
+        this.sourceLoc = sourceLoc;
     }
 
     @Override
@@ -46,7 +49,7 @@ public class ExternalScalarFunctionEvaluatorFactory implements IScalarEvaluatorF
             case JAVA:
                 return new ExternalScalarJavaFunctionEvaluator(finfo, args, argTypes, ctx);
             case PYTHON:
-                return new ExternalScalarPythonFunctionEvaluator(finfo, args, argTypes, ctx);
+                return new ExternalScalarPythonFunctionEvaluator(finfo, args, argTypes, ctx, sourceLoc);
             default:
                 throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_KIND,
                         finfo.getLanguage());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index e49c97e..31f96cf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -22,36 +22,34 @@ package org.apache.asterix.external.library;
 import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
-import java.net.ConnectException;
-import java.net.ServerSocket;
-import java.util.HashMap;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.external.api.IJObject;
-import org.apache.asterix.external.library.java.JObjectPointableVisitor;
-import org.apache.asterix.external.library.java.base.JComplexObject;
-import org.apache.asterix.external.library.java.base.JObject;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonIPCProto;
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.pointables.AFlatValuePointable;
-import org.apache.asterix.om.pointables.AListVisitablePointable;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -62,49 +60,45 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-
-import net.razorvine.pyro.PyroProxy;
+import org.apache.hyracks.ipc.impl.IPCSystem;
 
 class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
 
     private final PythonLibraryEvaluator libraryEvaluator;
 
     private final ArrayBackedValueStorage resultBuffer = new ArrayBackedValueStorage();
-    private final PointableAllocator pointableAllocator;
-    private final JObjectPointableVisitor pointableVisitor;
-    private final Object[] argHolder;
-    private final IObjectPool<IJObject, IAType> reflectingPool = new ListObjectPool<>(JTypeObjectFactory.INSTANCE);
-    private final Map<IAType, TypeInfo> infoPool = new HashMap<>();
+    private final ByteBuffer argHolder;
+    private final ByteBuffer outputWrapper;
+    private final IEvaluatorContext evaluatorContext;
     private static final String ENTRYPOINT = "entrypoint.py";
-    private static final String PY_NO_SITE_PKGS_OPT = "-S";
-    private static final String PY_NO_USER_PKGS_OPT = "-s";
 
     private final IPointable[] argValues;
 
     ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
-            IAType[] argTypes, IEvaluatorContext ctx) throws HyracksDataException {
+            IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
         super(finfo, args, argTypes, ctx);
 
         File pythonPath = new File(ctx.getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_HOME));
-        this.pointableAllocator = new PointableAllocator();
-        this.pointableVisitor = new JObjectPointableVisitor();
-
         DataverseName dataverseName = FunctionSignature.getDataverseName(finfo.getFunctionIdentifier());
         try {
-            libraryEvaluator = PythonLibraryEvaluator.getInstance(dataverseName, finfo, libraryManager, pythonPath,
-                    ctx.getTaskContext());
-        } catch (IOException | InterruptedException e) {
+            libraryEvaluator = PythonLibraryEvaluator.getInstance(dataverseName, finfo, libraryManager, router, ipcSys,
+                    pythonPath, ctx.getTaskContext(), ctx.getWarningCollector(), sourceLoc);
+        } catch (IOException | AsterixException e) {
             throw new HyracksDataException("Failed to initialize Python", e);
         }
         argValues = new IPointable[args.length];
         for (int i = 0; i < argValues.length; i++) {
             argValues[i] = VoidPointable.FACTORY.createPointable();
         }
-        this.argHolder = new Object[args.length];
+        //TODO: these should be dynamic
+        this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+        this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+        this.evaluatorContext = ctx;
     }
 
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        argHolder.clear();
         for (int i = 0, ln = argEvals.length; i < ln; i++) {
             argEvals[i].evaluate(tuple, argValues[i]);
             try {
@@ -114,10 +108,10 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
             }
         }
         try {
-            Object res = libraryEvaluator.callPython(argHolder);
+            ByteBuffer res = libraryEvaluator.callPython(argHolder, argTypes.length);
             resultBuffer.reset();
             wrap(res, resultBuffer.getDataOutput());
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new HyracksDataException("Error evaluating Python UDF", e);
         }
         result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength());
@@ -125,26 +119,39 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
 
     private static class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
         Process p;
-        PyroProxy remoteObj;
         IExternalFunctionInfo finfo;
         ILibraryManager libMgr;
         File pythonHome;
+        PythonIPCProto proto;
+        ExternalFunctionResultRouter router;
+        IPCSystem ipcSys;
+        String module;
+        String clazz;
+        String fn;
+        TaskAttemptId task;
+        IWarningCollector warningCollector;
+        SourceLocation sourceLoc;
 
         private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
-                ILibraryManager libMgr, File pythonHome) {
+                ILibraryManager libMgr, File pythonHome, ExternalFunctionResultRouter router, IPCSystem ipcSys,
+                TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
             super(jobId, evaluatorId);
             this.finfo = finfo;
             this.libMgr = libMgr;
             this.pythonHome = pythonHome;
+            this.router = router;
+            this.task = task;
+            this.ipcSys = ipcSys;
+            this.warningCollector = warningCollector;
+            this.sourceLoc = sourceLoc;
 
         }
 
-        public void initialize() throws IOException, InterruptedException {
+        public void initialize() throws IOException, AsterixException {
             PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
             List<String> externalIdents = finfo.getExternalIdentifier();
             PythonLibrary library = (PythonLibrary) libMgr.getLibrary(fnId.dataverseName, fnId.libraryName);
             String wd = library.getFile().getAbsolutePath();
-            int port = getFreeHighPort();
             String packageModule = externalIdents.get(0);
             String clazz = "None";
             String fn;
@@ -154,59 +161,60 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
             } else {
                 fn = externalIdents.get(1);
             }
-            ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), PY_NO_SITE_PKGS_OPT,
-                    PY_NO_USER_PKGS_OPT, ENTRYPOINT, Integer.toString(port), packageModule, clazz, fn);
+            this.fn = fn;
+            this.clazz = clazz;
+            this.module = packageModule;
+            int port = ipcSys.getSocketAddress().getPort();
+            ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), ENTRYPOINT,
+                    InetAddress.getLoopbackAddress().getHostAddress(), Integer.toString(port));
             pb.directory(new File(wd));
-            pb.environment().clear();
-            pb.inheritIO();
             p = pb.start();
-            remoteObj = new PyroProxy("127.0.0.1", port, "nextTuple");
-            waitForPython();
+            proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
+            proto.start();
+            proto.helo();
+            proto.init(packageModule, clazz, fn);
         }
 
-        Object callPython(Object[] arguments) throws IOException {
-            return remoteObj.call("nextTuple", arguments);
+        ByteBuffer callPython(ByteBuffer arguments, int numArgs) throws Exception {
+            ByteBuffer ret = null;
+            try {
+                ret = proto.call(arguments, numArgs);
+            } catch (AsterixException e) {
+                warningCollector
+                        .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+            }
+            return ret;
         }
 
         @Override
         public void deallocate() {
-            p.destroyForcibly();
+            boolean dead = false;
+            try {
+                p.destroy();
+                dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                //gonna kill it anyway
+            }
+            if (!dead) {
+                p.destroyForcibly();
+            }
         }
 
         private static PythonLibraryEvaluator getInstance(DataverseName dataverseName, IExternalFunctionInfo finfo,
-                ILibraryManager libMgr, File pythonHome, IHyracksTaskContext ctx)
-                throws IOException, InterruptedException {
+                ILibraryManager libMgr, ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome,
+                IHyracksTaskContext ctx, IWarningCollector warningCollector, SourceLocation sourceLoc)
+                throws IOException, AsterixException {
             PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(dataverseName, finfo.getLibrary());
             PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
             if (evaluator == null) {
                 evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
-                        pythonHome);
-                evaluator.initialize();
+                        pythonHome, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
                 ctx.registerDeallocatable(evaluator);
+                evaluator.initialize();
                 ctx.setStateObject(evaluator);
             }
             return evaluator;
         }
-
-        private int getFreeHighPort() throws IOException {
-            int port;
-            try (ServerSocket socket = new ServerSocket(0)) {
-                socket.setReuseAddress(true);
-                port = socket.getLocalPort();
-            }
-            return port;
-        }
-
-        private void waitForPython() throws IOException, InterruptedException {
-            for (int i = 0; i < 100; i++) {
-                try {
-                    remoteObj.call("ping");
-                    break;
-                } catch (ConnectException e) {
-                    Thread.sleep(100);
-                }
-            }
-        }
     }
 
     private static final class PythonLibraryEvaluatorId {
@@ -237,76 +245,32 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
     }
 
     private void setArgument(int index, IValueReference valueReference) throws IOException {
-        IVisitablePointable pointable;
-        IJObject jobj;
         IAType type = argTypes[index];
-        TypeInfo info;
-        switch (type.getTypeTag()) {
-            case OBJECT:
-                pointable = pointableAllocator.allocateRecordValue(type);
-                pointable.set(valueReference);
-                info = getTypeInfo(type);
-                jobj = pointableVisitor.visit((ARecordVisitablePointable) pointable, info);
-                break;
-            case ARRAY:
-            case MULTISET:
-                pointable = pointableAllocator.allocateListValue(type);
-                pointable.set(valueReference);
-                info = getTypeInfo(type);
-                jobj = pointableVisitor.visit((AListVisitablePointable) pointable, info);
-                break;
+        ATypeTag tag = type.getTypeTag();
+        switch (tag) {
             case ANY:
                 TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
                 pointy.set(valueReference);
                 ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
                 IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-                info = getTypeInfo(rtType);
-                switch (rtTypeTag) {
-                    case OBJECT:
-                        pointable = pointableAllocator.allocateRecordValue(rtType);
-                        pointable.set(valueReference);
-                        jobj = pointableVisitor.visit((ARecordVisitablePointable) pointable, info);
-                        break;
-                    case ARRAY:
-                    case MULTISET:
-                        pointable = pointableAllocator.allocateListValue(rtType);
-                        pointable.set(valueReference);
-                        jobj = pointableVisitor.visit((AListVisitablePointable) pointable, info);
-                        break;
-                    default:
-                        pointable = pointableAllocator.allocateFieldValue(rtType);
-                        pointable.set(valueReference);
-                        jobj = pointableVisitor.visit((AFlatValuePointable) pointable, info);
-                        break;
-                }
+                MessagePackerFromADM.pack(valueReference, rtType, argHolder);
                 break;
             default:
-                pointable = pointableAllocator.allocateFieldValue(type);
-                pointable.set(valueReference);
-                info = getTypeInfo(type);
-                jobj = pointableVisitor.visit((AFlatValuePointable) pointable, info);
+                MessagePackerFromADM.pack(valueReference, type, argHolder);
                 break;
         }
-        argHolder[index] = jobj.getValueGeneric();
     }
 
-    private TypeInfo getTypeInfo(IAType type) {
-        TypeInfo typeInfo = infoPool.get(type);
-        if (typeInfo == null) {
-            typeInfo = new TypeInfo(reflectingPool, type, type.getTypeTag());
-            infoPool.put(type, typeInfo);
+    private void wrap(ByteBuffer resultWrapper, DataOutput out) throws HyracksDataException {
+        //TODO: output wrapper needs to grow with result wrapper
+        outputWrapper.clear();
+        outputWrapper.position(0);
+        MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
+        try {
+            out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
+        } catch (IOException e) {
+            throw new HyracksDataException(e.getMessage());
         }
-        return typeInfo;
-    }
 
-    private void wrap(Object o, DataOutput out) throws HyracksDataException {
-        Class concrete = o.getClass();
-        IAType asxConv = JObject.convertType(concrete);
-        IJObject res = reflectingPool.allocate(asxConv);
-        if (res instanceof JComplexObject) {
-            ((JComplexObject) res).setPool(reflectingPool);
-        }
-        res.setValueGeneric(o);
-        res.serialize(out, true);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
new file mode 100644
index 0000000..383b2f1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FALSE;
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FLOAT32;
+import static org.msgpack.core.MessagePack.Code.FLOAT64;
+import static org.msgpack.core.MessagePack.Code.INT16;
+import static org.msgpack.core.MessagePack.Code.INT32;
+import static org.msgpack.core.MessagePack.Code.INT64;
+import static org.msgpack.core.MessagePack.Code.INT8;
+import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.STR32;
+import static org.msgpack.core.MessagePack.Code.TRUE;
+import static org.msgpack.core.MessagePack.Code.UINT16;
+import static org.msgpack.core.MessagePack.Code.UINT32;
+import static org.msgpack.core.MessagePack.Code.UINT64;
+import static org.msgpack.core.MessagePack.Code.UINT8;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.BytePointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public class MessagePackerFromADM {
+
+    private static final int TYPE_TAG_SIZE = 1;
+    private static final int TYPE_SIZE = 1;
+    private static final int LENGTH_SIZE = 4;
+    private static final int ITEM_COUNT_SIZE = 4;
+    private static final int ITEM_OFFSET_SIZE = 4;
+
+    /**
+     * Packs the ADM value referenced by {@code ptr} into {@code out} as msgpack.
+     * The bytes are taken from the reference's backing array beginning at its start offset and are
+     * handed to the byte-array overload with {@code tagged} set to {@code true}.
+     *
+     * @param ptr  reference to the serialized value
+     * @param type type whose tag selects the msgpack encoding
+     * @param out  buffer the encoded value is appended to
+     * @throws HyracksDataException propagated from the byte-array overload
+     */
+    public static void pack(IValueReference ptr, IAType type, ByteBuffer out) throws HyracksDataException {
+        final byte[] bytes = ptr.getByteArray();
+        final int start = ptr.getStartOffset();
+        pack(bytes, start, type, true, out);
+    }
+
+    /**
+     * Encodes one serialized ADM value as msgpack and appends it to {@code out}.
+     * Scalar values are read one byte past {@code offs} when {@code tagged} is set and directly at
+     * {@code offs} otherwise; ARRAY, MULTISET and OBJECT values are handed to their packers with the
+     * unadjusted {@code offs}.
+     *
+     * @param ptr    bytes holding the value
+     * @param offs   position of the value within {@code ptr}
+     * @param type   type whose tag selects the encoding
+     * @param tagged whether a one-byte type tag precedes the scalar payload at {@code offs}
+     * @param out    buffer the encoded value is appended to
+     * @throws HyracksDataException     propagated from the array and object packers
+     * @throws IllegalArgumentException if the type tag is not one of the handled cases
+     */
+    public static void pack(byte[] ptr, int offs, IAType type, boolean tagged, ByteBuffer out)
+            throws HyracksDataException {
+        // step over the leading type tag byte when one is present
+        final int valueOffs = offs + (tagged ? 1 : 0);
+        switch (type.getTypeTag()) {
+            case STRING:
+                packStr(ptr, valueOffs, out);
+                return;
+            case BOOLEAN:
+                out.put(BooleanPointable.getBoolean(ptr, valueOffs) ? TRUE : FALSE);
+                return;
+            case TINYINT:
+                packByte(out, BytePointable.getByte(ptr, valueOffs));
+                return;
+            case SMALLINT:
+                packShort(out, ShortPointable.getShort(ptr, valueOffs));
+                return;
+            case INTEGER:
+                packInt(out, IntegerPointable.getInteger(ptr, valueOffs));
+                return;
+            case BIGINT:
+                packLong(out, LongPointable.getLong(ptr, valueOffs));
+                return;
+            case FLOAT:
+                packFloat(out, FloatPointable.getFloat(ptr, valueOffs));
+                return;
+            case DOUBLE:
+                packDouble(out, DoublePointable.getDouble(ptr, valueOffs));
+                return;
+            case ARRAY:
+            case MULTISET:
+                // collections are passed the original offset, not the tag-adjusted one
+                packArray(ptr, offs, type, out);
+                return;
+            case OBJECT:
+                packObject(ptr, offs, type, out);
+                return;
+            default:
+                throw new IllegalArgumentException("NYI");
+        }
+    }
+
+    /**
+     * Writes a non-negative integer to {@code out} using the shortest msgpack unsigned encoding and
+     * reports how many bytes were written.
+     * <ul>
+     * <li>0..127: positive fixint (1 byte)</li>
+     * <li>up to 0xFF: UINT8 marker + 1 byte (2 bytes)</li>
+     * <li>up to 0xFFFF: UINT16 marker + 2 bytes (3 bytes)</li>
+     * <li>up to 0xFFFFFFFF: UINT32 marker + 4 bytes (5 bytes)</li>
+     * <li>anything larger: UINT64 marker + 8 bytes (9 bytes)</li>
+     * </ul>
+     * The thresholds are the unsigned range limits of each format, so every branch is reachable and
+     * the narrowing casts keep exactly the low-order bytes that the format stores.
+     *
+     * @param out buffer the encoded value is appended to
+     * @param in  value to encode; must not be negative
+     * @return number of bytes appended to {@code out}
+     * @throws IllegalArgumentException if {@code in} is negative
+     */
+    public static byte minPackPosLong(ByteBuffer out, long in) {
+        if (in < 0) {
+            // reject up front: a narrowing cast could otherwise silently encode an unrelated value
+            throw new IllegalArgumentException("value must be non-negative: " + in);
+        }
+        if (in <= Byte.MAX_VALUE) {
+            packFixPos(out, (byte) in);
+            return 1;
+        } else if (in <= 0xFFL) {
+            out.put(UINT8);
+            out.put((byte) in);
+            return 2;
+        } else if (in <= 0xFFFFL) {
+            out.put(UINT16);
+            out.putShort((short) in);
+            return 3;
+        } else if (in <= 0xFFFFFFFFL) {
+            out.put(UINT32);
+            out.putInt((int) in);
+            return 5;
+        } else {
+            out.put(UINT64);
+            out.putLong(in);
+            return 9;
+        }
+    }
+
+    /**
+     * Appends the INT8 marker followed by {@code in} to {@code out}.
+     */
+    public static void packByte(ByteBuffer out, byte in) {
+        out.put(INT8).put(in);
+    }
+
+    /**
+     * Appends the INT16 marker followed by the two bytes of {@code in} to {@code out}.
+     */
+    public static void packShort(ByteBuffer out, short in) {
+        out.put(INT16).putShort(in);
+    }
+
+    /**
+     * Appends the INT32 marker followed by the four bytes of {@code in} to {@code out}.
+     */
+    public static void packInt(ByteBuffer out, int in) {
+        out.put(INT32).putInt(in);
+    }
+
+    /**
+     * Appends the INT64 marker followed by the eight bytes of {@code in} to {@code out}.
+     */
+    public static void packLong(ByteBuffer out, long in) {
+        out.put(INT64).putLong(in);
+    }
+
+    /**
+     * Appends the FLOAT32 marker followed by the four bytes of {@code in} to {@code out}.
+     */
+    public static void packFloat(ByteBuffer out, float in) {
+        out.put(FLOAT32).putFloat(in);
+    }
+
+    /**
+     * Appends the FLOAT64 marker followed by the eight bytes of {@code in} to {@code out}.
+     */
+    public static void packDouble(ByteBuffer out, double in) {
+        out.put(FLOAT64).putDouble(in);
+    }
+
+    /**
+     * Appends {@code in} to {@code out} as a single byte, refusing values whose sign bit is set.
+     *
+     * @param out buffer the byte is appended to
+     * @param in  value to write; must be in the range 0..127
+     * @throws IllegalArgumentException if {@code in} is negative
+     */
+    public static void packFixPos(ByteBuffer out, byte in) {
+        // a negative byte is exactly one with the top bit set
+        if (in < 0) {
+            throw new IllegalArgumentException("fixint7 must be positive");
+        }
+        out.put(in);
+    }
+
+    /**
+     * Appends {@code in} to {@code buf} as a fixstr: one header byte formed from FIXSTR_PREFIX plus
+     * the UTF-8 byte length, followed by the UTF-8 bytes.
+     *
+     * @param buf buffer the string is appended to
+     * @param in  string to encode; its UTF-8 form must be at most 31 bytes
+     * @throws IllegalArgumentException if the UTF-8 form is longer than 31 bytes
+     */
+    public static void packFixStr(ByteBuffer buf, String in) {
+        final byte[] utf8 = in.getBytes(Charset.forName("UTF-8"));
+        final int len = utf8.length;
+        if (len > 31) {
+            throw new IllegalArgumentException("fixstr cannot be longer than 31");
+        }
+        buf.put((byte) (FIXSTR_PREFIX + len)).put(utf8);
+    }
+
+    /**
+     * Appends {@code in} to {@code out} as a str32 (STR32 marker, 4-byte UTF-8 length, UTF-8 bytes).
+     * Same encoding as the {@code (String, ByteBuffer)} overload, with the argument order swapped.
+     */
+    public static void packStr(ByteBuffer out, String in) {
+        packStr(in, out);
+    }
+
+    /**
+     * Decodes the serialized string at {@code offs} in {@code in} and appends it to {@code out} as a
+     * str32 (STR32 marker, 4-byte UTF-8 length, UTF-8 bytes).
+     */
+    private static void packStr(byte[] in, int offs, ByteBuffer out) {
+        out.put(STR32);
+        //TODO: tagged/untagged. closed support is broken, so always tagged for now
+        final byte[] utf8 = UTF8StringUtil.toString(in, offs).getBytes(Charset.forName("UTF-8"));
+        out.putInt(utf8.length).put(utf8);
+    }
+
+    /**
+     * Appends {@code str} to {@code out} as a str32: the STR32 marker, the 4-byte length of the
+     * UTF-8 form, then the UTF-8 bytes.
+     */
+    public static void packStr(String str, ByteBuffer out) {
+        out.put(STR32);
+        final byte[] utf8 = str.getBytes(Charset.forName("UTF-8"));
+        out.putInt(utf8.length).put(utf8);
+    }
+
+    /**
+     * Packs a serialized ADM list as a msgpack array32: the ARRAY32 marker, a 4-byte item count, then
+     * every item packed in order.
+     *
+     * @param in   bytes holding the list
+     * @param offs position of the list within {@code in}; all header offsets below are derived from it
+     * @param type collection type; cast to {@link AbstractCollectionType} to obtain the item type
+     * @param out  buffer the encoded array is appended to
+     * @throws HyracksDataException propagated from the calls made while packing items
+     */
+    private static void packArray(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
+        //TODO: - could optimize to pack fixarray/array16 for small arrays
+        //      - this code is basically a static version of AListPointable, could be deduped
+        AbstractCollectionType collType = (AbstractCollectionType) type;
+        out.put(ARRAY32);
+        // header as read here: type tag (1 byte), item type (1 byte), length (4 bytes), item count (4 bytes)
+        int lenOffs = offs + TYPE_TAG_SIZE + TYPE_SIZE;
+        int itemCtOffs = LENGTH_SIZE + lenOffs;
+        int itemCt = IntegerPointable.getInteger(in, itemCtOffs);
+        boolean fixType = NonTaggedFormatUtil.isFixedSizedCollection(type);
+        out.putInt(itemCt);
+        for (int i = 0; i < itemCt; i++) {
+            if (fixType) {
+                // fixed-size items: located at i * itemLength right after the item count, packed as untagged
+                // NOTE(review): the length lookup is given offset 0 rather than the item's offset; presumably
+                // harmless because the length of a fixed-size type does not depend on the bytes - confirm
+                int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
+                        * NonTaggedFormatUtil.getFieldValueLength(in, 0, collType.getItemType().getTypeTag(), false));
+                pack(in, itemOffs, collType.getItemType(), false, out);
+            } else {
+                // otherwise a table of 4-byte offsets (relative to offs) follows the item count; the item's
+                // type is taken from the tag byte found at the item's own offset
+                int itemOffs =
+                        offs + IntegerPointable.getInteger(in, itemCtOffs + ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
+                ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
+                pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+            }
+        }
+    }
+
+    /**
+     * Packs a serialized ADM record as a msgpack map32: the MAP32 marker, a 4-byte entry count, then
+     * a name/value pair per field. Declared (closed) fields come first, followed by open fields when
+     * the record is expanded.
+     *
+     * @param in   bytes holding the record
+     * @param offs position of the record within {@code in}
+     * @param type record type; cast to {@link ARecordType} to obtain the declared fields
+     * @param out  buffer the encoded map is appended to
+     * @throws HyracksDataException propagated from the nested pack calls
+     */
+    private static void packObject(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
+        ARecordType recType = (ARecordType) type;
+        out.put(MAP32);
+        // entry count = number of declared field names + number of open fields stored in the record
+        int fieldCt = recType.getFieldNames().length + RecordUtils.getOpenFieldCount(in, offs, recType);
+        out.putInt(fieldCt);
+        // NOTE(review): every declared field is packed unconditionally; the null bitmap is not consulted
+        // here, so absent optional fields may need separate handling - confirm
+        for (int i = 0; i < recType.getFieldNames().length; i++) {
+            String field = recType.getFieldNames()[i];
+            IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
+            packStr(field, out);
+            // declared field values are packed as untagged, using the type from the record type
+            pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, out);
+        }
+        if (RecordUtils.isExpanded(in, offs, recType)) {
+            for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, recType); i++) {
+                // open field names are decoded from the record bytes; the value's type comes from the
+                // tag byte stored at the value offset, and the value is packed as tagged
+                packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, recType, i), out);
+                ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
+                pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, recType, i),
+                        TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+            }
+        }
+
+    }
+
+    /**
+     * Appends a one-byte fixarray header to {@code buf}: FIXARRAY_PREFIX plus the low four bits of
+     * {@code numObj}.
+     */
+    public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
+        // only the low four bits of the count are kept
+        final int count = numObj & 0x0F;
+        buf.put((byte) (FIXARRAY_PREFIX + count));
+    }
+
+    /**
+     * Static helpers that compute byte positions inside a serialized record, given the record's
+     * start offset and its {@link ARecordType}. As read by these methods, a record begins with a
+     * tag byte and a 4-byte length; open record types add a 1-byte "expanded" flag, and expanded
+     * records add a 4-byte offset to the open part. A 4-byte closed-field count, the null bitmap and
+     * a table of 4-byte closed-field offsets follow. All stored offsets are relative to the record
+     * start.
+     */
+    private static class RecordUtils {
+
+        static final int TAG_SIZE = 1;
+        static final int RECORD_LENGTH_SIZE = 4;
+        static final int EXPANDED_SIZE = 1;
+        static final int OPEN_OFFSET_SIZE = 4;
+        static final int CLOSED_COUNT_SIZE = 4;
+        static final int FIELD_OFFSET_SIZE = 4;
+        static final int OPEN_COUNT_SIZE = 4;
+        private static final int OPEN_FIELD_HASH_SIZE = 4;
+        private static final int OPEN_FIELD_OFFSET_SIZE = 4;
+        // each open-field table entry is a hash followed by an offset
+        private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE;
+
+        // a missing record type is treated as open
+        private static boolean isOpen(ARecordType recordType) {
+            return recordType == null || recordType.isOpen();
+        }
+
+        // reads the 4-byte length stored right after the tag byte
+        public static int getLength(byte[] bytes, int start) {
+            return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
+        }
+
+        // true only for open record types whose flag byte (after tag and length) is set
+        public static boolean isExpanded(byte[] bytes, int start, ARecordType recordType) {
+            return isOpen(recordType) && BooleanPointable.getBoolean(bytes, start + TAG_SIZE + RECORD_LENGTH_SIZE);
+        }
+
+        // position just past tag, length and (for open types) the expanded flag; for expanded records
+        // this is where the offset of the open part is stored
+        public static int getOpenPartOffset(int start, ARecordType recordType) {
+            return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) ? EXPANDED_SIZE : 0);
+        }
+
+        // skips the open-part offset (when expanded) and the closed-field count
+        public static int getNullBitmapOffset(byte[] bytes, int start, ARecordType recordType) {
+            return getOpenPartOffset(start, recordType) + (isExpanded(bytes, start, recordType) ? OPEN_OFFSET_SIZE : 0)
+                    + CLOSED_COUNT_SIZE;
+        }
+
+        public static int getNullBitmapSize(ARecordType recordType) {
+            return RecordUtil.computeNullBitmapSize(recordType);
+        }
+
+        // declared type of the closed field, unwrapped to its actual type when the field is optional
+        public static final IAType getClosedFieldType(ARecordType recordType, int fieldId) {
+            IAType aType = recordType.getFieldTypes()[fieldId];
+            if (NonTaggedFormatUtil.isOptional(aType)) {
+                // optional field: add the embedded non-null type tag
+                aType = ((AUnionType) aType).getActualType();
+            }
+            return aType;
+        }
+
+        // looks up the field's entry in the offset table that follows the null bitmap and returns
+        // the stored offset rebased onto the record start
+        public static final int getClosedFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            int offset = getNullBitmapOffset(bytes, start, recordType) + getNullBitmapSize(recordType)
+                    + fieldId * FIELD_OFFSET_SIZE;
+            return start + IntegerPointable.getInteger(bytes, offset);
+        }
+
+        // number of open fields; 0 when the record is not expanded
+        public static final int getOpenFieldCount(byte[] bytes, int start, ARecordType recordType) {
+            return isExpanded(bytes, start, recordType)
+                    ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(bytes, start, recordType)) : 0;
+        }
+
+        public static int getOpenFieldCountSize(byte[] bytes, int start, ARecordType recordType) {
+            return isExpanded(bytes, start, recordType) ? OPEN_COUNT_SIZE : 0;
+        }
+
+        // the open part begins with the open-field count; its position is the stored open-part
+        // offset rebased onto the record start
+        // NOTE(review): reads the stored offset without checking isExpanded - callers in this class
+        // only use it for expanded records, confirm for any new caller
+        public static int getOpenFieldCountOffset(byte[] bytes, int start, ARecordType recordType) {
+            return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(start, recordType));
+        }
+
+        // the value starts immediately after the length-prefixed field name
+        public static final int getOpenFieldValueOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return getOpenFieldNameOffset(bytes, start, recordType, fieldId)
+                    + getOpenFieldNameSize(bytes, start, recordType, fieldId);
+        }
+
+        // total bytes taken by the field name: its UTF length plus the bytes used to store that length
+        public static int getOpenFieldNameSize(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(bytes, start, recordType, fieldId));
+            return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
+        }
+
+        // an open field starts with its name, so the name offset is the field offset
+        public static int getOpenFieldNameOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return getOpenFieldOffset(bytes, start, recordType, fieldId);
+        }
+
+        // the first byte of the value is returned as its type tag
+        public static final byte getOpenFieldTag(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return bytes[getOpenFieldValueOffset(bytes, start, recordType, fieldId)];
+        }
+
+        // position of the field's (hash, offset) entry in the table that follows the open-field count
+        public static int getOpenFieldHashOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return getOpenFieldCountOffset(bytes, start, recordType) + getOpenFieldCountSize(bytes, start, recordType)
+                    + fieldId * OPEN_FIELD_HEADER;
+        }
+
+        // stored field offset rebased onto the record start
+        public static int getOpenFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return start
+                    + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(bytes, start, recordType, fieldId));
+        }
+
+        // the offset half of the table entry sits right after the hash
+        public static int getOpenFieldOffsetOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+            return getOpenFieldHashOffset(bytes, start, recordType, fieldId) + OPEN_FIELD_HASH_SIZE;
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
new file mode 100644
index 0000000..fedd1f6
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.*;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public class MessageUnpackerToADM {
+
+    /**
+     * Decodes one msgpack value from {@code in} and appends its ADM serialization to {@code out},
+     * dispatching on the leading format byte. A {@code null} input buffer is treated as NIL.
+     *
+     * @param in     buffer positioned at a msgpack format byte; may be null
+     * @param out    buffer receiving the ADM bytes
+     * @param tagged whether numeric and string values are preceded by their ADM type tag; booleans,
+     *               null, arrays and maps are written with their tag regardless of this flag
+     * @throws IllegalArgumentException if the format byte is not one of the handled formats
+     */
+    public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        final byte tag = (in == null) ? NIL : in.get();
+        if (isFixStr(tag)) {
+            unpackStr(in, out, (tag ^ FIXSTR_PREFIX), tagged);
+            return;
+        }
+        if (isFixInt(tag)) {
+            if (tagged) {
+                out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+            }
+            // for fixints the format byte is written out as the value itself
+            if (isPosFixInt(tag) || isNegFixInt(tag)) {
+                out.put(tag);
+            }
+            return;
+        }
+        if (isFixedArray(tag)) {
+            unpackArray(in, out, (tag ^ FIXARRAY_PREFIX));
+            return;
+        }
+        if (isFixedMap(tag)) {
+            unpackMap(in, out, (tag ^ FIXMAP_PREFIX));
+            return;
+        }
+        // everything else is identified by an exact format byte
+        switch (tag) {
+            case TRUE:
+            case FALSE:
+                out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+                out.put((byte) (tag == TRUE ? 1 : 0));
+                break;
+            case NIL:
+                out.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                break;
+            case UINT8:
+                unpackUByte(in, out, tagged);
+                break;
+            case UINT16:
+                unpackUShort(in, out, tagged);
+                break;
+            case UINT32:
+                unpackUInt(in, out, tagged);
+                break;
+            case INT8:
+                unpackByte(in, out, tagged);
+                break;
+            case INT16:
+                unpackShort(in, out, tagged);
+                break;
+            case INT32:
+                unpackInt(in, out, tagged);
+                break;
+            case INT64:
+                unpackLong(in, out, tagged);
+                break;
+            case FLOAT32:
+                unpackFloat(in, out, tagged);
+                break;
+            case FLOAT64:
+                unpackDouble(in, out, tagged);
+                break;
+            case STR8:
+                unpackStr(in, out, Byte.toUnsignedInt(in.get()), tagged);
+                break;
+            case STR16:
+                unpackStr(in, out, Short.toUnsignedInt(in.getShort()), tagged);
+                break;
+            case STR32:
+                unpackStr(in, out, Integer.toUnsignedLong(in.getInt()), tagged);
+                break;
+            case ARRAY16:
+                unpackArray(in, out, Short.toUnsignedInt(in.getShort()));
+                break;
+            case ARRAY32:
+                unpackArray(in, out, Integer.toUnsignedLong(in.getInt()));
+                break;
+            case MAP16:
+                unpackMap(in, out, Short.toUnsignedInt(in.getShort()));
+                break;
+            case MAP32:
+                unpackMap(in, out, (int) Integer.toUnsignedLong(in.getInt()));
+                break;
+            default:
+                throw new IllegalArgumentException("NYI");
+        }
+    }
+
+    /**
+     * Reads one msgpack integer from {@code in} and returns it widened to a long.
+     *
+     * @param in buffer positioned at the format byte of an integer value
+     * @return the decoded value; the unsigned 8/16/32-bit formats are zero-extended
+     * @throws IllegalArgumentException if the format byte is not one of the handled integer formats
+     */
+    public static long unpackNextInt(ByteBuffer in) {
+        byte tag = in.get();
+        if (isFixInt(tag)) {
+            // Both fixint ranges (0x00-0x7f and 0xe0-0xff) encode the value as the format byte itself,
+            // read as a signed 8-bit integer. Stripping the 0xe0 prefix would turn -1 (0xff) into 31.
+            return tag;
+        }
+        switch (tag) {
+            case INT8:
+                return in.get();
+            case UINT8:
+                return Byte.toUnsignedInt(in.get());
+            case INT16:
+                return in.getShort();
+            case UINT16:
+                return Short.toUnsignedInt(in.getShort());
+            case INT32:
+                return in.getInt();
+            case UINT32:
+                return Integer.toUnsignedLong(in.getInt());
+            case INT64:
+                return in.getLong();
+            default:
+                throw new IllegalArgumentException("NYI");
+        }
+    }
+
+    /**
+     * Copies one signed byte from {@code in} to {@code out}, preceded by the INT8 type tag when
+     * {@code tagged} is set.
+     */
+    public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+        }
+        final byte value = in.get();
+        out.put(value);
+    }
+
+    /**
+     * Copies one signed 16-bit integer from {@code in} to {@code out}, preceded by the INT16 type tag
+     * when {@code tagged} is set.
+     */
+    public static void unpackShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+        }
+        final short value = in.getShort();
+        out.putShort(value);
+    }
+
+    /**
+     * Copies one signed 32-bit integer from {@code in} to {@code out}, preceded by the INT32 type tag
+     * when {@code tagged} is set.
+     */
+    public static void unpackInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        final int value = in.getInt();
+        out.putInt(value);
+    }
+
+    /**
+     * Copies one signed 64-bit integer from {@code in} to {@code out}, preceded by the INT64 type tag
+     * when {@code tagged} is set.
+     */
+    public static void unpackLong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        final long value = in.getLong();
+        out.putLong(value);
+    }
+
+    /**
+     * Reads one byte from {@code in} as an unsigned value and writes it to {@code out} as a 16-bit
+     * integer, preceded by the INT16 type tag when {@code tagged} is set.
+     */
+    public static void unpackUByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+        }
+        // widen to the next larger signed type so the full unsigned range stays non-negative
+        final short widened = (short) Byte.toUnsignedInt(in.get());
+        out.putShort(widened);
+    }
+
+    /**
+     * Reads one 16-bit value from {@code in} as unsigned and writes it to {@code out} as a 32-bit
+     * integer, preceded by the INT32 type tag when {@code tagged} is set.
+     */
+    public static void unpackUShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        // widen to the next larger signed type so the full unsigned range stays non-negative
+        final int widened = Short.toUnsignedInt(in.getShort());
+        out.putInt(widened);
+    }
+
+    /**
+     * Reads one 32-bit value from {@code in} as unsigned and writes it to {@code out} as a 64-bit
+     * integer, preceded by the INT64 type tag when {@code tagged} is set.
+     */
+    public static void unpackUInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        // widen to the next larger signed type so the full unsigned range stays non-negative
+        final long widened = Integer.toUnsignedLong(in.getInt());
+        out.putLong(widened);
+    }
+
+    /**
+     * Copies one 32-bit float from {@code in} to {@code out}, preceded by the FLOAT type tag when
+     * {@code tagged} is set.
+     */
+    public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
+        }
+        final float value = in.getFloat();
+        out.putFloat(value);
+    }
+
+    /**
+     * Copies one 64-bit double from {@code in} to {@code out}, preceded by the DOUBLE type tag when
+     * {@code tagged} is set.
+     */
+    public static void unpackDouble(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        if (tagged) {
+            out.put(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        final double value = in.getDouble();
+        out.putDouble(value);
+    }
+
+    /**
+     * Decodes {@code uLen} msgpack values from {@code in} and writes them to {@code out} as one ADM
+     * ordered list whose item type is ANY, so every item is written with its own type tag.
+     * Bytes written, in order: list tag, item type tag, total byte length, item count, one 4-byte
+     * offset slot per item (relative to the first byte of the list), then the items themselves.
+     *
+     * @param in   buffer positioned at the first array element
+     * @param out  buffer receiving the serialized list
+     * @param uLen number of elements, as an unsigned value widened to long
+     * @throws UnsupportedOperationException if {@code uLen} exceeds {@link Integer#MAX_VALUE}
+     */
+    public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) {
+        if (uLen > Integer.MAX_VALUE) {
+            throw new UnsupportedOperationException("Array is too long");
+        }
+        int count = (int) uLen;
+        int offs = out.position();
+        out.put(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+        out.put(ATypeTag.ANY.serialize());
+        int asxLenPos = out.position();
+        //reserve space for the total length; patched once all items are written
+        out.putInt(-1);
+        out.putInt(count);
+        // Absolute ByteBuffer indices are relative to the buffer, not to its backing array, so
+        // arrayOffset() must not be added to the slot position.
+        int slotStartOffs = out.position();
+        //reserve one offset slot per item
+        for (int i = 0; i < count; i++) {
+            out.putInt(0xFFFF);
+        }
+        for (int i = 0; i < count; i++) {
+            out.putInt(slotStartOffs + (i * 4), (out.position() - offs));
+            unpack(in, out, true);
+        }
+        int totalLen = out.position() - offs;
+        out.putInt(asxLenPos, totalLen);
+    }
+
+    /**
+     * Decodes {@code count} msgpack key/value pairs from {@code in} and writes them to {@code out} as a
+     * completely open ADM record. Each key is unpacked untagged and each value tagged; every field
+     * gets a (hash, offset) pair in the table that follows the field count, with the offset relative
+     * to the first byte of the record.
+     * NOTE(review): the hash is computed with {@code UTF8StringUtil.hash} at the key position, so keys
+     * are presumably always strings - confirm against the Python side.
+     *
+     * @param in    buffer positioned at the first key
+     * @param out   array-backed buffer receiving the serialized record
+     * @param count number of key/value pairs
+     * @throws UnsupportedOperationException if {@code count} is negative, which happens when a MAP32
+     *                                       size above {@link Integer#MAX_VALUE} is narrowed to int
+     */
+    public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) {
+        if (count < 0) {
+            // refuse instead of emitting a record header with a negative field count
+            throw new UnsupportedOperationException("Map is too long");
+        }
+        //TODO: need to handle typed records. this only produces a completely open record.
+        //hdr size = 6?
+        int startOffs = out.position();
+        out.put(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+        int totalSizeOffs = out.position();
+        //placeholder for the total size; patched at the end
+        out.putInt(-1);
+        //isExpanded
+        out.put((byte) 1);
+        int openPartOffs = out.position();
+        out.putInt(-1);
+        //patch the open part offset now that its position is known, then write the number of open fields
+        out.putInt(openPartOffs, out.position() - startOffs);
+        out.putInt(count);
+        int offsetAryPos = out.position();
+        int offsetArySz = count * 2;
+        //allocate space for open field offsets (one hash and one offset per field)
+        for (int i = 0; i < offsetArySz; i++) {
+            out.putInt(0xDEADBEEF);
+        }
+        for (int i = 0; i < count; i++) {
+            int keyPos = out.position();
+            // both operands are buffer positions, so the result does not depend on arrayOffset()
+            int relOffs = keyPos - startOffs;
+            unpack(in, out, false);
+            // hash() reads the backing array directly, so it needs the array index of the key
+            int hash = UTF8StringUtil.hash(out.array(), keyPos + out.arrayOffset());
+            out.putInt(offsetAryPos, hash);
+            offsetAryPos += 4;
+            out.putInt(offsetAryPos, relOffs);
+            offsetAryPos += 4;
+            unpack(in, out, true);
+        }
+        out.putInt(totalSizeOffs, out.position() - startOffs);
+    }
+
+    /**
+     * Copies a msgpack string payload of {@code uLen} bytes from {@code in} to {@code out}, prefixed
+     * with its variable-length encoded byte length, and advances both buffers past the payload.
+     * Both buffers must be array-backed because the payload is copied between the backing arrays.
+     *
+     * @param in   buffer positioned at the first payload byte
+     * @param out  buffer receiving the length prefix and payload
+     * @param uLen payload length in bytes, as an unsigned value widened to long
+     * @param tag  whether the string type tag is written before the length prefix
+     * @throws UnsupportedOperationException if {@code uLen} exceeds {@link Integer#MAX_VALUE}
+     */
+    public static void unpackStr(ByteBuffer in, ByteBuffer out, long uLen, boolean tag) {
+        // validate before writing anything so a rejected string leaves no stray tag in the output
+        if (Long.compareUnsigned(uLen, Integer.MAX_VALUE) > 0) {
+            throw new UnsupportedOperationException("String is too long");
+        }
+        if (tag) {
+            out.put(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        int len = (int) uLen;
+        // The length prefix counts payload bytes, not characters: readers use it to find the end of
+        // the string, so a character count would truncate any string with multi-byte sequences.
+        //TODO: 4-byte UTF-8 sequences are copied verbatim; confirm downstream string code accepts them
+        int adv = VarLenIntEncoderDecoder.encode(len, out.array(), out.position() + out.arrayOffset());
+        out.position(out.position() + adv);
+        System.arraycopy(in.array(), in.arrayOffset() + in.position(), out.array(), out.arrayOffset() + out.position(),
+                len);
+        out.position(out.position() + len);
+        in.position(in.position() + len);
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index a576c34..8929a60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -22,6 +22,7 @@ package org.apache.asterix.external.operators;
 import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -286,12 +287,15 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera
 
             private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
                     throws IOException {
-                FileReference pyro4 = stageDir.getChild("pyro4.pyz");
-                writeShim(pyro4);
+                FileReference msgpack = stageDir.getChild("msgpack.pyz");
+                writeShim(msgpack);
                 unzip(sourceFile, contentsDir);
-                unzip(pyro4, contentsDir);
+                File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
+                FileReference msgPackFolderRef =
+                        new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
+                unzip(msgpack, msgPackFolderRef);
                 writeShim(contentsDir.getChild("entrypoint.py"));
-                Files.delete(pyro4.getFile().toPath());
+                Files.delete(msgpack.getFile().toPath());
             }
 
             private void writeShim(FileReference outputFile) throws IOException {
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 70e23c1..28f1139 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1488,14 +1488,9 @@
         <version>4.5.11</version>
       </dependency>
       <dependency>
-        <groupId>net.razorvine</groupId>
-        <artifactId>pyrolite</artifactId>
-        <version>4.30</version>
-      </dependency>
-      <dependency>
-        <groupId>net.razorvine</groupId>
-        <artifactId>serpent</artifactId>
-        <version>1.23</version>
+        <groupId>org.msgpack</groupId>
+        <artifactId>msgpack-core</artifactId>
+        <version>0.8.20</version>
       </dependency>
     </dependencies>
   </dependencyManagement>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2ba4768..6b5b5db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -70,6 +70,7 @@ import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.Message;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -1431,12 +1432,12 @@ public class CCNCFunctions {
         }
 
         @Override
-        public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+        public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
             if (length < FID_CODE_SIZE) {
                 throw new IllegalStateException("Message size too small: " + length);
             }
             byte fid = buffer.get();
-            return deserialize(fid, buffer, length - FID_CODE_SIZE);
+            return deserialize(fid, buffer, length - FID_CODE_SIZE, flag);
         }
 
         @Override
@@ -1448,7 +1449,7 @@ public class CCNCFunctions {
             if (fid != FunctionId.OTHER.ordinal()) {
                 throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
             }
-            return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+            return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE, Message.ERROR);
         }
 
         @Override
@@ -1515,7 +1516,7 @@ public class CCNCFunctions {
             JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
         }
 
-        private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+        private Object deserialize(byte fid, ByteBuffer buffer, int length, byte flag) throws Exception {
             switch (FunctionId.values()[fid]) {
                 case REGISTER_PARTITION_PROVIDER:
                     return RegisterPartitionProviderFunction.deserialize(buffer, length);
@@ -1542,7 +1543,7 @@ public class CCNCFunctions {
                     return CleanupJobletFunction.deserialize(buffer, length);
             }
 
-            return javaSerde.deserializeObject(buffer, length);
+            return javaSerde.deserializeObject(buffer, length, flag);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
index d66f233..6920cfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
@@ -31,5 +31,7 @@ public interface IIPCHandle {
 
     public Object getAttachment();
 
+    public int getAttachmentLen();
+
     public boolean isConnected();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
index 1d2c754..2b69513 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.ipc.api;
 import java.nio.ByteBuffer;
 
 public interface IPayloadSerializerDeserializer {
-    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+    public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception;
 
     public Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index ddcc677..cc0a852 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -39,6 +39,8 @@ final class IPCHandle implements IIPCHandle {
 
     private Object attachment;
 
+    private int attachmentLen;
+
     private ByteBuffer inBuffer;
 
     private ByteBuffer outBuffer;
@@ -95,6 +97,11 @@ final class IPCHandle implements IIPCHandle {
         return attachment;
     }
 
+    /**
+     * Returns the stored attachment length, which is refreshed from an incoming message's payload
+     * length right before that message is delivered.
+     */
+    @Override
+    public int getAttachmentLen() {
+        return this.attachmentLen;
+    }
+
     SelectionKey getKey() {
         return key;
     }
@@ -178,6 +185,7 @@ final class IPCHandle implements IIPCHandle {
                     throw new IllegalStateException();
                 }
             } else {
+                attachmentLen = message.getPayloadLen();
                 system.deliverIncomingMessage(message);
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
index 439f230..64befde 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -33,7 +33,7 @@ import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
 public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
 
     @Override
-    public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+    public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
         return deserialize(buffer, length);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
index 550ce45..5f73890 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
@@ -22,18 +22,18 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
 
-class Message {
+public class Message {
     private static final int MSG_SIZE_SIZE = 4;
 
-    private static final int HEADER_SIZE = 17;
+    public static final int HEADER_SIZE = 17;
 
-    static final byte INITIAL_REQ = 1;
+    public static final byte INITIAL_REQ = 1;
 
-    static final byte INITIAL_ACK = 2;
+    public static final byte INITIAL_ACK = 2;
 
-    static final byte ERROR = 3;
+    public static final byte ERROR = 3;
 
-    static final byte NORMAL = 0;
+    public static final byte NORMAL = 0;
 
     private IPCHandle ipcHandle;
 
@@ -45,7 +45,9 @@ class Message {
 
     private Object payload;
 
-    Message(IPCHandle ipcHandle) {
+    private int payloadLen;
+
+    public Message(IPCHandle ipcHandle) {
         this.ipcHandle = ipcHandle;
     }
 
@@ -53,31 +55,31 @@ class Message {
         return ipcHandle;
     }
 
-    void setMessageId(long messageId) {
+    public void setMessageId(long messageId) {
         this.messageId = messageId;
     }
 
-    long getMessageId() {
+    public long getMessageId() {
         return messageId;
     }
 
-    void setRequestMessageId(long requestMessageId) {
+    public void setRequestMessageId(long requestMessageId) {
         this.requestMessageId = requestMessageId;
     }
 
-    long getRequestMessageId() {
+    public long getRequestMessageId() {
         return requestMessageId;
     }
 
-    void setFlag(byte flag) {
+    public void setFlag(byte flag) {
         this.flag = flag;
     }
 
-    byte getFlag() {
+    public byte getFlag() {
         return flag;
     }
 
-    void setPayload(Object payload) {
+    public void setPayload(Object payload) {
         this.payload = payload;
     }
 
@@ -85,15 +87,19 @@ class Message {
         return payload;
     }
 
-    static boolean hasMessage(ByteBuffer buffer) {
+    /** Returns the payload length recorded by {@code read}: the message size minus {@code HEADER_SIZE}. */
+    int getPayloadLen() {
+        return this.payloadLen;
+    }
+
+    public static boolean hasMessage(ByteBuffer buffer) {
         if (buffer.remaining() < MSG_SIZE_SIZE) {
             return false;
         }
         int msgSize = buffer.getInt(buffer.position());
-        return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+        return msgSize > 0 && buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
     }
 
-    void read(ByteBuffer buffer) throws Exception {
+    public void read(ByteBuffer buffer) throws Exception {
         assert hasMessage(buffer);
         int msgSize = buffer.getInt();
         messageId = buffer.getLong();
@@ -101,29 +107,49 @@ class Message {
         flag = buffer.get();
         int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
         int length = msgSize - HEADER_SIZE;
+        payloadLen = length;
         try {
             IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
-            payload = flag == ERROR ? serde.deserializeException(buffer, length)
-                    : serde.deserializeObject(buffer, length);
+            switch (flag) {
+                case NORMAL:
+                case INITIAL_ACK:
+                case INITIAL_REQ:
+                    payload = serde.deserializeObject(buffer, length, flag);
+                    break;
+                case ERROR:
+                    payload = serde.deserializeException(buffer, length);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unknown message flag");
+            }
+
         } finally {
             buffer.position(finalPosition);
         }
     }
 
-    boolean write(ByteBuffer buffer) throws Exception {
+    public boolean write(ByteBuffer buffer) throws Exception {
         IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+        return write(buffer, serde);
+    }
+
+    public boolean write(ByteBuffer buffer, IPayloadSerializerDeserializer serde) throws Exception {
         byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
         if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
-            buffer.putInt(HEADER_SIZE + bytes.length);
-            buffer.putLong(messageId);
-            buffer.putLong(requestMessageId);
-            buffer.put(flag);
+            writeHeader(buffer, bytes.length, messageId, requestMessageId, flag);
             buffer.put(bytes);
             return true;
         }
         return false;
     }
 
+    /**
+     * Writes a message header to {@code buffer}: the size field ({@code HEADER_SIZE + dlen}), then the
+     * message id, the request message id and the flag byte, in that order.
+     *
+     * @param buffer           destination buffer
+     * @param dlen             length of the payload that follows the header
+     * @param messageId        id of this message
+     * @param requestMessageId id of the message this one relates to
+     * @param flag             message flag byte
+     */
+    public static void writeHeader(ByteBuffer buffer, int dlen, long messageId, long requestMessageId, byte flag) {
+        final int msgSize = HEADER_SIZE + dlen;
+        buffer.putInt(msgSize).putLong(messageId).putLong(requestMessageId).put(flag);
+    }
+
     @Override
     public String toString() {
         return "MSG[" + messageId + ":" + requestMessageId + ":" + flag + ":" + payload + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
index a3578ad..ff048a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -65,6 +65,11 @@ class ReconnectingIPCHandle implements IIPCHandle {
     }
 
     @Override
+    public int getAttachmentLen() {
+        return delegate.getAttachmentLen();
+    }
+
+    @Override
     public boolean isConnected() {
         return delegate.isConnected();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index ebc1301..e1a7cac 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -185,6 +185,11 @@ public class UTF8StringUtil {
     public static int getStringLength(byte[] b, int s) {
         int len = getUTFLength(b, s);
         int pos = s + getNumBytesToStoreLength(len);
+        return getStringLength(b, pos, len);
+    }
+
+    public static int getStringLength(byte[] b, int offs, int len) {
+        int pos = offs;
         int end = pos + len;
         int charCount = 0;
         while (pos < end) {