You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2021/03/26 11:11:25 UTC

[asterixdb] 08/15: [ASTERIXDB-2838][RT][FUN] Batched PyUDF calls

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit ac6543ff3fc252e14291810d8682660f2990fa18
Author: Ian Maxon <ia...@maxons.email>
AuthorDate: Fri Mar 19 20:17:55 2021 -0700

    [ASTERIXDB-2838][RT][FUN] Batched PyUDF calls
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Implement the runtime for batched calls of Python
      UDFs
    - Fix and improve the stdin reading of the Python UDF
      wrapper
    - Check if the Python UDF process is still alive while
      waiting on results
    - Pull nullCall through function info to properly deal
      with it
    - Properly handle calls to multiple functions in one
      library
    - Properly handle null/missing
    - Fix calling top level functions in Python
    - Fix receive buffer growth
    - Fix resource leaks
    
    Change-Id: I5af4da999985afcc33cdfacea79576f1d6109173
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10187
    Reviewed-by: Ian Maxon <im...@uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
    Contrib: Ian Maxon <im...@uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 asterixdb/asterix-algebra/pom.xml                  |   5 +
 .../rules/SetAsterixPhysicalOperatorsRule.java     |   2 +-
 asterixdb/asterix-app/pom.xml                      |   2 +-
 .../asterix-app/src/main/resources/entrypoint.py   | 111 +++++---
 .../TweetSent/{sentiment.py => crashy.py}          |  19 +-
 .../src/test/resources/TweetSent/roundtrip.py      |   4 +
 .../src/test/resources/TweetSent/sentiment.py      |   6 +-
 .../crash.0.ddl.sqlpp}                             |   3 +-
 .../crash.1.lib.sqlpp}                             |   2 +-
 .../crash.2.ddl.sqlpp}                             |   7 +-
 .../crash.3.query.sqlpp}                           |   7 +-
 .../mysentiment.6.query.sqlpp}                     |   6 +-
 ...ntiment.6.ddl.sqlpp => mysentiment.7.ddl.sqlpp} |   0
 .../mysentiment_twitter.0.ddl.sqlpp}               |  12 +-
 .../mysentiment_twitter.1.update.sqlpp}            |   5 +-
 .../mysentiment_twitter.10.query.sqlpp}            |   6 +-
 .../mysentiment_twitter.11.query.sqlpp}            |   8 +-
 .../mysentiment_twitter.12.query.sqlpp}            |   6 +-
 .../mysentiment_twitter.13.ddl.sqlpp}              |   6 +-
 .../mysentiment_twitter.14.query.sqlpp}            |   6 +-
 .../mysentiment_twitter.15.query.sqlpp}            |   8 +-
 .../mysentiment_twitter.16.ddl.sqlpp}              |   6 +-
 .../mysentiment_twitter.17.query.sqlpp}            |   6 +-
 .../mysentiment_twitter.18.query.sqlpp}            |   8 +-
 .../mysentiment_twitter.19.ddl.sqlpp}              |   6 +-
 .../mysentiment_twitter.2.lib.sqlpp}               |   2 +-
 .../mysentiment_twitter.20.query.sqlpp}            |   8 +-
 .../mysentiment_twitter.21.ddl.sqlpp}              |   0
 .../mysentiment_twitter.3.ddl.sqlpp}               |   7 +-
 .../mysentiment_twitter.4.query.sqlpp}             |   6 +-
 .../mysentiment_twitter.5.query.sqlpp}             |   8 +-
 .../mysentiment_twitter.6.query.sqlpp}             |   8 +-
 .../mysentiment_twitter.7.query.sqlpp}             |  10 +-
 .../mysentiment_twitter.8.update.sqlpp}            |  26 +-
 .../mysentiment_twitter.9.query.sqlpp}             |   6 +-
 .../py_function_error.2.ddl.sqlpp                  |   3 +
 .../py_function_error.4.query.sqlpp}               |  10 +-
 .../py_nested_access.10.query.sqlpp                |   2 +-
 .../py_nested_access.11.query.sqlpp                |   2 +-
 .../py_nested_access.12.query.sqlpp                |   2 +-
 .../py_nested_access.13.query.sqlpp                |   2 +-
 .../py_nested_access.4.query.sqlpp                 |   5 +-
 .../py_nested_access.5.query.sqlpp                 |   6 +-
 .../py_nested_access.6.query.sqlpp                 |   2 +-
 .../py_nested_access.7.query.sqlpp                 |   2 +-
 .../py_nested_access.8.query.sqlpp                 |   2 +-
 .../py_nested_access.9.query.sqlpp                 |   2 +-
 .../type_validation.0.ddl.sqlpp}                   |   3 +-
 .../type_validation.1.lib.sqlpp}                   |   3 +-
 .../type_validation.2.ddl.sqlpp}                   |   6 +-
 .../type_validation.3.query.sqlpp}                 |   7 +-
 .../type_validation.4.ddl.sqlpp}                   |   1 +
 .../mysentiment.0.ddl.sqlpp}                       |   3 +-
 .../mysentiment.1.lib.sqlpp}                       |   2 +-
 .../mysentiment.2.ddl.sqlpp}                       |   7 +-
 .../mysentiment.3.query.sqlpp}                     |   5 +-
 .../mysentiment.4.ddl.sqlpp}                       |   0
 .../results/external-library/crash/crash.1.adm     |   1 +
 ...p1.4.regexjson => library_list_api.1.regexjson} |   0
 ...p1.3.regexjson => library_list_api.2.regexjson} |   0
 ...p1.2.regexjson => library_list_api.3.regexjson} |   0
 ...p1.1.regexjson => library_list_api.4.regexjson} |   0
 ...p1.5.regexjson => library_list_api.5.regexjson} |   2 +-
 .../external-library/mysentiment/mysentiment.4.adm |   1 +
 .../mysentiment_twitter/mysentiment_twitter.1.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.10.adm |   1 +
 .../mysentiment_twitter/mysentiment_twitter.11.adm |   1 +
 .../mysentiment_twitter/mysentiment_twitter.12.adm |   1 +
 .../mysentiment_twitter/mysentiment_twitter.13.adm | 100 ++++++++
 .../mysentiment_twitter/mysentiment_twitter.2.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.3.adm  | 100 ++++++++
 .../mysentiment_twitter/mysentiment_twitter.4.adm  | 100 ++++++++
 .../mysentiment_twitter/mysentiment_twitter.5.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.6.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.7.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.8.adm  |   1 +
 .../mysentiment_twitter/mysentiment_twitter.9.adm  |   1 +
 .../py_function_error/py_function_error.2.adm      |   4 +
 .../type_validation.1.adm                          |   1 +
 .../external-library/toplevel_fn/toplevel_fn.1.adm |   1 +
 .../resources/runtimets/testsuite_it_python.xml    |  32 ++-
 .../external/ipc/ExternalFunctionResultRouter.java |  43 ++--
 .../asterix/external/ipc/PythonIPCProto.java       | 113 ++++++---
 .../asterix/external/ipc/PythonMessageBuilder.java |  76 ++++--
 .../ExternalScalarPythonFunctionEvaluator.java     | 281 ++++-----------------
 .../external/library/PythonLibraryEvaluator.java   | 216 ++++++++++++++++
 .../library/PythonLibraryEvaluatorFactory.java     |  96 +++++++
 .../external/library/PythonLibraryEvaluatorId.java |  63 +++++
 .../library/msgpack/MessagePackerFromADM.java      |  85 ++++---
 .../library/msgpack/MessageUnpackerToADM.java      |  59 ++---
 .../ExternalAssignBatchRuntimeFactory.java         | 273 +++++++++++++++++++-
 .../functions/ExternalFunctionCompilerUtil.java    |   4 +-
 .../functions/ExternalScalarFunctionInfo.java      |   5 +-
 .../asterix/om/functions/ExternalFunctionInfo.java |  11 +-
 .../om/functions/IExternalFunctionInfo.java        |   2 +
 95 files changed, 1522 insertions(+), 572 deletions(-)

diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index cf5802f..acc70e2 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -127,6 +127,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-external-data</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 076783f..d466446 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -75,7 +75,7 @@ public final class SetAsterixPhysicalOperatorsRule extends SetAlgebricksPhysical
 
     // Disable ASSIGN_BATCH physical operator if this option is set to 'false'
     public static final String REWRITE_ATTEMPT_BATCH_ASSIGN = "rewrite_attempt_batch_assign";
-    static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = false;
+    static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = true;
 
     @Override
     protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 1f22924..9131202 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -423,7 +423,7 @@
       <id>asterix-gerrit-asterix-app-sql-execution</id>
       <properties>
         <test.excludes>**/*.java</test.excludes>
-        <itest.includes>**/SqlppExecution*IT.java,**/ExternalPythonFunction*IT.java</itest.includes>
+        <itest.includes>**/SqlppExecution*IT.java,**/ExternalPythonFunctionIT.java</itest.includes>
         <failIfNoTests>false</failIfNoTests>
       </properties>
     </profile>
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index 0917f49..aba4f29 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -26,14 +26,16 @@ from struct import *
 import signal
 import msgpack
 import socket
+import traceback
 from importlib import import_module
 from pathlib import Path
 from enum import IntEnum
 from io import BytesIO
 
 PROTO_VERSION = 1
-HEADER_SZ = 8+8+1
-REAL_HEADER_SZ = 4+8+8+1
+HEADER_SZ = 8 + 8 + 1
+REAL_HEADER_SZ = 4 + 8 + 8 + 1
+FRAMESZ = 32768
 
 
 class MessageType(IntEnum):
@@ -57,19 +59,29 @@ class Wrapper(object):
     wrapped_module = None
     wrapped_class = None
     wrapped_fn = None
+    sz = None
+    mid = None
+    rmid = None
+    flag = None
+    resp = None
+    unpacked_msg = None
+    msg_type = None
     packer = msgpack.Packer(autoreset=False)
     unpacker = msgpack.Unpacker()
     response_buf = BytesIO()
     stdin_buf = BytesIO()
     wrapped_fns = {}
     alive = True
+    readbuf = bytearray(FRAMESZ)
+    readview = memoryview(readbuf)
+
 
     def init(self, module_name, class_name, fn_name):
         self.wrapped_module = import_module(module_name)
         # do not allow modules to be called that are not part of the uploaded module
         wrapped_fn = None
         if not self.check_module_path(self.wrapped_module):
-            wrapped_module = None
+            self.wrapped_module = None
             raise ImportError("Module was not found in library")
         if class_name is not None:
             self.wrapped_class = getattr(
@@ -77,12 +89,13 @@ class Wrapper(object):
         if self.wrapped_class is not None:
             wrapped_fn = getattr(self.wrapped_class, fn_name)
         else:
-            wrapped_fn = locals()[fn_name]
+            wrapped_fn = getattr(import_module(module_name), fn_name)
         if wrapped_fn is None:
-            raise ImportError("Could not find class or function in specified module")
-        self.wrapped_fns[self.rmid] = wrapped_fn
+            raise ImportError(
+                "Could not find class or function in specified module")
+        self.wrapped_fns[self.mid] = wrapped_fn
 
-    def nextTuple(self, *args, key=None):
+    def next_tuple(self, *args, key=None):
         return self.wrapped_fns[key](*args)
 
     def check_module_path(self, module):
@@ -92,14 +105,14 @@ class Wrapper(object):
 
     def read_header(self, readbuf):
         self.sz, self.mid, self.rmid, self.flag = unpack(
-            "!iqqb", readbuf[0:21])
+            "!iqqb", readbuf[0:REAL_HEADER_SZ])
         return True
 
     def write_header(self, response_buf, dlen):
         total_len = dlen + HEADER_SZ
         header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
         self.response_buf.write(header)
-        return total_len+4
+        return total_len + 4
 
     def get_ver_hlen(self, hlen):
         return hlen + (PROTO_VERSION << 4)
@@ -118,14 +131,13 @@ class Wrapper(object):
         self.packer.reset()
 
     def helo(self):
-        #need to ack the connection back before sending actual HELO
+        # need to ack the connection back before sending actual HELO
         self.init_remote_ipc()
-
         self.flag = MessageFlags.NORMAL
         self.response_buf.seek(0)
         self.packer.pack(int(MessageType.HELO))
         self.packer.pack("HELO")
-        dlen = 5 #tag(1) + body(4)
+        dlen = 5  # tag(1) + body(4)
         resp_len = self.write_header(self.response_buf, dlen)
         self.response_buf.write(self.packer.bytes())
         self.resp = self.response_buf.getbuffer()[0:resp_len]
@@ -160,16 +172,19 @@ class Wrapper(object):
 
     def handle_call(self):
         self.flag = MessageFlags.NORMAL
-        args = self.unpacked_msg[1]
-        result = None
-        if args is None:
-            result = self.nextTuple(key=self.rmid)
-        else:
-            result = self.nextTuple(args, key=self.rmid)
+        result = ([], [])
+        if len(self.unpacked_msg) > 1:
+            args = self.unpacked_msg[1]
+            if args is not None:
+                for arg in args:
+                    try:
+                        result[0].append(self.next_tuple(*arg, key=self.mid))
+                    except BaseException as e:
+                        result[1].append(traceback.format_exc())
         self.packer.reset()
         self.response_buf.seek(0)
         body = msgpack.packb(result)
-        dlen = len(body)+1  # 1 for tag
+        dlen = len(body) + 1  # 1 for tag
         resp_len = self.write_header(self.response_buf, dlen)
         self.packer.pack(int(MessageType.CALL_RSP))
         self.response_buf.write(self.packer.bytes())
@@ -179,13 +194,12 @@ class Wrapper(object):
         self.packer.reset()
         return True
 
-    def handle_error(self,e):
+    def handle_error(self, e):
         self.flag = MessageFlags.NORMAL
-        result = type(e).__name__ + ": " + str(e)
         self.packer.reset()
         self.response_buf.seek(0)
-        body = msgpack.packb(result)
-        dlen = len(body)+1  # 1 for tag
+        body = msgpack.packb(e)
+        dlen = len(body) + 1  # 1 for tag
         resp_len = self.write_header(self.response_buf, dlen)
         self.packer.pack(int(MessageType.ERROR))
         self.response_buf.write(self.packer.bytes())
@@ -193,6 +207,7 @@ class Wrapper(object):
         self.resp = self.response_buf.getbuffer()[0:resp_len]
         self.send_msg()
         self.packer.reset()
+        self.alive = False
         return True
 
     type_handler = {
@@ -204,33 +219,47 @@ class Wrapper(object):
 
     def connect_sock(self, addr, port):
         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            self.sock.connect((addr, int(port)))
-        except socket.error as msg:
-            print(sys.stderr, msg)
+        self.sock.connect((addr, int(port)))
 
     def disconnect_sock(self, *args):
         self.sock.shutdown(socket.SHUT_RDWR)
         self.sock.close()
 
     def recv_msg(self):
-        completed = False
-        while not completed and self.alive:
-            readbuf = sys.stdin.buffer.read1(4096)
+        while self.alive:
+            pos = sys.stdin.buffer.readinto1(self.readbuf)
+            if pos <= 0:
+                self.alive = False
+                return
             try:
-                if(len(readbuf) < REAL_HEADER_SZ):
-                    while(len(readbuf) < REAL_HEADER_SZ):
-                        readbuf += sys.stdin.buffer.read1(4096)
-                self.read_header(readbuf)
-                if(self.sz > len(readbuf)):
-                    while(len(readbuf) < self.sz):
-                        readbuf += sys.stdin.buffer.read1(4096)
-                self.unpacker.feed(readbuf[21:])
+                while pos < REAL_HEADER_SZ:
+                    read = sys.stdin.buffer.readinto1(self.readview[pos:])
+                    if read <= 0:
+                        self.alive = False
+                        return
+                    pos += read
+                self.read_header(self.readview)
+                while pos < self.sz and len(self.readbuf) - pos > 0:
+                    read = sys.stdin.buffer.readinto1(self.readview[pos:])
+                    if read <= 0:
+                        self.alive = False
+                        return
+                    pos += read
+                while pos < self.sz:
+                    vszchunk = sys.stdin.buffer.read1()
+                    if len(vszchunk) == 0:
+                        self.alive = False
+                        return
+                    self.readview = None
+                    self.readbuf.extend(vszchunk)
+                    self.readview = memoryview(self.readbuf)
+                    pos += len(vszchunk)
+                self.unpacker.feed(self.readview[REAL_HEADER_SZ:self.sz])
                 self.unpacked_msg = list(self.unpacker)
-                self.type = MessageType(self.unpacked_msg[0])
-                completed = self.type_handler[self.type](self)
+                self.msg_type = MessageType(self.unpacked_msg[0])
+                self.type_handler[self.msg_type](self)
             except BaseException as e:
-                completed = self.handle_error(e)
+                self.handle_error(traceback.format_exc())
 
     def send_msg(self):
         self.sock.sendall(self.resp)
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py b/asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py
similarity index 81%
copy from asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
copy to asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py
index 29d371f..c3d9a74 100644
--- a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py
@@ -15,18 +15,23 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import math,sys
-import pickle;
-import sklearn;
-import os;
+import pickle
+import sklearn
+import sys
+import os
+import ctypes
 class TweetSent(object):
 
     def __init__(self):
-
         pickle_path = os.path.join(os.path.dirname(__file__), 'sentiment_pipeline3')
         f = open(pickle_path,'rb')
         self.pipeline = pickle.load(f)
         f.close()
 
-    def sentiment(self, *args):
-        return self.pipeline.predict(args[0])[0].item()
+    def sentiment(self, args):
+        if args is None:
+            return 2
+        return self.pipeline.predict([args])[0].item()
+
+    def crash(self):
+        os._exit(1)
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
index 37350be..8b8fced 100644
--- a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
@@ -14,6 +14,10 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import math
+
+def sqrt(num):
+    return math.sqrt(num)
 
 class Tests(object):
 
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py b/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
index 29d371f..66545ae 100644
--- a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
@@ -28,5 +28,7 @@ class TweetSent(object):
         self.pipeline = pickle.load(f)
         f.close()
 
-    def sentiment(self, *args):
-        return self.pipeline.predict(args[0])[0].item()
+    def sentiment(self, args):
+        if args is None:
+            return 2
+        return self.pipeline.predict([args])[0].item()
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp
index 65733e4..76cc70d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp
@@ -16,4 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE  externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp
index 65733e4..699e565 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp
similarity index 89%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp
index 6bbeaa1..8a2746c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+ USE externallibtest;
 
-use test;
-
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function crash()
+  as "crashy", "TweetSent.crash" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp
similarity index 92%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp
index 65733e4..f1858f3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp
@@ -16,4 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+// param max-warnings:json=2
+
+use externallibtest;
+
+crash();
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp
similarity index 83%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp
index 6bbeaa1..1ec6766 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+select sentiment("great") as peachy, sentiment("okay") as phlegmatic,
+       sentiment("meh") as indifferent, sentiment("ugh") as choleric;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.7.ddl.sqlpp
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.7.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp
similarity index 77%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp
index 6bbeaa1..18ad48b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp
@@ -16,8 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE  externallibtest;
+USE  externallibtest;
+create type typeTweet if not exists as open{
+    create_at : datetime,
+    id: bigint
+};
+
+create dataset Tweet(typeTweet) primary key id;
 
-use test;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp
index 6bbeaa1..5a8dae2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+USE  externallibtest;
 
-use test;
+load dataset Tweet using localfs(("path"="asterix_nc1://data/twitter/real.adm"),("format"="adm"));
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp
index 65733e4..f7a29fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp
@@ -16,4 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+use externallibtest;
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp
index 6bbeaa1..f1ef8e9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp
@@ -16,8 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment(t.text))
+from Tweet t;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp
index 6bbeaa1..4d4d179 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+select count(sentiment(t.text)), count(t.text)
+from Tweet t;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp
similarity index 85%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp
index 6bbeaa1..3438521 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use test;
+ USE externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function sentiment_nullcall(s)
+  as "sentiment", "TweetSent.sentiment" at testlib with {"null-call": "true"};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp
index 6bbeaa1..0a219ca 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use test;
+use externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+select value count(sentiment_nullcall(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp
similarity index 86%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp
index 6bbeaa1..a259660 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-use test;
+use externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment_nullcall(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp
similarity index 85%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp
index 6bbeaa1..b3adc32 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use test;
+ USE externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function sentiment_nullcall_bool(s)
+  as "sentiment", "TweetSent.sentiment" at testlib with {"null-call": true};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp
index 6bbeaa1..a336979 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use test;
+use externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+select value count(sentiment_nullcall_bool(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp
similarity index 86%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp
index 6bbeaa1..14650a3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-use test;
+use externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment_nullcall_bool(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp
similarity index 89%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp
index 6bbeaa1..d05af3a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-use test;
+use externallibtest;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function roundtrip(s)
+  as "roundtrip", "Tests.roundtrip" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp
index 65733e4..699e565 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp
similarity index 84%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp
index 6bbeaa1..7d6550c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp
@@ -16,8 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+USE externallibtest;
 
-use test;
+select t.id as id, length(roundtrip(t.text)[0]) as len, sentiment(t.text) as sent
+from Tweet t
+order by id DESC
+limit 100;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.21.ddl.sqlpp
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.21.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp
similarity index 88%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp
index 6bbeaa1..00838de 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+ USE externallibtest;
 
-use test;
-
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function sentiment(s)
+  as "sentiment", "TweetSent.sentiment" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp
index 65733e4..f7a29fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp
@@ -16,4 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+use externallibtest;
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp
index 6bbeaa1..f1ef8e9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp
@@ -16,8 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment(t.text))
+from Tweet t;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp
index 6bbeaa1..4a85ab7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp
@@ -16,8 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+select sentiment(t.text) as sent, length(t.text) as text
+from Tweet t
+order by t.id
+limit 100;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp
similarity index 83%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp
index 6bbeaa1..69174bf 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp
@@ -16,8 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+set `rewrite_attempt_batch_assign` "false";
+
+select sentiment(t.text) as sent, length(t.text) as text
+from Tweet t
+order by t.id
+limit 100;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp
similarity index 61%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp
index c48dda5..891efc0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp
@@ -16,18 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Access a records nested records at each level.
-* Expected Res : Success
-* Date         : 04 Jun 2015
-*/
-
-use test;
+use externallibtest;
 
+insert into Tweet (
+ select t.create_at, t.id+1000000 as id, t.in_reply_to_status, t.in_reply_to_user, t.favorite_count, t.coordinate, t.retweet_count, t.lang,
+        t.is_retweet, t.user, t.place
+        from Tweet t
+        limit 50
+);
 
-select value [(
-select element result
-from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower
-order by result.id)][0]
-;
+insert into Tweet (
+ select t.create_at, t.id+2000000 as id, t.in_reply_to_status, t.in_reply_to_user, t.favorite_count, t.coordinate, t.retweet_count, t.lang,
+        t.is_retweet, t.user, t.place, null as text
+        from Tweet t
+        limit 50
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp
similarity index 92%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp
index 65733e4..e77b11a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp
@@ -16,4 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+use externallibtest;
+
+select value count(t.text)
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
index 6bbeaa1..0ad9fb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
@@ -21,3 +21,6 @@ use test;
 
 create function warning()
   as "roundtrip", "Tests.warning" at testlib;
+
+create function roundtrip(s)
+  as "roundtrip", "Tests.roundtrip" at testlib;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp
index 1f7925f..1ef35a7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp
@@ -21,13 +21,11 @@
 * Expected Res : Success
 * Date         : 04 Jun 2015
 */
+// param max-warnings:json=0
 
 use test;
 
+set `rewrite_attempt_batch_assign` "false";
 
-select value [(
-select element result
-from  Animals as test
-with  result as roundtrip(test)[0][0].class
-order by result.id)][0]
-;
+select warning(), roundtrip(d)
+from [ "a", "b" , "c", "d" ] d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
index c48dda5..30d9426 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower
+with  result as roundtrip(test)[0].class.fullClassification.lower
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
index 30ec1da..9c3e0cb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification
+with  result as roundtrip(test)[0].class.fullClassification
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
index 1f7925f..2aed607 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class
+with  result as roundtrip(test)[0].class
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
index 13b4c28..52705db 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0]
+with  result as roundtrip(test)[0]
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
index 1e9a088..e17f03f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -27,6 +27,5 @@ use test;
 
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result
-;
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
+order by result;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
index eedf56b..621b4d6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
@@ -23,9 +23,9 @@
 */
 use test;
 
-select value [(
+select value (
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower
-order by result.id )][0]
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower
+order by result.id )
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
index 4912ad6..8e2ec95 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
index 546c174..c39d933 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
index 62af850..9a20dc2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower.lower
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
index 6c594f0..0314f22 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
@@ -28,6 +28,6 @@ use test;
 select value [(
 select element result
 from  Animals as test
-with  result as roundtrip(test)[0][0].class.fullClassification.lower.lower
+with  result as roundtrip(test)[0].class.fullClassification.lower.lower
 order by result.id)][0]
 ;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp
index 65733e4..76cc70d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp
@@ -16,4 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE  externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp
index 65733e4..3250a90 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp
@@ -16,4 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
similarity index 87%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
index 6bbeaa1..a8ba8a1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
@@ -16,8 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
 
-use test;
+create function typeValidation(a, b, c, d, e, f, g)
+  as "roundtrip", "Tests.roundtrip" at testlib;
 
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp
similarity index 80%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp
index 6bbeaa1..0ae7d0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use externallibtest;
+typeValidation(907, 9.07, "907", 9.07, true, unix_time_from_date_in_days(date("2013-01-01")),
+               unix_time_from_datetime_in_secs(datetime("1989-09-07T12:13:14.039Z")));
 
-use test;
-
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp
similarity index 99%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp
index 65733e4..2b27030 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp
@@ -16,4 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 DROP DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp
index 65733e4..76cc70d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp
@@ -16,4 +16,5 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE  externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp
index 65733e4..699e565 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp
index 6bbeaa1..d6a4ea7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp
@@ -16,8 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+ USE externallibtest;
 
-use test;
-
-create function warning()
-  as "roundtrip", "Tests.warning" at testlib;
+create function sqrt(s)
+  as "roundtrip", "sqrt" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp
similarity index 96%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp
index 65733e4..755d980 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp
@@ -16,4 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-DROP DATAVERSE externallibtest;
+use externallibtest;
+
+sqrt(4);
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.4.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.4.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm
new file mode 100644
index 0000000..ec747fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm
@@ -0,0 +1 @@
+null
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.1.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.4.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.1.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.2.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.3.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.2.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.3.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.2.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.3.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.1.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.4.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.1.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.4.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
similarity index 99%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
index e5d039f..c896e0d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
@@ -17,4 +17,4 @@
 	"dataverse": ["externallibtest", "foo", "bar"],
 	"hash_md5": "R{[a-zA-Z0-9-]+}",
 	"name": "testlib"
-}]
+}]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm
new file mode 100644
index 0000000..d7f41ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm
@@ -0,0 +1 @@
+{ "peachy": 1, "phlegmatic": 0, "indifferent": 0, "choleric": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm
new file mode 100644
index 0000000..878726a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm
@@ -0,0 +1 @@
+5100
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm
new file mode 100644
index 0000000..0ead4c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm
@@ -0,0 +1 @@
+5100
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm
new file mode 100644
index 0000000..878726a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm
@@ -0,0 +1 @@
+5100
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
new file mode 100644
index 0000000..65a7c81
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
@@ -0,0 +1,100 @@
+{ "id": 670301227662491648, "len": 20, "sent": 1 }
+{ "id": 670301227553566720, "len": 139, "sent": 0 }
+{ "id": 670301227041857536, "len": 112, "sent": 0 }
+{ "id": 670301227037519876, "len": 33, "sent": 0 }
+{ "id": 670301226987159552, "len": 57, "sent": 0 }
+{ "id": 670301226513391616, "len": 28, "sent": 1 }
+{ "id": 670301226337202180, "len": 77, "sent": 1 }
+{ "id": 670301226190278656, "len": 25, "sent": 0 }
+{ "id": 670301225959579648, "len": 112, "sent": 1 }
+{ "id": 670301225838125056, "len": 107, "sent": 0 }
+{ "id": 670301225598906369, "len": 64, "sent": 0 }
+{ "id": 670301225489817600, "len": 49, "sent": 0 }
+{ "id": 670301225456308224, "len": 103, "sent": 0 }
+{ "id": 670301225326391296, "len": 66, "sent": 1 }
+{ "id": 670301225162661889, "len": 28, "sent": 1 }
+{ "id": 670301224885837824, "len": 63, "sent": 0 }
+{ "id": 670301224814698496, "len": 59, "sent": 0 }
+{ "id": 670301224709849090, "len": 33, "sent": 1 }
+{ "id": 670301224684556288, "len": 21, "sent": 0 }
+{ "id": 670301224680480768, "len": 39, "sent": 0 }
+{ "id": 670301224348946433, "len": 64, "sent": 1 }
+{ "id": 670301224261058560, "len": 61, "sent": 1 }
+{ "id": 670301224231690240, "len": 33, "sent": 0 }
+{ "id": 670301224214794240, "len": 33, "sent": 0 }
+{ "id": 670301223753351168, "len": 105, "sent": 1 }
+{ "id": 670301223426367488, "len": 23, "sent": 0 }
+{ "id": 670301223216545792, "len": 31, "sent": 0 }
+{ "id": 670301223182974976, "len": 34, "sent": 1 }
+{ "id": 670301223128535041, "len": 21, "sent": 0 }
+{ "id": 670301222759301121, "len": 132, "sent": 0 }
+{ "id": 670301222734307329, "len": 110, "sent": 1 }
+{ "id": 670301222717419520, "len": 81, "sent": 0 }
+{ "id": 670301222318936064, "len": 110, "sent": 1 }
+{ "id": 670301222302150657, "len": 131, "sent": 0 }
+{ "id": 670301222222602240, "len": 43, "sent": 1 }
+{ "id": 670301222113517568, "len": 27, "sent": 0 }
+{ "id": 670301221836615680, "len": 44, "sent": 1 }
+{ "id": 670301221719310336, "len": 28, "sent": 0 }
+{ "id": 670301221442486272, "len": 34, "sent": 0 }
+{ "id": 670301221266153472, "len": 86, "sent": 0 }
+{ "id": 670301220960096256, "len": 102, "sent": 0 }
+{ "id": 670301220855136256, "len": 129, "sent": 1 }
+{ "id": 670301220637044736, "len": 43, "sent": 0 }
+{ "id": 670301220305821696, "len": 140, "sent": 0 }
+{ "id": 670301220247072770, "len": 83, "sent": 1 }
+{ "id": 670301220196626432, "len": 36, "sent": 0 }
+{ "id": 670301220079312901, "len": 31, "sent": 1 }
+{ "id": 670301219949305857, "len": 70, "sent": 1 }
+{ "id": 670301219739574273, "len": 131, "sent": 1 }
+{ "id": 670301219206877184, "len": 27, "sent": 0 }
+{ "id": 670301219139620864, "len": 124, "sent": 0 }
+{ "id": 670301218737123328, "len": 124, "sent": 0 }
+{ "id": 670301218640531458, "len": 31, "sent": 1 }
+{ "id": 670301218598756352, "len": 47, "sent": 0 }
+{ "id": 670301218565156865, "len": 44, "sent": 0 }
+{ "id": 670301218414206976, "len": 71, "sent": 1 }
+{ "id": 670301218376413185, "len": 14, "sent": 0 }
+{ "id": 670301218078629888, "len": 9, "sent": 0 }
+{ "id": 670301217851990017, "len": 111, "sent": 0 }
+{ "id": 670301217793269760, "len": 113, "sent": 0 }
+{ "id": 670301217508036608, "len": 47, "sent": 0 }
+{ "id": 670301217369657344, "len": 137, "sent": 0 }
+{ "id": 670301217311088641, "len": 28, "sent": 0 }
+{ "id": 670301217231347712, "len": 123, "sent": 0 }
+{ "id": 670301216891473920, "len": 44, "sent": 0 }
+{ "id": 670301216874721280, "len": 68, "sent": 0 }
+{ "id": 670301216799232000, "len": 50, "sent": 1 }
+{ "id": 670301216669171713, "len": 54, "sent": 0 }
+{ "id": 670301216493060097, "len": 113, "sent": 1 }
+{ "id": 670301216400924676, "len": 35, "sent": 1 }
+{ "id": 670301216371552258, "len": 58, "sent": 0 }
+{ "id": 670301216367185920, "len": 48, "sent": 0 }
+{ "id": 670301216228831232, "len": 130, "sent": 1 }
+{ "id": 670301215901802496, "len": 71, "sent": 1 }
+{ "id": 670301215725649921, "len": 20, "sent": 0 }
+{ "id": 670301215306199040, "len": 35, "sent": 0 }
+{ "id": 670301215138250754, "len": 48, "sent": 0 }
+{ "id": 670301214958055424, "len": 58, "sent": 1 }
+{ "id": 670301214605733888, "len": 139, "sent": 1 }
+{ "id": 670301214509129728, "len": 114, "sent": 1 }
+{ "id": 670301214442041344, "len": 18, "sent": 1 }
+{ "id": 670301214295392256, "len": 47, "sent": 0 }
+{ "id": 670301213737529344, "len": 9, "sent": 0 }
+{ "id": 670301213544595457, "len": 63, "sent": 1 }
+{ "id": 670301213515235333, "len": 107, "sent": 0 }
+{ "id": 670301213464899584, "len": 105, "sent": 1 }
+{ "id": 670301213120942080, "len": 39, "sent": 0 }
+{ "id": 670301212961603585, "len": 63, "sent": 0 }
+{ "id": 670301212961603584, "len": 20, "sent": 0 }
+{ "id": 670301212856737792, "len": 51, "sent": 0 }
+{ "id": 670301212760117248, "len": 133, "sent": 1 }
+{ "id": 670301211808010240, "len": 103, "sent": 0 }
+{ "id": 670301211774468096, "len": 40, "sent": 0 }
+{ "id": 670301211703144450, "len": 138, "sent": 1 }
+{ "id": 670301211581685761, "len": 25, "sent": 1 }
+{ "id": 670301211560685568, "len": 12, "sent": 1 }
+{ "id": 670301211090751490, "len": 140, "sent": 0 }
+{ "id": 670301210654699520, "len": 13, "sent": 0 }
+{ "id": 670301210486919168, "len": 38, "sent": 0 }
+{ "id": 670301210470195200, "len": 67, "sent": 1 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm
new file mode 100644
index 0000000..0b3e0a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm
@@ -0,0 +1 @@
+5000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm
new file mode 100644
index 0000000..1995b70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm
@@ -0,0 +1,100 @@
+{ "text": 65, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 68, "sent": 0 }
+{ "text": 60, "sent": 0 }
+{ "text": 26, "sent": 1 }
+{ "text": 90, "sent": 1 }
+{ "text": 89, "sent": 0 }
+{ "text": 36, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 67, "sent": 0 }
+{ "text": 26, "sent": 0 }
+{ "text": 103, "sent": 1 }
+{ "text": 38, "sent": 1 }
+{ "text": 23, "sent": 1 }
+{ "text": 134, "sent": 1 }
+{ "text": 18, "sent": 0 }
+{ "text": 13, "sent": 1 }
+{ "text": 140, "sent": 0 }
+{ "text": 70, "sent": 1 }
+{ "text": 122, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 42, "sent": 1 }
+{ "text": 59, "sent": 0 }
+{ "text": 23, "sent": 1 }
+{ "text": 15, "sent": 1 }
+{ "text": 10, "sent": 0 }
+{ "text": 39, "sent": 1 }
+{ "text": 56, "sent": 0 }
+{ "text": 35, "sent": 0 }
+{ "text": 98, "sent": 1 }
+{ "text": 9, "sent": 0 }
+{ "text": 21, "sent": 0 }
+{ "text": 52, "sent": 0 }
+{ "text": 44, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 50, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 45, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 105, "sent": 0 }
+{ "text": 77, "sent": 0 }
+{ "text": 33, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 12, "sent": 0 }
+{ "text": 27, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 140, "sent": 1 }
+{ "text": 107, "sent": 1 }
+{ "text": 47, "sent": 0 }
+{ "text": 31, "sent": 0 }
+{ "text": 32, "sent": 1 }
+{ "text": 24, "sent": 0 }
+{ "text": 132, "sent": 0 }
+{ "text": 88, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 69, "sent": 0 }
+{ "text": 80, "sent": 0 }
+{ "text": 28, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 42, "sent": 0 }
+{ "text": 101, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 138, "sent": 0 }
+{ "text": 66, "sent": 0 }
+{ "text": 61, "sent": 0 }
+{ "text": 51, "sent": 1 }
+{ "text": 107, "sent": 0 }
+{ "text": 136, "sent": 0 }
+{ "text": 17, "sent": 0 }
+{ "text": 36, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 20, "sent": 1 }
+{ "text": 103, "sent": 0 }
+{ "text": 8, "sent": 0 }
+{ "text": 139, "sent": 0 }
+{ "text": 114, "sent": 0 }
+{ "text": 57, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 72, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 140, "sent": 0 }
+{ "text": 90, "sent": 1 }
+{ "text": 25, "sent": 0 }
+{ "text": 56, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 58, "sent": 0 }
+{ "text": 23, "sent": 0 }
+{ "text": 15, "sent": 1 }
+{ "text": 53, "sent": 1 }
+{ "text": 58, "sent": 1 }
+{ "text": 14, "sent": 0 }
+{ "text": 21, "sent": 1 }
+{ "text": 37, "sent": 0 }
+{ "text": 118, "sent": 0 }
+{ "text": 59, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 55, "sent": 0 }
+{ "text": 35, "sent": 1 }
+{ "text": 127, "sent": 0 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm
new file mode 100644
index 0000000..1995b70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm
@@ -0,0 +1,100 @@
+{ "text": 65, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 68, "sent": 0 }
+{ "text": 60, "sent": 0 }
+{ "text": 26, "sent": 1 }
+{ "text": 90, "sent": 1 }
+{ "text": 89, "sent": 0 }
+{ "text": 36, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 67, "sent": 0 }
+{ "text": 26, "sent": 0 }
+{ "text": 103, "sent": 1 }
+{ "text": 38, "sent": 1 }
+{ "text": 23, "sent": 1 }
+{ "text": 134, "sent": 1 }
+{ "text": 18, "sent": 0 }
+{ "text": 13, "sent": 1 }
+{ "text": 140, "sent": 0 }
+{ "text": 70, "sent": 1 }
+{ "text": 122, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 42, "sent": 1 }
+{ "text": 59, "sent": 0 }
+{ "text": 23, "sent": 1 }
+{ "text": 15, "sent": 1 }
+{ "text": 10, "sent": 0 }
+{ "text": 39, "sent": 1 }
+{ "text": 56, "sent": 0 }
+{ "text": 35, "sent": 0 }
+{ "text": 98, "sent": 1 }
+{ "text": 9, "sent": 0 }
+{ "text": 21, "sent": 0 }
+{ "text": 52, "sent": 0 }
+{ "text": 44, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 50, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 45, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 105, "sent": 0 }
+{ "text": 77, "sent": 0 }
+{ "text": 33, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 12, "sent": 0 }
+{ "text": 27, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 140, "sent": 1 }
+{ "text": 107, "sent": 1 }
+{ "text": 47, "sent": 0 }
+{ "text": 31, "sent": 0 }
+{ "text": 32, "sent": 1 }
+{ "text": 24, "sent": 0 }
+{ "text": 132, "sent": 0 }
+{ "text": 88, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 69, "sent": 0 }
+{ "text": 80, "sent": 0 }
+{ "text": 28, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 42, "sent": 0 }
+{ "text": 101, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 138, "sent": 0 }
+{ "text": 66, "sent": 0 }
+{ "text": 61, "sent": 0 }
+{ "text": 51, "sent": 1 }
+{ "text": 107, "sent": 0 }
+{ "text": 136, "sent": 0 }
+{ "text": 17, "sent": 0 }
+{ "text": 36, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 20, "sent": 1 }
+{ "text": 103, "sent": 0 }
+{ "text": 8, "sent": 0 }
+{ "text": 139, "sent": 0 }
+{ "text": 114, "sent": 0 }
+{ "text": 57, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 72, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 140, "sent": 0 }
+{ "text": 90, "sent": 1 }
+{ "text": 25, "sent": 0 }
+{ "text": 56, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 58, "sent": 0 }
+{ "text": 23, "sent": 0 }
+{ "text": 15, "sent": 1 }
+{ "text": 53, "sent": 1 }
+{ "text": 58, "sent": 1 }
+{ "text": 14, "sent": 0 }
+{ "text": 21, "sent": 1 }
+{ "text": 37, "sent": 0 }
+{ "text": 118, "sent": 0 }
+{ "text": 59, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 55, "sent": 0 }
+{ "text": 35, "sent": 1 }
+{ "text": 127, "sent": 0 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm
new file mode 100644
index 0000000..9fada79
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm
@@ -0,0 +1 @@
+{ "$1": 5000, "$2": 5000 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm
new file mode 100644
index 0000000..0ead4c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm
@@ -0,0 +1 @@
+5100
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm
new file mode 100644
index 0000000..f0405ed
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm
@@ -0,0 +1,4 @@
+{ "$1": null, "$2": [ "a" ] }
+{ "$1": null, "$2": [ "b" ] }
+{ "$1": null, "$2": [ "c" ] }
+{ "$1": null, "$2": [ "d" ] }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm
new file mode 100644
index 0000000..93f8aec
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm
@@ -0,0 +1 @@
+[ 907, 9.07, "907", 9.07, true, 15706, 621173594 ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm
new file mode 100644
index 0000000..cd5ac03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm
@@ -0,0 +1 @@
+2.0
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 59dec11..4e1d5b2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -44,10 +44,40 @@
         <output-dir compare="Clean-JSON">py_nested_access</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="python_open_type_validation">
+        <output-dir compare="Clean-JSON">python_open_type_validation</output-dir>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="external-library" check-warnings="true">
       <compilation-unit name="py_function_error">
         <output-dir compare="Clean-JSON">py_function_error</output-dir>
-        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: ArithmeticError: oof</expected-warn>
+        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Traceback (most recent call last):
+  File "entrypoint.py", line 181, in handle_call
+    result[0].append(self.next_tuple(*arg, key=self.mid))
+  File "entrypoint.py", line 99, in next_tuple
+    return self.wrapped_fns[key](*args)
+  File "site-packages/roundtrip.py", line 28, in warning
+    raise ArithmeticError("oof")
+ArithmeticError: oof
+ (in line 28, at column 1)</expected-warn>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="mysentiment_twitter">
+        <output-dir compare="Text">mysentiment_twitter</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="toplevel_fn">
+        <output-dir compare="Text">toplevel_fn</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-library" check-warnings="true">
+      <compilation-unit name="crash">
+        <output-dir compare="Text">crash</output-dir>
+        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Function externallibtest:crash#0 failed to execute (in line 23, at column 1)</expected-warn>
+        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: java.io.IOException: Python process exited with code: 1 (in line 23, at column 1)</expected-warn>
       </compilation-unit>
     </test-case>
   </test-group>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
index 8d56eb7..94baa64 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -33,9 +33,8 @@ import org.apache.hyracks.ipc.impl.Message;
 
 public class ExternalFunctionResultRouter implements IIPCI {
 
-    AtomicLong maxId = new AtomicLong(0);
-    ConcurrentHashMap<Long, MutableObject<ByteBuffer>> activeClients = new ConcurrentHashMap<>();
-    ConcurrentHashMap<Long, Exception> exceptionInbox = new ConcurrentHashMap<>();
+    private final AtomicLong maxId = new AtomicLong(0);
+    private final ConcurrentHashMap<Long, Pair<ByteBuffer, Exception>> activeClients = new ConcurrentHashMap<>();
     private static int MAX_BUF_SIZE = 32 * 1024 * 1024; //32MB
 
     @Override
@@ -44,7 +43,8 @@ public class ExternalFunctionResultRouter implements IIPCI {
         ByteBuffer buf = (ByteBuffer) payload;
         int end = buf.position();
         buf.position(end - rewind);
-        ByteBuffer copyTo = activeClients.get(rmid).getValue();
+        Pair<ByteBuffer, Exception> route = activeClients.get(rmid);
+        ByteBuffer copyTo = route.getFirst();
         if (copyTo.capacity() < handle.getAttachmentLen()) {
             int nextSize = closestPow2(handle.getAttachmentLen());
             if (nextSize > MAX_BUF_SIZE) {
@@ -52,44 +52,43 @@ public class ExternalFunctionResultRouter implements IIPCI {
                 return;
             }
             copyTo = ByteBuffer.allocate(nextSize);
-            activeClients.get(rmid).setValue(copyTo);
+            route.setFirst(copyTo);
         }
         copyTo.position(0);
         System.arraycopy(buf.array(), buf.position() + buf.arrayOffset(), copyTo.array(), copyTo.arrayOffset(),
                 handle.getAttachmentLen());
-        synchronized (copyTo) {
+        synchronized (route) {
             copyTo.limit(handle.getAttachmentLen() + 1);
-            copyTo.notify();
+            route.notifyAll();
         }
         buf.position(end);
     }
 
     @Override
     public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
-        exceptionInbox.put(rmid, exception);
-        ByteBuffer route = activeClients.get(rmid).getValue();
+        Pair<ByteBuffer, Exception> route = activeClients.get(rmid);
         synchronized (route) {
-            route.notify();
+            route.setSecond(exception);
+            route.notifyAll();
         }
     }
 
-    public Long insertRoute(ByteBuffer buf) {
-        Long id = maxId.incrementAndGet();
-        activeClients.put(id, new MutableObject<>(buf));
-        return id;
-    }
-
-    public Exception getException(Long id) {
-        return exceptionInbox.remove(id);
+    public Pair<Long, Pair<ByteBuffer, Exception>> insertRoute(ByteBuffer buf) {
+        Long id = maxId.getAndIncrement();
+        Pair<ByteBuffer, Exception> bufferHolder = new Pair<>(buf, null);
+        activeClients.put(id, bufferHolder);
+        return new Pair<>(id, bufferHolder);
     }
 
-    public boolean hasException(long id) {
-        return exceptionInbox.get(id) == null;
+    public Exception getAndRemoveException(Long id) {
+        Pair<ByteBuffer, Exception> route = activeClients.get(id);
+        Exception e = route.getSecond();
+        route.setSecond(null);
+        return e;
     }
 
     public void removeRoute(Long id) {
         activeClients.remove(id);
-        exceptionInbox.remove(id);
     }
 
     public static int closestPow2(int n) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
index feb52cf..cd7ec18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -24,32 +24,41 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.ipc.impl.Message;
 import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
 
 public class PythonIPCProto {
 
-    public PythonMessageBuilder messageBuilder;
-    OutputStream sockOut;
-    ByteBuffer headerBuffer = ByteBuffer.allocate(21);
-    ByteBuffer recvBuffer = ByteBuffer.allocate(4096);
-    ExternalFunctionResultRouter router;
-    IPCSystem ipcSys;
-    Message outMsg;
-    Long key;
-
-    public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, IPCSystem ipcSys)
-            throws IOException {
+    private PythonMessageBuilder messageBuilder;
+    private OutputStream sockOut;
+    private ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+    private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
+    private ExternalFunctionResultRouter router;
+    private long routeId;
+    private Pair<ByteBuffer, Exception> bufferBox;
+    private Process pythonProc;
+    private long maxFunctionId;
+    private ArrayBufferInput unpackerInput;
+    private MessageUnpacker unpacker;
+
+    public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
         this.sockOut = sockOut;
         messageBuilder = new PythonMessageBuilder();
         this.router = router;
-        this.ipcSys = ipcSys;
-        this.outMsg = new Message(null);
+        this.pythonProc = pythonProc;
+        this.maxFunctionId = 0l;
+        unpackerInput = new ArrayBufferInput(new byte[0]);
+        unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
     }
 
     public void start() {
-        this.key = router.insertRoute(recvBuffer);
+        Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer);
+        this.routeId = keyAndBufferBox.getFirst();
+        this.bufferBox = keyAndBufferBox.getSecond();
     }
 
     public void helo() throws IOException, AsterixException {
@@ -59,78 +68,106 @@ public class PythonIPCProto {
         messageBuilder.buf.clear();
         messageBuilder.buf.position(0);
         messageBuilder.hello();
-        sendMsg();
+        sendMsg(routeId);
         receiveMsg();
         if (getResponseType() != MessageType.HELO) {
-            throw new IllegalStateException("Illegal reply received, expected HELO");
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "Expected HELO, recieved " + getResponseType().name());
         }
     }
 
-    public void init(String module, String clazz, String fn) throws IOException, AsterixException {
+    public long init(String module, String clazz, String fn) throws IOException, AsterixException {
+        long functionId = maxFunctionId++;
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
         messageBuilder.buf.clear();
         messageBuilder.buf.position(0);
         messageBuilder.init(module, clazz, fn);
-        sendMsg();
+        sendMsg(functionId);
         receiveMsg();
         if (getResponseType() != MessageType.INIT_RSP) {
-            throw new IllegalStateException("Illegal reply received, expected INIT_RSP");
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "Expected INIT_RSP, recieved " + getResponseType().name());
         }
+        return functionId;
     }
 
-    public ByteBuffer call(ByteBuffer args, int numArgs) throws Exception {
+    public ByteBuffer call(long functionId, ByteBuffer args, int numArgs) throws IOException, AsterixException {
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
         messageBuilder.buf.clear();
         messageBuilder.buf.position(0);
         messageBuilder.call(args.array(), args.position(), numArgs);
-        sendMsg();
+        sendMsg(functionId);
         receiveMsg();
         if (getResponseType() != MessageType.CALL_RSP) {
-            throw new IllegalStateException("Illegal reply received, expected CALL_RSP, recvd: " + getResponseType());
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "Expected CALL_RSP, recieved " + getResponseType().name());
         }
         return recvBuffer;
     }
 
-    public void quit() throws IOException {
+    public ByteBuffer callMulti(long key, ByteBuffer args, int numTuples) throws IOException, AsterixException {
+        recvBuffer.clear();
+        recvBuffer.position(0);
+        recvBuffer.limit(0);
+        messageBuilder.buf.clear();
+        messageBuilder.buf.position(0);
+        messageBuilder.callMulti(args.array(), args.position(), numTuples);
+        sendMsg(key);
+        receiveMsg();
+        if (getResponseType() != MessageType.CALL_RSP) {
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "Expected CALL_RSP, recieved " + getResponseType().name());
+        }
+        return recvBuffer;
+    }
+
+    //For future use with interpreter reuse between jobs.
+    public void quit() throws HyracksDataException {
         messageBuilder.quit();
-        router.removeRoute(key);
+        router.removeRoute(routeId);
     }
 
     public void receiveMsg() throws IOException, AsterixException {
         Exception except = null;
         try {
-            synchronized (recvBuffer) {
-                while (recvBuffer.limit() == 0) {
-                    recvBuffer.wait(100);
+            synchronized (bufferBox) { // block until a reply, a routed error, or interpreter exit
+                while (bufferBox.getFirst().limit() == 0 && bufferBox.getSecond() == null && pythonProc.isAlive()) {
+                    bufferBox.wait(100); // timed wait so a dead process is noticed without a notify
                 }
             }
-            if (router.hasException(key)) {
-                except = router.getException(key);
+            except = router.getAndRemoveException(routeId);
+            if (!pythonProc.isAlive()) {
+                except = new IOException("Python process exited with code: " + pythonProc.exitValue());
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
         }
         if (except != null) {
-            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, except);
+            throw new AsterixException(except);
+        }
+        if (bufferBox.getFirst() != recvBuffer) { // the router swapped in a larger buffer
+            recvBuffer = bufferBox.getFirst();
         }
         messageBuilder.readHead(recvBuffer);
         if (messageBuilder.type == MessageType.ERROR) {
-            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION,
-                    MessagePack.newDefaultUnpacker(recvBuffer).unpackString());
+            unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+                    recvBuffer.remaining());
+            unpacker.reset(unpackerInput);
+            throw new AsterixException(unpacker.unpackString());
         }
     }
 
-    public void sendMsg() throws IOException {
+    public void sendMsg(long key) throws IOException {
         headerBuffer.clear();
         headerBuffer.position(0);
-        headerBuffer.putInt(HEADER_SIZE + messageBuilder.buf.position());
-        headerBuffer.putLong(-1);
+        headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + messageBuilder.buf.position());
         headerBuffer.putLong(key);
+        headerBuffer.putLong(routeId);
         headerBuffer.put(Message.NORMAL);
         sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
         sockOut.write(messageBuilder.buf.array(), 0, messageBuilder.buf.position());
@@ -141,4 +178,8 @@ public class PythonIPCProto {
         return messageBuilder.type;
     }
 
+    public long getRouteId() {
+        return routeId;
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 506e80d..5052eb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -25,19 +25,16 @@ import java.io.ObjectOutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class PythonMessageBuilder {
-    private static final int MAX_BUF_SIZE = 21 * 1024 * 1024; //21MB.
-    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int MAX_BUF_SIZE = 64 * 1024 * 1024; //64MB.
     MessageType type;
     long dataLength;
     ByteBuffer buf;
-    String[] initAry = new String[3];
 
     public PythonMessageBuilder() {
         this.type = null;
@@ -49,12 +46,12 @@ public class PythonMessageBuilder {
         this.type = type;
     }
 
-    public void packHeader() {
+    public void packHeader() throws HyracksDataException {
         MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
     }
 
     //TODO: this is wrong for any multibyte chars
-    private int getStringLength(String s) {
+    private static int getStringLength(String s) {
         return s.length();
     }
 
@@ -66,7 +63,7 @@ public class PythonMessageBuilder {
     public void hello() throws IOException {
         this.type = MessageType.HELO;
         byte[] serAddr = serialize(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1));
-        dataLength = serAddr.length + 5;
+        dataLength = serAddr.length + 1;
         packHeader();
         //TODO:make this cleaner
         buf.put(BIN32);
@@ -74,32 +71,38 @@ public class PythonMessageBuilder {
         buf.put(serAddr);
     }
 
-    public void quit() {
+    public void quit() throws HyracksDataException {
         this.type = MessageType.QUIT;
         dataLength = getStringLength("QUIT");
         packHeader();
         MessagePackerFromADM.packFixStr(buf, "QUIT");
     }
 
-    public void init(String module, String clazz, String fn) {
+    public void init(final String module, final String clazz, final String fn) throws HyracksDataException {
         this.type = MessageType.INIT;
-        initAry[0] = module;
-        initAry[1] = clazz;
-        initAry[2] = fn;
-        dataLength = Arrays.stream(initAry).mapToInt(s -> getStringLength(s)).sum() + 2;
+        // sum(string lengths) + 2 from fix array tag and message type
+        if (clazz != null) {
+            dataLength =
+                    PythonMessageBuilder.getStringLength(module) + getStringLength(clazz) + getStringLength(fn) + 2;
+        } else {
+            dataLength = PythonMessageBuilder.getStringLength(module) + getStringLength(fn) + 2;
+        }
         packHeader();
-        MessagePackerFromADM.packFixArrayHeader(buf, (byte) initAry.length);
-        for (String s : initAry) {
-            MessagePackerFromADM.packStr(buf, s);
+        int numArgs = clazz == null ? 2 : 3;
+        MessagePackerFromADM.packFixArrayHeader(buf, (byte) numArgs);
+        MessagePackerFromADM.packStr(buf, module);
+        if (clazz != null) {
+            MessagePackerFromADM.packStr(buf, clazz);
         }
+        MessagePackerFromADM.packStr(buf, fn);
     }
 
-    public void call(byte[] args, int lim, int numArgs) {
+    public void call(byte[] args, int lim, int numArgs) throws HyracksDataException {
         if (args.length > buf.capacity()) {
             int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
             if (growTo > MAX_BUF_SIZE) {
-                //TODO: something more graceful
-                throw new IllegalArgumentException("Reached maximum buffer size");
+                throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
+                        "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
             }
             buf = ByteBuffer.allocate(growTo);
         }
@@ -109,11 +112,32 @@ public class PythonMessageBuilder {
         dataLength = 5 + 1 + lim;
         packHeader();
         //TODO: make this switch between fixarray/array16/array32
-        if (numArgs == 0) {
-            buf.put(NIL);
-        } else {
-            buf.put(ARRAY32);
-            buf.putInt(numArgs);
+        buf.put((byte) (FIXARRAY_PREFIX + 1));
+        buf.put(ARRAY32);
+        buf.putInt(numArgs);
+        if (numArgs > 0) {
+            buf.put(args, 0, lim);
+        }
+    }
+
+    public void callMulti(byte[] args, int lim, int numArgs) throws HyracksDataException {
+        if (args.length > buf.capacity()) {
+            int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
+            if (growTo > MAX_BUF_SIZE) {
+                throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
+                        "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
+            }
+            buf = ByteBuffer.allocate(growTo);
+        }
+        buf.clear();
+        buf.position(0);
+        this.type = MessageType.CALL;
+        dataLength = 5 + 1 + lim;
+        packHeader();
+        //TODO: make this switch between fixarray/array16/array32
+        buf.put(ARRAY16);
+        buf.putShort((short) numArgs);
+        if (numArgs > 0) {
             buf.put(args, 0, lim);
         }
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 1fa53ea..e664f47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -19,50 +19,31 @@
 
 package org.apache.asterix.external.library;
 
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+
 import java.io.DataOutput;
-import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
 
 class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
 
@@ -72,53 +53,22 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
     private final ByteBuffer argHolder;
     private final ByteBuffer outputWrapper;
     private final IEvaluatorContext evaluatorContext;
-    private static final String ENTRYPOINT = "entrypoint.py";
-    private static final String SITE_PACKAGES = "site-packages";
 
     private final IPointable[] argValues;
+    private final SourceLocation sourceLocation;
+
+    private MessageUnpacker unpacker;
+    private ArrayBufferInput unpackerInput;
+
+    private long fnId;
 
     ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
             IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
         super(finfo, args, argTypes, ctx);
-        IApplicationConfig cfg = ctx.getServiceContext().getAppConfig();
-        String pythonPathCmd = cfg.getString(NCConfig.Option.PYTHON_CMD);
-        boolean findPython = cfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
-        List<String> pythonArgs = new ArrayList<>();
-        if (pythonPathCmd == null) {
-            //if absolute path to interpreter is not specified, try to use environmental python
-            if (findPython) {
-                pythonPathCmd = "/usr/bin/env";
-                pythonArgs.add("python3");
-            } else {
-                throw HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
-                        "Python interpreter not specified, and " + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini()
-                                + " is false"));
-            }
-        }
-        File pythonPath = new File(pythonPathCmd);
-        List<String> sitePkgs = new ArrayList<>();
-        sitePkgs.add(SITE_PACKAGES);
-        String[] addlSitePackages =
-                ctx.getServiceContext().getAppConfig().getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
-        sitePkgs.addAll(Arrays.asList(addlSitePackages));
-        if (cfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
-            sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
-        }
-        String[] pythonArgsRaw = ctx.getServiceContext().getAppConfig().getStringArray(NCConfig.Option.PYTHON_ARGS);
-        if (pythonArgsRaw != null) {
-            pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
-        }
-        StringBuilder sitePackagesPathBuilder = new StringBuilder();
-        for (int i = 0; i < sitePkgs.size() - 1; i++) {
-            sitePackagesPathBuilder.append(sitePkgs.get(i));
-            sitePackagesPathBuilder.append(File.pathSeparator);
-        }
-        sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
-
         try {
-            libraryEvaluator = PythonLibraryEvaluator.getInstance(finfo, libraryManager, router, ipcSys, pythonPath,
-                    ctx.getTaskContext(), sitePackagesPathBuilder.toString(), pythonArgs, ctx.getWarningCollector(),
-                    sourceLoc);
+            PythonLibraryEvaluatorFactory evaluatorFactory = new PythonLibraryEvaluatorFactory(ctx.getTaskContext());
+            this.libraryEvaluator = evaluatorFactory.getEvaluator(finfo, sourceLoc);
+            this.fnId = libraryEvaluator.initialize(finfo);
         } catch (IOException | AsterixException e) {
             throw new HyracksDataException("Failed to initialize Python", e);
         }
@@ -130,6 +80,9 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
         this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
         this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
         this.evaluatorContext = ctx;
+        this.sourceLocation = sourceLoc;
+        this.unpackerInput = new ArrayBufferInput(new byte[0]);
+        this.unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
     }
 
     @Override
@@ -137,193 +90,57 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
         argHolder.clear();
         for (int i = 0, ln = argEvals.length; i < ln; i++) {
             argEvals[i].evaluate(tuple, argValues[i]);
+            if (!finfo.getNullCall() && PointableHelper.checkAndSetMissingOrNull(result, argValues[i])) {
+                return;
+            }
             try {
-                setArgument(i, argValues[i]);
+                PythonLibraryEvaluator.setArgument(argTypes[i], argValues[i], argHolder, finfo.getNullCall());
             } catch (IOException e) {
                 throw new HyracksDataException("Error evaluating Python UDF", e);
             }
         }
         try {
-            ByteBuffer res = libraryEvaluator.callPython(argHolder, argTypes.length);
+            ByteBuffer res = libraryEvaluator.callPython(fnId, argHolder, argTypes.length);
             resultBuffer.reset();
             wrap(res, resultBuffer.getDataOutput());
         } catch (Exception e) {
             throw new HyracksDataException("Error evaluating Python UDF", e);
         }
-        result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength());
-    }
-
-    private static class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
-        Process p;
-        IExternalFunctionInfo finfo;
-        ILibraryManager libMgr;
-        File pythonHome;
-        PythonIPCProto proto;
-        ExternalFunctionResultRouter router;
-        IPCSystem ipcSys;
-        String module;
-        String clazz;
-        String fn;
-        String sitePkgs;
-        List<String> pythonArgs;
-        TaskAttemptId task;
-        IWarningCollector warningCollector;
-        SourceLocation sourceLoc;
-
-        private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
-                ILibraryManager libMgr, File pythonHome, String sitePkgs, List<String> pythonArgs,
-                ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
-                IWarningCollector warningCollector, SourceLocation sourceLoc) {
-            super(jobId, evaluatorId);
-            this.finfo = finfo;
-            this.libMgr = libMgr;
-            this.pythonHome = pythonHome;
-            this.sitePkgs = sitePkgs;
-            this.pythonArgs = pythonArgs;
-            this.router = router;
-            this.task = task;
-            this.ipcSys = ipcSys;
-            this.warningCollector = warningCollector;
-            this.sourceLoc = sourceLoc;
-
-        }
-
-        public void initialize() throws IOException, AsterixException {
-            PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
-            List<String> externalIdents = finfo.getExternalIdentifier();
-            PythonLibrary library = (PythonLibrary) libMgr.getLibrary(fnId.libraryDataverseName, fnId.libraryName);
-            String wd = library.getFile().getAbsolutePath();
-            String packageModule = externalIdents.get(0);
-            String clazz;
-            String fn;
-            String externalIdent1 = externalIdents.get(1);
-            int idx = externalIdent1.lastIndexOf('.');
-            if (idx >= 0) {
-                clazz = externalIdent1.substring(0, idx);
-                fn = externalIdent1.substring(idx + 1);
-            } else {
-                clazz = "None";
-                fn = externalIdent1;
-            }
-            this.fn = fn;
-            this.clazz = clazz;
-            this.module = packageModule;
-            int port = ipcSys.getSocketAddress().getPort();
-            List<String> args = new ArrayList<>();
-            args.add(pythonHome.getAbsolutePath());
-            args.addAll(pythonArgs);
-            args.add(ENTRYPOINT);
-            args.add(InetAddress.getLoopbackAddress().getHostAddress());
-            args.add(Integer.toString(port));
-            args.add(sitePkgs);
-            ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
-            pb.directory(new File(wd));
-            p = pb.start();
-            proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
-            proto.start();
-            proto.helo();
-            proto.init(packageModule, clazz, fn);
-        }
-
-        ByteBuffer callPython(ByteBuffer arguments, int numArgs) throws Exception {
-            ByteBuffer ret = null;
-            try {
-                ret = proto.call(arguments, numArgs);
-            } catch (AsterixException e) {
-                warningCollector.warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, e.getMessage()));
-            }
-            return ret;
-        }
-
-        @Override
-        public void deallocate() {
-            if (p != null) {
-                boolean dead = false;
-                try {
-                    p.destroy();
-                    dead = p.waitFor(100, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    //gonna kill it anyway
-                }
-                if (!dead) {
-                    p.destroyForcibly();
-                }
-            }
-        }
-
-        private static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
-                ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
-                String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
-                throws IOException, AsterixException {
-            PythonLibraryEvaluatorId evaluatorId =
-                    new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), finfo.getLibraryName());
-            PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
-            if (evaluator == null) {
-                evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
-                        pythonHome, sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
-                        sourceLoc);
-                ctx.registerDeallocatable(evaluator);
-                evaluator.initialize();
-                ctx.setStateObject(evaluator);
-            }
-            return evaluator;
-        }
-    }
-
-    private static final class PythonLibraryEvaluatorId {
-
-        private final DataverseName libraryDataverseName;
-
-        private final String libraryName;
-
-        private PythonLibraryEvaluatorId(DataverseName libraryDataverseName, String libraryName) {
-            this.libraryDataverseName = Objects.requireNonNull(libraryDataverseName);
-            this.libraryName = Objects.requireNonNull(libraryName);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-            PythonLibraryEvaluatorId that = (PythonLibraryEvaluatorId) o;
-            return libraryDataverseName.equals(that.libraryDataverseName) && libraryName.equals(that.libraryName);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(libraryDataverseName, libraryName);
-        }
-    }
-
-    private void setArgument(int index, IValueReference valueReference) throws IOException {
-        IAType type = argTypes[index];
-        ATypeTag tag = type.getTypeTag();
-        switch (tag) {
-            case ANY:
-                TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
-                pointy.set(valueReference);
-                ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
-                IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-                MessagePackerFromADM.pack(valueReference, rtType, argHolder);
-                break;
-            default:
-                MessagePackerFromADM.pack(valueReference, type, argHolder);
-                break;
-        }
+        result.set(resultBuffer);
     }
 
     private void wrap(ByteBuffer resultWrapper, DataOutput out) throws HyracksDataException {
         //TODO: output wrapper needs to grow with result wrapper
         outputWrapper.clear();
         outputWrapper.position(0);
-        MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
         try {
+            if (resultWrapper == null) {
+                outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
+                return;
+            }
+            if ((resultWrapper.get() ^ FIXARRAY_PREFIX) != (byte) 2) {
+                throw HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                        "Returned result missing outer wrapper"));
+            }
+            int numresults = resultWrapper.get() ^ FIXARRAY_PREFIX;
+            if (numresults > 0) {
+                MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
+            }
+            unpackerInput.reset(resultWrapper.array(), resultWrapper.position() + resultWrapper.arrayOffset(),
+                    resultWrapper.remaining());
+            unpacker.reset(unpackerInput);
+            int numEntries = unpacker.unpackArrayHeader();
+            for (int j = 0; j < numEntries; j++) {
+                outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                if (evaluatorContext.getWarningCollector().shouldWarn()) {
+                    evaluatorContext.getWarningCollector().warn(
+                            Warning.of(sourceLocation, ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+                }
+            }
             out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
         } catch (IOException e) {
-            throw new HyracksDataException(e.getMessage());
+            throw HyracksDataException.create(e);
         }
-
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
new file mode 100644
index 0000000..e2229ee
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonIPCProto;
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+/**
+ * Owns one Python interpreter process running {@code entrypoint.py} from a library's directory and forwards
+ * function calls to it through a {@link PythonIPCProto}.
+ * <p>
+ * Instances are obtained via {@link #getInstance}, which keeps one evaluator per library and calling thread as a
+ * task state object and registers it with the joblet context as a deallocatable resource.
+ */
+public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
+
+    public static final String ENTRYPOINT = "entrypoint.py";
+    public static final String SITE_PACKAGES = "site-packages";
+
+    private Process p;
+    private ILibraryManager libMgr;
+    private File pythonHome;
+    private PythonIPCProto proto;
+    private ExternalFunctionResultRouter router;
+    private IPCSystem ipcSys;
+    private String sitePkgs;
+    private List<String> pythonArgs;
+    private TaskAttemptId task;
+    private IWarningCollector warningCollector;
+    private SourceLocation sourceLoc;
+
+    public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+            File pythonHome, String sitePkgs, List<String> pythonArgs, ExternalFunctionResultRouter router,
+            IPCSystem ipcSys, TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
+        super(jobId, evaluatorId);
+        this.libMgr = libMgr;
+        this.pythonHome = pythonHome;
+        this.sitePkgs = sitePkgs;
+        this.pythonArgs = pythonArgs;
+        this.router = router;
+        this.task = task;
+        this.ipcSys = ipcSys;
+        this.warningCollector = warningCollector;
+        this.sourceLoc = sourceLoc;
+
+    }
+
+    /**
+     * Launches the Python process in the library's directory and performs the initial handshake with it. The
+     * command line is: interpreter, configured interpreter arguments, {@link #ENTRYPOINT}, the loopback address,
+     * the IPC system's port and the site-packages path.
+     */
+    private void initialize() throws IOException, AsterixException {
+        PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+        PythonLibrary library =
+                (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+        String wd = library.getFile().getAbsolutePath();
+        int port = ipcSys.getSocketAddress().getPort();
+        List<String> args = new ArrayList<>();
+        args.add(pythonHome.getAbsolutePath());
+        args.addAll(pythonArgs);
+        args.add(ENTRYPOINT);
+        args.add(InetAddress.getLoopbackAddress().getHostAddress());
+        args.add(Integer.toString(port));
+        args.add(sitePkgs);
+
+        ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
+        pb.directory(new File(wd));
+        p = pb.start();
+        proto = new PythonIPCProto(p.getOutputStream(), router, p);
+        proto.start();
+        proto.helo();
+    }
+
+    /**
+     * Registers a function of this library with the Python process.
+     * <p>
+     * The second external identifier is split at its last {@code '.'} into a class name and a function name; when
+     * it contains no dot, the whole identifier is the function name and a {@code null} class is passed on.
+     *
+     * @param finfo the external function; its identifiers name the module and the (class-qualified) function
+     * @return the value returned by {@link PythonIPCProto#init}, used as the {@code id} argument of
+     *         {@link #callPython} and {@link #callPythonMulti}
+     */
+    public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
+        List<String> externalIdents = finfo.getExternalIdentifier();
+        String packageModule = externalIdents.get(0);
+        String clazz;
+        String fn;
+        String externalIdent1 = externalIdents.get(1);
+        int idx = externalIdent1.lastIndexOf('.');
+        if (idx >= 0) {
+            clazz = externalIdent1.substring(0, idx);
+            fn = externalIdent1.substring(idx + 1);
+        } else {
+            clazz = null;
+            fn = externalIdent1;
+        }
+        return proto.init(packageModule, clazz, fn);
+    }
+
+    /**
+     * Calls the function registered under {@code id} with the given packed arguments.
+     *
+     * @return the buffer returned by the protocol, or {@code null} if the call threw an {@link AsterixException};
+     *         that exception is reported as a warning when the warning collector accepts one
+     */
+    public ByteBuffer callPython(long id, ByteBuffer arguments, int numArgs) throws IOException {
+        ByteBuffer ret = null;
+        try {
+            ret = proto.call(id, arguments, numArgs);
+        } catch (AsterixException e) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Batched variant of {@link #callPython}: forwards the arguments and a tuple count to the protocol's
+     * {@code callMulti}.
+     *
+     * @return the buffer returned by the protocol, or {@code null} if the call threw an {@link AsterixException};
+     *         that exception is reported as a warning when the warning collector accepts one
+     */
+    public ByteBuffer callPythonMulti(long id, ByteBuffer arguments, int numTuples) throws IOException {
+        ByteBuffer ret = null;
+        try {
+            ret = proto.callMulti(id, arguments, numTuples);
+        } catch (AsterixException e) {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Terminates the Python process, forcibly if it has not exited within 100ms or the wait is interrupted, and
+     * removes this evaluator's route from the result router.
+     */
+    @Override
+    public void deallocate() {
+        if (p != null) {
+            boolean dead = false;
+            try {
+                p.destroy();
+                dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                //gonna kill it anyway
+            }
+            if (!dead) {
+                p.destroyForcibly();
+            }
+        }
+        // proto is still null when process start-up failed after this object was registered for deallocation
+        if (proto != null) {
+            router.removeRoute(proto.getRouteId());
+        }
+    }
+
+    /**
+     * Packs one argument value into {@code argHolder} as msgpack. For a declared type of {@code ANY} the type tag
+     * stored in the value itself selects the builtin type used for packing.
+     *
+     * @param nullCall forwarded to {@link MessagePackerFromADM#pack} as its {@code packUnknown} flag
+     * @return the tag returned by {@link MessagePackerFromADM#pack}
+     */
+    public static ATypeTag setArgument(IAType type, IValueReference valueReference, ByteBuffer argHolder,
+            boolean nullCall) throws IOException {
+        ATypeTag tag = type.getTypeTag();
+        if (tag == ATypeTag.ANY) {
+            TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
+            pointy.set(valueReference);
+            ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+            IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+            return MessagePackerFromADM.pack(valueReference, rtType, argHolder, nullCall);
+        } else {
+            return MessagePackerFromADM.pack(valueReference, type, argHolder, nullCall);
+        }
+    }
+
+    /**
+     * Returns, without packing anything, what {@link MessagePackerFromADM#peekUnknown} reports for the value's
+     * type; a declared type of {@code ANY} is first resolved through the type tag stored in the value.
+     */
+    public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException {
+        ATypeTag tag = type.getTypeTag();
+        if (tag == ATypeTag.ANY) {
+            TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
+            pointy.set(valueReference);
+            ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+            IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+            return MessagePackerFromADM.peekUnknown(rtType);
+        } else {
+            return MessagePackerFromADM.peekUnknown(type);
+        }
+    }
+
+    /**
+     * Writes an {@code ARRAY16} header with length zero (an empty array) into {@code argHolder}.
+     */
+    public static void setVoidArgument(ByteBuffer argHolder) {
+        argHolder.put(ARRAY16);
+        argHolder.putShort((short) 0);
+    }
+
+    /**
+     * Returns the evaluator stored in the task context for the function's library and the current thread. If none
+     * exists yet, a new one is created, registered for deallocation, initialized and stored.
+     */
+    public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+            ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
+            String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
+            throws IOException, AsterixException {
+        PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+                finfo.getLibraryName(), Thread.currentThread());
+        PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
+        if (evaluator == null) {
+            evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, pythonHome,
+                    sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
+            ctx.getJobletContext().registerDeallocatable(evaluator);
+            evaluator.initialize();
+            ctx.setStateObject(evaluator);
+        }
+        return evaluator;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
new file mode 100644
index 0000000..86d51de
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.external.library.PythonLibraryEvaluator.SITE_PACKAGES;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+/**
+ * Reads the Python-related node configuration once and uses it to obtain {@link PythonLibraryEvaluator}s for
+ * a task.
+ */
+public class PythonLibraryEvaluatorFactory {
+    private final ILibraryManager libraryManager;
+    private final IPCSystem ipcSys;
+    private final File pythonPath;
+    private final IHyracksTaskContext ctx;
+    private final ExternalFunctionResultRouter router;
+    private final String sitePackagesPath;
+    private final List<String> pythonArgs;
+
+    /**
+     * Resolves the interpreter command, interpreter arguments and site-packages search path from the
+     * application configuration.
+     *
+     * @throws AsterixException if no interpreter is configured and auto-location is disabled
+     */
+    public PythonLibraryEvaluatorFactory(IHyracksTaskContext ctx) throws AsterixException {
+        this.ctx = ctx;
+        libraryManager = ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
+                .getLibraryManager();
+        router = libraryManager.getRouter();
+        ipcSys = libraryManager.getIPCI();
+        IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
+        String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
+        boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
+        pythonArgs = new ArrayList<>();
+        if (pythonPathCmd == null) {
+            if (findPython) {
+                //if absolute path to interpreter is not specified, try to use environmental python
+                pythonPathCmd = "/usr/bin/env";
+                pythonArgs.add("python3");
+            } else {
+                throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, "Python interpreter not specified, and "
+                        + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
+            }
+        }
+        pythonPath = new File(pythonPathCmd);
+        List<String> sitePkgs = new ArrayList<>();
+        sitePkgs.add(SITE_PACKAGES);
+        String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
+        // guarded like PYTHON_ARGS below: Arrays.asList would throw on a null array
+        if (addlSitePackages != null) {
+            sitePkgs.addAll(Arrays.asList(addlSitePackages));
+        }
+        if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+            sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
+        }
+        String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
+        if (pythonArgsRaw != null) {
+            pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
+        }
+        sitePackagesPath = String.join(File.pathSeparator, sitePkgs);
+    }
+
+    /**
+     * Returns the evaluator for the library of {@code fnInfo}, as provided by
+     * {@link PythonLibraryEvaluator#getInstance}.
+     */
+    public PythonLibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
+            throws IOException, AsterixException {
+        return PythonLibraryEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
+                sitePackagesPath, pythonArgs, ctx.getWarningCollector(), sourceLoc);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java
new file mode 100644
index 0000000..c2f6f00
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.util.Objects;
+
+import org.apache.asterix.common.metadata.DataverseName;
+
+/**
+ * Key under which a {@link PythonLibraryEvaluator} is stored as a state object: the library's dataverse and name
+ * plus the thread the evaluator belongs to.
+ */
+final class PythonLibraryEvaluatorId {
+
+    private final DataverseName libraryDataverseName;
+
+    private final String libraryName;
+
+    private final Thread thread;
+
+    PythonLibraryEvaluatorId(DataverseName libraryDataverseName, String libraryName, Thread thread) {
+        this.libraryDataverseName = Objects.requireNonNull(libraryDataverseName);
+        this.libraryName = Objects.requireNonNull(libraryName);
+        this.thread = Objects.requireNonNull(thread);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        PythonLibraryEvaluatorId that = (PythonLibraryEvaluatorId) o;
+        return libraryDataverseName.equals(that.libraryDataverseName) && libraryName.equals(that.libraryName)
+                && thread.equals(that.thread);
+    }
+
+    @Override
+    public int hashCode() {
+        // hash every field that equals compares, so ids of the same library on different threads do not collide
+        return Objects.hash(libraryDataverseName, libraryName, thread);
+    }
+
+    public DataverseName getLibraryDataverseName() {
+        return libraryDataverseName;
+    }
+
+    public String getLibraryName() {
+        return libraryName;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
index 383b2f1..f0ac56e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
@@ -27,16 +27,15 @@ import static org.msgpack.core.MessagePack.Code.INT32;
 import static org.msgpack.core.MessagePack.Code.INT64;
 import static org.msgpack.core.MessagePack.Code.INT8;
 import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.NIL;
 import static org.msgpack.core.MessagePack.Code.STR32;
 import static org.msgpack.core.MessagePack.Code.TRUE;
-import static org.msgpack.core.MessagePack.Code.UINT16;
-import static org.msgpack.core.MessagePack.Code.UINT32;
-import static org.msgpack.core.MessagePack.Code.UINT64;
-import static org.msgpack.core.MessagePack.Code.UINT8;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
@@ -64,11 +63,12 @@ public class MessagePackerFromADM {
     private static final int ITEM_COUNT_SIZE = 4;
     private static final int ITEM_OFFSET_SIZE = 4;
 
-    public static void pack(IValueReference ptr, IAType type, ByteBuffer out) throws HyracksDataException {
-        pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, out);
+    public static ATypeTag pack(IValueReference ptr, IAType type, ByteBuffer out, boolean packUnknown)
+            throws HyracksDataException {
+        return pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, packUnknown, out);
     }
 
-    public static void pack(byte[] ptr, int offs, IAType type, boolean tagged, ByteBuffer out)
+    public static ATypeTag pack(byte[] ptr, int offs, IAType type, boolean tagged, boolean packUnknown, ByteBuffer out)
             throws HyracksDataException {
         int relOffs = tagged ? offs + 1 : offs;
         ATypeTag tag = type.getTypeTag();
@@ -108,34 +108,35 @@ public class MessagePackerFromADM {
             case OBJECT:
                 packObject(ptr, offs, type, out);
                 break;
+            case MISSING:
+            case NULL:
+                if (packUnknown) {
+                    packNull(out);
+                    break;
+                } else {
+                    return tag;
+                }
             default:
-                throw new IllegalArgumentException("NYI");
+                throw HyracksDataException.create(AsterixException.create(ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR,
+                        tag.name(), "to a msgpack"));
         }
+        return ATypeTag.TYPE;
     }
 
-    public static byte minPackPosLong(ByteBuffer out, long in) {
-        if (in < 127) {
-            packFixPos(out, (byte) in);
-            return 1;
-        } else if (in < Byte.MAX_VALUE) {
-            out.put(UINT8);
-            out.put((byte) in);
-            return 2;
-        } else if (in < Short.MAX_VALUE) {
-            out.put(UINT16);
-            out.putShort((short) in);
-            return 3;
-        } else if (in < Integer.MAX_VALUE) {
-            out.put(UINT32);
-            out.putInt((int) in);
-            return 5;
-        } else {
-            out.put(UINT64);
-            out.putLong(in);
-            return 9;
+    public static ATypeTag peekUnknown(IAType type) {
+        switch (type.getTypeTag()) {
+            case MISSING:
+            case NULL:
+                return type.getTypeTag();
+            default:
+                return ATypeTag.TYPE;
         }
     }
 
+    public static void packNull(ByteBuffer out) {
+        out.put(NIL);
+    }
+
     public static void packByte(ByteBuffer out, byte in) {
         out.put(INT8);
         out.put(in);
@@ -167,18 +168,20 @@ public class MessagePackerFromADM {
         out.putDouble(in);
     }
 
-    public static void packFixPos(ByteBuffer out, byte in) {
+    public static void packFixPos(ByteBuffer out, byte in) throws HyracksDataException {
         byte mask = (byte) (1 << 7);
         if ((in & mask) != 0) {
-            throw new IllegalArgumentException("fixint7 must be positive");
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "fixint7 must be positive");
         }
         out.put(in);
     }
 
-    public static void packFixStr(ByteBuffer buf, String in) {
-        byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+    public static void packFixStr(ByteBuffer buf, String in) throws HyracksDataException {
+        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8); //the limit below applies to the UTF-8 byte length
         if (strBytes.length > 31) {
-            throw new IllegalArgumentException("fixstr cannot be longer than 31");
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "fixstr cannot be longer than 31");
         }
         buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
         buf.put(strBytes);
@@ -186,7 +189,7 @@ public class MessagePackerFromADM {
 
     public static void packStr(ByteBuffer out, String in) {
         out.put(STR32);
-        byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
         out.putInt(strBytes.length);
         out.put(strBytes);
     }
@@ -195,14 +198,14 @@ public class MessagePackerFromADM {
         out.put(STR32);
         //TODO: tagged/untagged. closed support is borked so always tagged rn
         String str = UTF8StringUtil.toString(in, offs);
-        byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+        byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
         out.putInt(strBytes.length);
         out.put(strBytes);
     }
 
     public static void packStr(String str, ByteBuffer out) {
         out.put(STR32);
-        byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+        byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
         out.putInt(strBytes.length);
         out.put(strBytes);
     }
@@ -221,12 +224,12 @@ public class MessagePackerFromADM {
             if (fixType) {
                 int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
                         * NonTaggedFormatUtil.getFieldValueLength(in, 0, collType.getItemType().getTypeTag(), false));
-                pack(in, itemOffs, collType.getItemType(), false, out);
+                pack(in, itemOffs, collType.getItemType(), false, true, out);
             } else {
                 int itemOffs =
                         offs + IntegerPointable.getInteger(in, itemCtOffs + ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
                 ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
-                pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+                pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
             }
         }
     }
@@ -240,14 +243,14 @@ public class MessagePackerFromADM {
             String field = recType.getFieldNames()[i];
             IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
             packStr(field, out);
-            pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, out);
+            pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, true, out);
         }
         if (RecordUtils.isExpanded(in, offs, recType)) {
             for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, recType); i++) {
                 packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, recType, i), out);
                 ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
                 pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, recType, i),
-                        TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+                        TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
             }
         }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
index fedd1f6..4af1121 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -20,13 +20,16 @@ import static org.msgpack.core.MessagePack.Code.*;
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
 public class MessageUnpackerToADM {
 
-    public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) {
+    public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) throws HyracksDataException {
         byte tag = NIL;
         if (in != null) {
             tag = in.get();
@@ -68,6 +71,9 @@ public class MessageUnpackerToADM {
                 case UINT32:
                     unpackUInt(in, out, tagged);
                     break;
+                case UINT64:
+                    unpackULong(in, out, tagged);
+                    break;
                 case INT8:
                     unpackByte(in, out, tagged);
                     break;
@@ -109,42 +115,12 @@ public class MessageUnpackerToADM {
                     break;
 
                 default:
-                    throw new IllegalArgumentException("NYI");
+                    throw HyracksDataException.create(AsterixException.create(
+                            ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR, "msgpack tag " + tag + " ", "to an ADM type"));
             }
         }
     }
 
-    public static long unpackNextInt(ByteBuffer in) {
-        byte tag = in.get();
-        if (isFixInt(tag)) {
-            if (isPosFixInt(tag)) {
-                return tag;
-            } else if (isNegFixInt(tag)) {
-                return (tag ^ NEGFIXINT_PREFIX);
-            }
-        } else {
-            switch (tag) {
-                case INT8:
-                    return in.get();
-                case UINT8:
-                    return Byte.toUnsignedInt(in.get());
-                case INT16:
-                    return in.getShort();
-                case UINT16:
-                    return Short.toUnsignedInt(in.getShort());
-                case INT32:
-                    return in.getInt();
-                case UINT32:
-                    return Integer.toUnsignedLong(in.getInt());
-                case INT64:
-                    return in.getLong();
-                default:
-                    throw new IllegalArgumentException("NYI");
-            }
-        }
-        return -1;
-    }
-
     public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
         if (tagged) {
             out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
@@ -194,6 +170,17 @@ public class MessageUnpackerToADM {
         out.putLong(in.getInt() & 0x00000000FFFFFFFFl);
     }
 
+    public static void unpackULong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+        long val = in.getLong(); //uint64 payload read as signed: values above Long.MAX_VALUE come out negative
+        if (val < 0) {
+            throw new IllegalArgumentException("Integer overflow");
+        }
+        if (tagged) { //emit the tag only after validation so a rejected value leaves out untouched
+            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        out.putLong(val);
+    }
+
     public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean tagged) {
         if (tagged) {
             out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
@@ -209,9 +196,9 @@ public class MessageUnpackerToADM {
         out.putDouble(in.getDouble());
     }
 
-    public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) {
+    public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) throws HyracksDataException {
         if (uLen > Integer.MAX_VALUE) {
-            throw new UnsupportedOperationException("String is too long");
+            throw new UnsupportedOperationException("Array is too long");
         }
         int count = (int) uLen;
         int offs = out.position();
@@ -233,7 +220,7 @@ public class MessageUnpackerToADM {
         out.putInt(asxLenPos, totalLen);
     }
 
-    public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) {
+    public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) throws HyracksDataException {
         //TODO: need to handle typed records. this only produces a completely open record.
         //hdr size = 6?
         int startOffs = out.position();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 44a17a9..39e480a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -19,18 +19,44 @@
 
 package org.apache.asterix.external.operators;
 
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.isFixedArray;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.library.PythonLibraryEvaluator;
+import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
+import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
 import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.Counter;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePackException;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
 
 public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
-
     private int[] outColumns;
     private final IExternalFunctionDescriptor[] fnDescs;
     private final int[][] fnArgColumns;
@@ -44,9 +70,242 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
     }
 
     @Override
-    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
-            throws HyracksDataException {
-        throw new HyracksDataException(ErrorCode.OPERATOR_NOT_IMPLEMENTED, sourceLoc,
-                PhysicalOperatorTag.ASSIGN_BATCH.toString());
+    public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+
+        final int[] projectionToOutColumns = new int[projectionList.length];
+        for (int j = 0; j < projectionList.length; j++) {
+            projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+        }
+
+        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+            private ByteBuffer outputWrapper;
+            private List<ByteBuffer> argHolders;
+            ArrayTupleBuilder tupleBuilder;
+            private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
+            private ATypeTag[][] nullCalls;
+            private int[] numCalls;
+            private VoidPointable ref;
+            private MessageUnpacker unpacker;
+            private ArrayBufferInput unpackerInput;
+            private List<Pair<ByteBuffer, Counter>> batchResults;
+
+            @Override
+            public void open() throws HyracksDataException {
+                super.open();
+                initAccessAppend(ctx);
+                tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+                tRef = new FrameTupleReference();
+                ref = VoidPointable.FACTORY.createPointable();
+                libraryEvaluators = new ArrayList<>(); //filled below: one (function id, evaluator) per fnDescs entry
+                try {
+                    PythonLibraryEvaluatorFactory evalFactory = new PythonLibraryEvaluatorFactory(ctx);
+                    for (IExternalFunctionDescriptor fnDesc : fnDescs) {
+                        PythonLibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
+                        long id = eval.initialize(fnDesc.getFunctionInfo());
+                        libraryEvaluators.add(new Pair<>(id, eval));
+                    }
+                } catch (IOException | AsterixException e) {
+                    throw RuntimeDataException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, e, sourceLoc, e.getMessage());
+                }
+                argHolders = new ArrayList<>(fnArgColumns.length);
+                for (int i = 0; i < fnArgColumns.length; i++) {
+                    argHolders.add(ctx.allocateFrame());
+                }
+                outputWrapper = ctx.allocateFrame();
+                nullCalls = new ATypeTag[argHolders.size()][0]; //rows start empty; resetBuffers grows them per frame
+                numCalls = new int[fnArgColumns.length];
+                batchResults = new ArrayList<>(argHolders.size());
+                for (int i = 0; i < argHolders.size(); i++) {
+                    batchResults.add(new Pair<>(ctx.allocateFrame(), new Counter(-1)));
+                }
+                unpackerInput = new ArrayBufferInput(new byte[0]); //placeholder; reset onto result buffers when read
+                unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+            }
+
+            private void resetBuffers(int numTuples, int[] numCalls) {
+                //per-function state: empty argument buffer, full call count, every tuple marked as callable
+                for (int func = 0; func < fnArgColumns.length; func++) {
+                    argHolders.get(func).clear();
+                    if (nullCalls[func].length < numTuples) {
+                        nullCalls[func] = new ATypeTag[numTuples];
+                    }
+                    numCalls[func] = numTuples;
+                    Arrays.fill(nullCalls[func], ATypeTag.TYPE);
+                }
+                //result holders do not depend on func, so reset them once; -1 means no result batch received
+                for (Pair<ByteBuffer, Counter> batch : batchResults) {
+                    batch.getFirst().clear();
+                    batch.getSecond().set(-1);
+                }
+            }
+
+            private ATypeTag handleNullMatrix(int func, int t, ATypeTag argumentPresence, ATypeTag argumentStatus) {
+                //Folds one argument's presence into the call's status. Any unknown argument skips the call:
+                //NULL yields null, MISSING yields missing and wins over NULL. A present argument (TYPE) must
+                //never clear an earlier unknown, else the call is sent while its output slot is already marked.
+                if (argumentPresence == ATypeTag.MISSING) {
+                    nullCalls[func][t] = ATypeTag.MISSING;
+                    return ATypeTag.MISSING;
+                }
+                if (argumentPresence == ATypeTag.NULL && argumentStatus != ATypeTag.MISSING) {
+                    nullCalls[func][t] = ATypeTag.NULL;
+                    return ATypeTag.NULL;
+                }
+                return argumentStatus;
+            }
+
+            private void collectFunctionWarnings(List<Pair<ByteBuffer, Counter>> batchResults) throws IOException {
+                //reports, as warnings, the msgpack string array found at each answered buffer's current position
+                for (Pair<ByteBuffer, Counter> result : batchResults) {
+                    if (result.getSecond().get() <= -1) {
+                        continue;
+                    }
+                    ByteBuffer buf = result.getFirst();
+                    unpackerInput.reset(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+                    unpacker.reset(unpackerInput);
+                    try {
+                        for (int left = unpacker.unpackArrayHeader(); left > 0; left--) {
+                            if (ctx.getWarningCollector().shouldWarn()) {
+                                ctx.getWarningCollector().warn(Warning.of(sourceLoc,
+                                        ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+                            }
+                        }
+                    } catch (MessagePackException e) {
+                        if (ctx.getWarningCollector().shouldWarn()) {
+                            ctx.getWarningCollector().warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                                    "Error retrieving returned warnings from Python UDF"));
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                tAccess.reset(buffer);
+                tupleBuilder.reset();
+                try {
+                    int numTuples = tAccess.getTupleCount();
+                    resetBuffers(numTuples, numCalls);
+                    //build columns of arguments for each function
+                    for (int t = 0; t < numTuples; t++) {
+                        for (int func = 0; func < fnArgColumns.length; func++) {
+                            tRef.reset(tAccess, t);
+                            int[] cols = fnArgColumns[func];
+                            //TODO: switch between fixarray/array16/array32 where appropriate
+                            ATypeTag argumentStatus = ATypeTag.TYPE;
+                            if (!fnDescs[func].getFunctionInfo().getNullCall()) {
+                                for (int colIdx = 0; colIdx < cols.length; colIdx++) {
+                                    ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
+                                            tRef.getFieldLength(cols[colIdx]));
+                                    ATypeTag argumentPresence = PythonLibraryEvaluator
+                                            .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref);
+                                    argumentStatus = handleNullMatrix(func, t, argumentPresence, argumentStatus);
+                                }
+                            }
+                            if (argumentStatus == ATypeTag.TYPE) {
+                                if (cols.length > 0) {
+                                    argHolders.get(func).put(ARRAY16);
+                                    argHolders.get(func).putShort((short) cols.length);
+                                }
+                                for (int colIdx = 0; colIdx < cols.length; colIdx++) {
+                                    ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
+                                            tRef.getFieldLength(cols[colIdx]));
+                                    PythonLibraryEvaluator.setArgument(fnDescs[func].getArgumentTypes()[colIdx], ref,
+                                            argHolders.get(func), fnDescs[func].getFunctionInfo().getNullCall());
+                                }
+                            } else {
+                                numCalls[func]--;
+                            }
+                            if (cols.length == 0) {
+                                PythonLibraryEvaluator.setVoidArgument(argHolders.get(func));
+                            }
+                        }
+                    }
+                    //TODO: maybe this could be done in parallel for each unique library evaluator?
+                    for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) {
+                        Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
+                        ByteBuffer columnResult = fnEval.getSecond().callPythonMulti(fnEval.getFirst(),
+                                argHolders.get(argHolderIdx), numCalls[argHolderIdx]);
+                        if (columnResult != null) {
+                            Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
+                            if (resultholder.getFirst().capacity() < columnResult.capacity()) {
+                                resultholder.setFirst(ctx.allocateFrame(columnResult.capacity()));
+                            }
+                            ByteBuffer resultBuf = resultholder.getFirst();
+                            resultBuf.clear();
+                            resultBuf.position(0);
+                            //offset 1 to skip message type
+                            System.arraycopy(columnResult.array(), columnResult.arrayOffset() + 1, resultBuf.array(),
+                                    resultBuf.arrayOffset(), columnResult.capacity() - 1);
+                            //wrapper for results and warnings arrays. always length 2
+                            consumeAndGetBatchLength(resultBuf);
+                            int numResults = (int) consumeAndGetBatchLength(resultBuf);
+                            resultholder.getSecond().set(numResults);
+                        } else {
+                            if (ctx.getWarningCollector().shouldWarn()) {
+                                ctx.getWarningCollector()
+                                        .warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                                                "Function "
+                                                        + fnDescs[argHolderIdx].getFunctionInfo()
+                                                                .getFunctionIdentifier().toString()
+                                                        + " failed to execute"));
+                            }
+                        }
+                    }
+                    //decompose returned function columns into frame tuple format
+                    for (int i = 0; i < numTuples; i++) {
+                        tupleBuilder.reset();
+                        for (int f = 0; f < projectionList.length; f++) {
+                            int k = projectionToOutColumns[f];
+                            if (k >= 0) {
+                                outputWrapper.clear();
+                                outputWrapper.position(0);
+                                Pair<ByteBuffer, Counter> result = batchResults.get(k);
+                                int start = outputWrapper.arrayOffset();
+                                ATypeTag functionCalled = nullCalls[k][i];
+                                if (functionCalled == ATypeTag.TYPE) {
+                                    if (result.getSecond().get() > 0) {
+                                        MessageUnpackerToADM.unpack(result.getFirst(), outputWrapper, true);
+                                        result.getSecond().set(result.getSecond().get() - 1);
+                                    } else {
+                                        //emit NULL for functions which failed with a warning
+                                        outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                                    }
+                                } else if (functionCalled == ATypeTag.NULL) {
+                                    outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                                } else {
+                                    outputWrapper.put(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
+                                }
+                                tupleBuilder.addField(outputWrapper.array(), start, start + outputWrapper.position());
+                            } else {
+                                tupleBuilder.addField(tAccess, i, projectionList[f]);
+                            }
+                        }
+                        appendToFrameFromTupleBuilder(tupleBuilder);
+                    }
+                    collectFunctionWarnings(batchResults);
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+
+            private long consumeAndGetBatchLength(ByteBuffer buf) {
+                //reads one msgpack array header from buf and returns its element count, or -1 if not an array
+                byte header = buf.get();
+                if (isFixedArray(header)) {
+                    return header ^ FIXARRAY_PREFIX;
+                }
+                if (header == ARRAY16) {
+                    return Short.toUnsignedInt(buf.getShort());
+                }
+                return header == ARRAY32 ? Integer.toUnsignedLong(buf.getInt()) : -1L;
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+                appender.flush(writer); //nothing held back locally: nextFrame appends every tuple it builds
+            }
+        };
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
index f900c92..6751171 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
@@ -81,7 +81,7 @@ public class ExternalFunctionCompilerUtil {
 
         return new ExternalScalarFunctionInfo(function.getSignature().createFunctionIdentifier(), paramTypes,
                 returnType, typeComputer, lang, function.getLibraryDataverseName(), function.getLibraryName(),
-                function.getExternalIdentifier(), function.getResources(), deterministic);
+                function.getExternalIdentifier(), function.getResources(), deterministic, function.getNullCall());
     }
 
     private static IFunctionInfo getUnnestFunctionInfo(MetadataProvider metadataProvider, Function function) {
@@ -182,7 +182,7 @@ public class ExternalFunctionCompilerUtil {
             case JAVA:
                 return false;
             case PYTHON:
-                return false;
+                return true;
             default:
                 throw new CompilationException(ErrorCode.METADATA_ERROR, language.name());
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
index 854320a..82f74d9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
@@ -35,8 +35,9 @@ public class ExternalScalarFunctionInfo extends ExternalFunctionInfo {
 
     public ExternalScalarFunctionInfo(FunctionIdentifier fid, List<IAType> parameterTypes, IAType returnType,
             IResultTypeComputer rtc, ExternalFunctionLanguage language, DataverseName libraryDataverseName,
-            String libraryName, List<String> externalIdentifier, Map<String, String> resources, boolean deterministic) {
+            String libraryName, List<String> externalIdentifier, Map<String, String> resources, boolean deterministic,
+            boolean nullCall) {
         super(fid, FunctionKind.SCALAR, parameterTypes, returnType, rtc, language, libraryDataverseName, libraryName,
-                externalIdentifier, resources, deterministic);
+                externalIdentifier, resources, deterministic, nullCall);
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
index 7477330..a3ec9c6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
@@ -30,7 +30,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 
 public class ExternalFunctionInfo extends FunctionInfo implements IExternalFunctionInfo {
 
-    private static final long serialVersionUID = 4L;
+    private static final long serialVersionUID = 5L;
 
     private final FunctionKind kind;
     private final List<IAType> parameterTypes;
@@ -40,11 +40,12 @@ public class ExternalFunctionInfo extends FunctionInfo implements IExternalFunct
     private final String libraryName;
     private final List<String> externalIdentifier;
     private final Map<String, String> resources;
+    private final boolean nullCall;
 
     public ExternalFunctionInfo(FunctionIdentifier fid, FunctionKind kind, List<IAType> parameterTypes,
             IAType returnType, IResultTypeComputer rtc, ExternalFunctionLanguage language,
             DataverseName libraryDataverseName, String libraryName, List<String> externalIdentifier,
-            Map<String, String> resources, boolean deterministic) {
+            Map<String, String> resources, boolean deterministic, boolean nullCall) {
         super(fid, rtc, deterministic);
         this.kind = kind;
         this.parameterTypes = parameterTypes;
@@ -54,6 +55,7 @@ public class ExternalFunctionInfo extends FunctionInfo implements IExternalFunct
         this.libraryName = libraryName;
         this.externalIdentifier = externalIdentifier;
         this.resources = resources;
+        this.nullCall = nullCall;
     }
 
     @Override
@@ -94,4 +96,9 @@ public class ExternalFunctionInfo extends FunctionInfo implements IExternalFunct
     public Map<String, String> getResources() {
         return resources;
     }
+
+    @Override
+    public boolean getNullCall() {
+        return nullCall; //final field, assigned once in the constructor
+    }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
index d87d6df..9eb9875 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
@@ -47,4 +47,6 @@ public interface IExternalFunctionInfo extends IFunctionInfo {
     List<String> getExternalIdentifier();
 
     Map<String, String> getResources();
+
+    boolean getNullCall();
 }