You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2022/08/25 18:56:18 UTC

[asterixdb] 03/03: [ASTERIXDB-3034][RT] Fenced UDFs

This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 409e6a88b9e4083e0c30c3abb83d1cf3a66b3411
Author: Ian Maxon <ia...@maxons.email>
AuthorDate: Tue May 31 17:02:24 2022 -0700

    [ASTERIXDB-3034][RT] Fenced UDFs
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Allow UDFs to be run via domain-socket-activated
    systemd services. This makes it so the UDF is run
    as a different user than the NC process itself.
    
    Change-Id: Ibeb6228f2dc8edbf642e61cd5633c71913e18972
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16364
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
 asterixdb/asterix-app/pom.xml                      |   2 +-
 .../asterix-app/src/main/resources/entrypoint.py   |   1 +
 .../api/common/AsterixHyracksIntegrationUtil.java  |   7 +-
 .../asterix/test/runtime/LangExecutionUtil.java    |  15 +-
 .../resources/runtimets/testsuite_it_python.xml    |  14 +-
 asterixdb/asterix-docker/pom.xml                   |  68 -------
 .../external/api/IExternalLangIPCProto.java        | 105 +++++++++++
 .../asterix/external/api/ILibraryEvaluator.java    |  40 ++++
 ...onIPCProto.java => AbstractPythonIPCProto.java} | 157 +++-------------
 .../external/ipc/PythonDomainSocketProto.java      | 161 ++++++++++++++++
 .../asterix/external/ipc/PythonMessageBuilder.java |  10 +
 .../asterix/external/ipc/PythonTCPSocketProto.java |  85 +++++++++
 .../library/AbstractLibrarySocketEvaluator.java    | 100 ++++++++++
 .../ExternalScalarPythonFunctionEvaluator.java     |   5 +-
 .../PythonLibraryDomainSocketEvaluator.java        | 126 +++++++++++++
 .../external/library/PythonLibraryEvaluator.java   | 209 ---------------------
 .../library/PythonLibraryEvaluatorFactory.java     | 163 +++++++++-------
 .../library/PythonLibraryTCPSocketEvaluator.java   | 127 +++++++++++++
 .../ExternalAssignBatchRuntimeFactory.java         |  42 +++--
 .../asterix/external/util/ExternalDataUtils.java   |  74 ++++++--
 .../docker/.gitattributes                          |   0
 .../docker/Dockerfile                              |   0
 .../docker/asterix-configuration.xml               |   0
 .../docker/fbm.adm                                 |   0
 .../docker/fbu.adm                                 |   0
 .../docker/supervisord.conf                        |   0
 .../docker/twm.adm                                 |   0
 .../docker/twu.adm                                 |   0
 asterixdb/asterix-podman/pom.xml                   | 156 +++++++++++++++
 .../test/podman/PodmanPythonFunctionIT.java        | 103 ++++++++++
 .../asterix/test/podman/PodmanUDFLibrarian.java    |  85 +++++++++
 .../src/test/resources/cc.conf}                    |  36 ++--
 asterixdb/asterix-podman/src/test/resources/passwd |   1 +
 .../asterix-podman/src/test/resources/setup.sh     |   8 +
 .../src/test/resources/socktest/Containerfile      |  17 ++
 .../asterix-podman/src/test/resources/testenv.conf |   3 +
 asterixdb/asterix-server/pom.xml                   |  67 +++++--
 asterixdb/asterix-server/src/deb/control/control   |   5 +-
 asterixdb/asterix-server/src/deb/control/postinst  |   3 +-
 asterixdb/asterix-server/src/deb/control/preinst   |   4 +
 .../src/deb/systemd/asterix-cc.service             |   3 +-
 .../src/deb/systemd/asterix-nc.service             |   1 +
 .../src/deb/systemd/cc.conf}                       |  33 ++--
 .../systemd/{asterix-nc.service => pyudf.socket}   |  18 +-
 .../systemd/{asterix-cc.service => pyudf@.service} |  16 +-
 .../src/deb/udf_listener.py}                       |  49 ++---
 asterixdb/pom.xml                                  |   2 +-
 .../control/common/controllers/NCConfig.java       |   3 +
 48 files changed, 1506 insertions(+), 618 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 51ede69956..8262ce6e99 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -388,7 +388,7 @@
     <profile>
       <id>asterix-gerrit-asterix-app</id>
       <properties>
-        <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java</test.excludes>
+        <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java,**/Podman*.java</test.excludes>
         <itest.excludes>**/*.java</itest.excludes>
       </properties>
       <build>
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index 7bad7ef485..918596ca33 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -168,6 +168,7 @@ class Wrapper(object):
 
     def quit(self):
         self.alive = False
+        self.disconnect_sock()
         return True
 
     def handle_call(self):
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 86d8ed4ad1..e8c2c1d00f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -43,6 +43,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.hyracks.bootstrap.NCApplication;
+import org.apache.asterix.lang.common.util.ExpressionUtils;
 import org.apache.asterix.test.dataflow.TestLsmIoOpCallbackFactory;
 import org.apache.asterix.test.dataflow.TestPrimaryIndexOperationTrackerFactory;
 import org.apache.commons.io.FileUtils;
@@ -132,13 +133,13 @@ public class AsterixHyracksIntegrationUtil {
         cc = new ClusterControllerService(ccConfig, ccApplication);
 
         nodeNames = ccConfig.getConfigManager().getNodeNames();
-        if (deleteOldInstanceData) {
+        if (deleteOldInstanceData && nodeNames != null) {
             deleteTransactionLogs();
             removeTestStorageFiles();
             deleteCCFiles();
         }
         final List<NodeControllerService> nodeControllers = new ArrayList<>();
-        for (String nodeId : nodeNames) {
+        for (String nodeId : ExpressionUtils.emptyIfNull(nodeNames)) {
             // mark this NC as virtual, so that the CC doesn't try to start via NCService...
             configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED);
             final INCApplication ncApplication = createNCApplication();
@@ -303,7 +304,7 @@ public class AsterixHyracksIntegrationUtil {
 
         stopCC(false);
 
-        if (deleteOldInstanceData) {
+        if (deleteOldInstanceData && nodeNames != null) {
             deleteTransactionLogs();
             removeTestStorageFiles();
             deleteCCFiles();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 408882d082..d704d8e456 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -33,6 +33,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.asterix.app.external.ExternalUDFLibrarian;
+import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -65,11 +66,17 @@ public class LangExecutionUtil {
     }
 
     public static void setUp(String configFile, TestExecutor executor, boolean startHdfs) throws Exception {
+        setUp(configFile, executor, startHdfs, false, new ExternalUDFLibrarian());
+    }
+
+    public static void setUp(String configFile, TestExecutor executor, boolean startHdfs, boolean disableLangExec,
+            IExternalUDFLibrarian librarian) throws Exception {
         testExecutor = executor;
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
-        ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
-        librarian = new ExternalUDFLibrarian();
+        if (!disableLangExec) {
+            ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
+        }
         testExecutor.setLibrarian(librarian);
         if (repeat != 1) {
             System.out.println("FYI: each test will be run " + repeat + " times.");
@@ -151,7 +158,9 @@ public class LangExecutionUtil {
         NodeControllerService[] ncs = integrationUtil.ncs;
         // Checks that dataset files are uniformly distributed across each io device.
         for (NodeControllerService nc : ncs) {
-            checkNcStore(nc);
+            if (nc != null) {
+                checkNcStore(nc);
+            }
         }
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 686ede22e9..284c2fd242 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -52,15 +52,7 @@
     <test-case FilePath="external-library" check-warnings="true">
       <compilation-unit name="py_function_error">
         <output-dir compare="Clean-JSON">py_function_error</output-dir>
-        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Traceback (most recent call last):
-  File "entrypoint.py", line 181, in handle_call
-    result[0].append(self.next_tuple(*arg, key=self.mid))
-  File "entrypoint.py", line 99, in next_tuple
-    return self.wrapped_fns[key](*args)
-  File "site-packages/roundtrip.py", line 32, in warning
-    raise ArithmeticError("oof")
-ArithmeticError: oof
- (in line 28, at column 1)</expected-warn>
+        <expected-warn>ArithmeticError: oof</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-library">
@@ -76,8 +68,8 @@ ArithmeticError: oof
     <test-case FilePath="external-library" check-warnings="true">
       <compilation-unit name="crash">
         <output-dir compare="Text">crash</output-dir>
-        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Function externallibtest:crash#0 failed to execute (in line 23, at column 1)</expected-warn>
-        <expected-warn>ASX0201: External UDF returned exception. Returned exception was: java.io.IOException: Python process exited with code: 1 (in line 23, at column 1)</expected-warn>
+        <expected-warn>ASX0201: External UDF returned exception.</expected-warn>
+        <expected-warn>ASX0201: External UDF returned exception.</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-library">
diff --git a/asterixdb/asterix-docker/pom.xml b/asterixdb/asterix-docker/pom.xml
deleted file mode 100644
index 6c54337dd5..0000000000
--- a/asterixdb/asterix-docker/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>apache-asterixdb</artifactId>
-    <groupId>org.apache.asterix</groupId>
-    <version>0.9.8-SNAPSHOT</version>
-  </parent>
-  <artifactId>asterix-docker</artifactId>
-
-  <properties>
-    <root.dir>${basedir}/..</root.dir>
-  </properties>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
-
-  <profiles>
-    <profile>
-        <id>docker</id>
-        <build>
-          <plugins>
-            <plugin>
-              <groupId>com.spotify</groupId>
-              <artifactId>docker-maven-plugin</artifactId>
-              <version>0.2.11</version>
-              <configuration>
-                <imageName>asterixdb/demo</imageName>
-                <dockerDirectory>docker</dockerDirectory>
-                <resources>
-                  <resource>
-                    <targetPath>/</targetPath>
-                    <directory>../asterix-server/target/</directory>
-                    <include>asterix-server-${project.version}-binary-assembly.zip</include>
-                  </resource>
-                </resources>
-              </configuration>
-            </plugin>
-          </plugins>
-        </build>
-    </profile>
-  </profiles>
-
-</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java
new file mode 100644
index 0000000000..35e59610f3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.ipc.MessageType;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public interface IExternalLangIPCProto {
+    static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference,
+            PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull)
+            throws IOException {
+        IVisitablePointable pointable;
+        switch (type.getTypeTag()) {
+            case OBJECT:
+                pointable = pointableAllocator.allocateRecordValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+                break;
+            case ARRAY:
+            case MULTISET:
+                pointable = pointableAllocator.allocateListValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+                break;
+            case ANY:
+                ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                        .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
+                IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+                visitValueRef(rtType, out, valueReference, pointableAllocator, pointableVisitor, visitNull);
+                break;
+            case MISSING:
+            case NULL:
+                if (!visitNull) {
+                    return;
+                }
+            default:
+                pointable = pointableAllocator.allocateFieldValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+                break;
+        }
+    }
+
+    void start();
+
+    void helo() throws IOException, AsterixException;
+
+    long init(String module, String clazz, String fn) throws IOException, AsterixException;
+
+    ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall)
+            throws IOException, AsterixException;
+
+    ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples) throws IOException, AsterixException;
+
+    //For future use with interpreter reuse between jobs.
+    void quit() throws HyracksDataException;
+
+    void receiveMsg() throws IOException, AsterixException;
+
+    void sendHeader(long key, int msgLen) throws IOException;
+
+    void sendMsg(ArrayBackedValueStorage content) throws IOException;
+
+    void sendMsg() throws IOException;
+
+    MessageType getResponseType();
+
+    long getRouteId();
+
+    DataOutputStream getSockOut();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java
new file mode 100644
index 0000000000..8c6538b181
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public interface ILibraryEvaluator extends IDeallocatable {
+
+    void start() throws IOException, AsterixException;
+
+    long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException;
+
+    ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall) throws IOException;
+
+    ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java
similarity index 51%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java
index c803517f45..00d1dccf77 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,7 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +20,6 @@ package org.apache.asterix.external.ipc;
 
 import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
 
-import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -26,16 +27,10 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
 import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
-import org.apache.asterix.om.pointables.AFlatValuePointable;
-import org.apache.asterix.om.pointables.AListVisitablePointable;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
 import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
@@ -45,28 +40,24 @@ import org.msgpack.core.MessagePack;
 import org.msgpack.core.MessageUnpacker;
 import org.msgpack.core.buffer.ArrayBufferInput;
 
-public class PythonIPCProto {
-
-    private final PythonMessageBuilder messageBuilder;
-    private final DataOutputStream sockOut;
-    private final ByteBuffer headerBuffer = ByteBuffer.allocate(21);
-    private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
-    private final ExternalFunctionResultRouter router;
-    private long routeId;
-    private Pair<ByteBuffer, Exception> bufferBox;
-    private final Process pythonProc;
-    private long maxFunctionId;
-    private final ArrayBufferInput unpackerInput;
-    private final MessageUnpacker unpacker;
-    private final ArrayBackedValueStorage argsStorage;
-    private final PointableAllocator pointableAllocator;
-    private final MsgPackPointableVisitor pointableVisitor;
-
-    public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
-        this.sockOut = new DataOutputStream(sockOut);
+public abstract class AbstractPythonIPCProto {
+    public static final int HEADER_SIZE_LEN_INCLUSIVE = 21;
+    protected final PythonMessageBuilder messageBuilder;
+    protected final DataOutputStream sockOut;
+    protected final ArrayBufferInput unpackerInput;
+    protected final MessageUnpacker unpacker;
+    protected final ArrayBackedValueStorage argsStorage;
+    protected final PointableAllocator pointableAllocator;
+    protected final MsgPackPointableVisitor pointableVisitor;
+    private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_LEN_INCLUSIVE);
+    protected ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
+    protected long routeId;
+    protected Pair<ByteBuffer, Exception> bufferBox;
+    protected long maxFunctionId;
+
+    public AbstractPythonIPCProto(OutputStream sockOut) {
         messageBuilder = new PythonMessageBuilder();
-        this.router = router;
-        this.pythonProc = pythonProc;
+        this.sockOut = new DataOutputStream(sockOut);
         this.maxFunctionId = 0L;
         unpackerInput = new ArrayBufferInput(new byte[0]);
         unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
@@ -75,12 +66,6 @@ public class PythonIPCProto {
         this.pointableVisitor = new MsgPackPointableVisitor();
     }
 
-    public void start() {
-        Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer);
-        this.routeId = keyAndBufferBox.getFirst();
-        this.bufferBox = keyAndBufferBox.getSecond();
-    }
-
     public void helo() throws IOException, AsterixException {
         recvBuffer.clear();
         recvBuffer.position(0);
@@ -121,8 +106,8 @@ public class PythonIPCProto {
         messageBuilder.reset();
         argsStorage.reset();
         for (int i = 0; i < argTypes.length; i++) {
-            visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i], pointableAllocator, pointableVisitor,
-                    nullCall);
+            IExternalLangIPCProto.visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i],
+                    pointableAllocator, pointableVisitor, nullCall);
         }
         int len = argsStorage.getLength() + 5;
         sendHeader(functionId, len);
@@ -154,42 +139,11 @@ public class PythonIPCProto {
         return recvBuffer;
     }
 
-    //For future use with interpreter reuse between jobs.
     public void quit() throws HyracksDataException {
         messageBuilder.quit();
-        router.removeRoute(routeId);
     }
 
-    public void receiveMsg() throws IOException, AsterixException {
-        Exception except;
-        try {
-            synchronized (bufferBox) {
-                while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && pythonProc.isAlive()) {
-                    bufferBox.wait(100);
-                }
-            }
-            except = router.getAndRemoveException(routeId);
-            if (!pythonProc.isAlive()) {
-                except = new IOException("Python process exited with code: " + pythonProc.exitValue());
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
-        }
-        if (except != null) {
-            throw new AsterixException(except);
-        }
-        if (bufferBox.getFirst() != recvBuffer) {
-            recvBuffer = bufferBox.getFirst();
-        }
-        messageBuilder.readHead(recvBuffer);
-        if (messageBuilder.type == MessageType.ERROR) {
-            unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
-                    recvBuffer.remaining());
-            unpacker.reset(unpackerInput);
-            throw new AsterixException(unpacker.unpackString());
-        }
-    }
+    public abstract void receiveMsg() throws IOException, AsterixException;
 
     public void sendHeader(long key, int msgLen) throws IOException {
         headerBuffer.clear();
@@ -226,65 +180,4 @@ public class PythonIPCProto {
     public DataOutputStream getSockOut() {
         return sockOut;
     }
-
-    public static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference,
-            PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull)
-            throws IOException {
-        IVisitablePointable pointable;
-        switch (type.getTypeTag()) {
-            case OBJECT:
-                pointable = pointableAllocator.allocateRecordValue(type);
-                pointable.set(valueReference);
-                pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
-                break;
-            case ARRAY:
-            case MULTISET:
-                pointable = pointableAllocator.allocateListValue(type);
-                pointable.set(valueReference);
-                pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
-                break;
-            case ANY:
-                ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                        .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
-                IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-                switch (rtTypeTag) {
-                    case OBJECT:
-                        pointable = pointableAllocator.allocateRecordValue(rtType);
-                        pointable.set(valueReference);
-                        pointableVisitor.visit((ARecordVisitablePointable) pointable,
-                                pointableVisitor.getTypeInfo(rtType, out));
-                        break;
-                    case ARRAY:
-                    case MULTISET:
-                        pointable = pointableAllocator.allocateListValue(rtType);
-                        pointable.set(valueReference);
-                        pointableVisitor.visit((AListVisitablePointable) pointable,
-                                pointableVisitor.getTypeInfo(rtType, out));
-                        break;
-                    case MISSING:
-                    case NULL:
-                        if (!visitNull) {
-                            return;
-                        }
-                    default:
-                        pointable = pointableAllocator.allocateFieldValue(rtType);
-                        pointable.set(valueReference);
-                        pointableVisitor.visit((AFlatValuePointable) pointable,
-                                pointableVisitor.getTypeInfo(rtType, out));
-                        break;
-                }
-                break;
-            case MISSING:
-            case NULL:
-                if (!visitNull) {
-                    return;
-                }
-            default:
-                pointable = pointableAllocator.allocateFieldValue(type);
-                pointable.set(valueReference);
-                pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out));
-                break;
-        }
-    }
-
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java
new file mode 100644
index 0000000000..89f240a9ee
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.msgpack.core.MessagePack;
+
+public class PythonDomainSocketProto extends AbstractPythonIPCProto implements IExternalLangIPCProto {
+    private final String wd;
+    SocketChannel chan;
+    private ByteBuffer headerBuffer;
+    private ProcessHandle pid;
+    public static final int HYR_HEADER_SIZE = 21; // 4 (sz) + 8 (mid) + 8 (rmid) + 1 (flags)
+    public static final int HYR_HEADER_SIZE_NOSZ = 17; // 8 + 8 + 1
+
+    public PythonDomainSocketProto(OutputStream sockOut, SocketChannel chan, String wd) {
+        super(sockOut);
+        this.chan = chan;
+        this.wd = wd;
+        headerBuffer = ByteBuffer.allocate(HYR_HEADER_SIZE);
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void helo() throws IOException, AsterixException {
+        recvBuffer.clear();
+        recvBuffer.position(0);
+        recvBuffer.limit(0);
+        messageBuilder.reset();
+        messageBuilder.helloDS(wd);
+        sendHeader(routeId, messageBuilder.getLength());
+        sendMsg(true);
+        receiveMsg(true);
+        byte pidType = recvBuffer.get();
+        if (pidType != MessagePack.Code.UINT32 && pidType != MessagePack.Code.UINT16) {
+            throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                    "Returned pid type is incorrect: " + pidType);
+        }
+        switch (pidType) {
+            case MessagePack.Code.UINT32:
+                pid = ProcessHandle.of(recvBuffer.getInt()).get();
+                break;
+            case MessagePack.Code.UINT16:
+                pid = ProcessHandle.of(recvBuffer.getShort()).get();
+                break;
+            case MessagePack.Code.UINT8:
+                pid = ProcessHandle.of(recvBuffer.get()).get();
+                break;
+            default:
+                throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                        "Returned pid type is incorrect: " + pidType);
+        }
+        if (getResponseType() != MessageType.HELO) {
+            throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "Expected HELO, recieved " + getResponseType().name());
+        }
+    }
+
+    @Override
+    public void sendMsg() throws IOException {
+        sendMsg(false);
+    }
+
+    @Override
+    public void sendMsg(ArrayBackedValueStorage args) throws IOException {
+        sendMsg(false, args);
+    }
+
+    public void sendMsg(boolean sendIfDead) throws IOException {
+        if (!sendIfDead && (pid == null || !pid.isAlive())) {
+            return;
+        }
+        super.sendMsg();
+    }
+
+    public void sendMsg(boolean sendIfDead, ArrayBackedValueStorage args) throws IOException {
+        if (!sendIfDead && (pid == null || !pid.isAlive())) {
+            return;
+        }
+        super.sendMsg(args);
+    }
+
+    @Override
+    public void receiveMsg() throws IOException, AsterixException {
+        receiveMsg(false);
+    }
+
+    public void receiveMsg(boolean sendIfDead) throws IOException, AsterixException {
+        if (!sendIfDead && (pid == null || !pid.isAlive())) {
+            throw new AsterixException("Python process exited unexpectedly");
+        }
+        readFully(headerBuffer.capacity(), headerBuffer);
+        if (headerBuffer.remaining() < Integer.BYTES) {
+            recvBuffer.limit(0);
+            throw new AsterixException("Python process exited unexpectedly");
+        }
+        int msgSz = headerBuffer.getInt() - HYR_HEADER_SIZE_NOSZ;
+        if (recvBuffer.capacity() < msgSz) {
+            recvBuffer = ByteBuffer.allocate(((msgSz / 32768) + 1) * 32768);
+        }
+        readFully(msgSz, recvBuffer);
+        messageBuilder.readHead(recvBuffer);
+        if (messageBuilder.type == MessageType.ERROR) {
+            unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+                    recvBuffer.remaining());
+            unpacker.reset(unpackerInput);
+            throw new AsterixException(unpacker.unpackString().replace('\0', ' '));
+        }
+    }
+
+    private void readFully(int msgSz, ByteBuffer buf) throws IOException, AsterixException {
+        buf.limit(msgSz);
+        buf.clear();
+        int read;
+        int size = msgSz;
+        while (size > 0) {
+            read = chan.read(buf);
+            if (read < 0) {
+                throw new AsterixException("Socket closed");
+            }
+            size -= read;
+        }
+        buf.flip();
+    }
+
+    @Override
+    public void quit() throws HyracksDataException {
+        messageBuilder.quit();
+    }
+
+    public ProcessHandle getPid() {
+        return pid;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 5429657fe7..20f8306274 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -82,6 +82,16 @@ public class PythonMessageBuilder {
         buf.put(serAddr);
     }
 
+    public void helloDS(String modulePath) throws IOException {
+        this.type = MessageType.HELO;
+        // packed modulePath + 2 (fix array tag, message type); NOTE(review): "HELLO" itself is not counted - confirm
+        dataLength = PythonMessageBuilder.getStringLength(modulePath) + 2;
+        packHeader();
+        MessagePackUtils.packFixArrayHeader(buf, (byte) 2); // two elements follow: "HELLO", then modulePath
+        MessagePackUtils.packStr(buf, "HELLO");
+        MessagePackUtils.packStr(buf, modulePath);
+    }
+
     public void quit() throws HyracksDataException {
         this.type = MessageType.QUIT;
         dataLength = getStringLength("QUIT");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java
new file mode 100644
index 0000000000..7fd3de49f9
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/** TCP flavour of the Python UDF protocol; replies are picked up via an {@link ExternalFunctionResultRouter}. */
+public class PythonTCPSocketProto extends AbstractPythonIPCProto implements IExternalLangIPCProto {
+
+    private final ExternalFunctionResultRouter router;
+    private final Process proc;
+
+    public PythonTCPSocketProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
+        super(sockOut);
+        this.proc = pythonProc;
+        this.router = router;
+    }
+
+    @Override
+    public void start() {
+        Pair<Long, Pair<ByteBuffer, Exception>> route = router.insertRoute(recvBuffer);
+        routeId = route.getFirst();
+        bufferBox = route.getSecond();
+    }
+
+    @Override
+    public void quit() throws HyracksDataException {
+        messageBuilder.quit();
+        router.removeRoute(routeId);
+    }
+
+    /** Waits for the routed reply buffer to fill (or the worker to exit), then validates the message head. */
+    @Override
+    public void receiveMsg() throws IOException, AsterixException {
+        Exception failure;
+        try {
+            synchronized (bufferBox) {
+                // wake up every 100ms so that a worker exit is noticed even without a notification
+                while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && proc.isAlive()) {
+                    bufferBox.wait(100);
+                }
+            }
+            Exception routed = router.getAndRemoveException(routeId);
+            // a dead worker overrides whatever the router recorded
+            failure = proc.isAlive() ? routed
+                    : new IOException("Python process exited with code: " + proc.exitValue());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
+        }
+        if (failure != null) {
+            throw new AsterixException(failure);
+        }
+        recvBuffer = bufferBox.getFirst();
+        messageBuilder.readHead(recvBuffer);
+        if (messageBuilder.type == MessageType.ERROR) {
+            unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+                    recvBuffer.remaining());
+            unpacker.reset(unpackerInput);
+            throw new AsterixException(unpacker.unpackString());
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java
new file mode 100644
index 0000000000..6fcfdcff3c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.asterix.external.api.ILibraryEvaluator;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+
+/** Shared base for socket-backed library evaluators: resolves the UDF and forwards calls to {@code proto}. */
+public abstract class AbstractLibrarySocketEvaluator extends AbstractStateObject implements ILibraryEvaluator {
+
+    protected IExternalLangIPCProto proto;
+    protected TaskAttemptId task;
+    protected IWarningCollector warningCollector;
+    protected SourceLocation sourceLoc;
+
+    public AbstractLibrarySocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, TaskAttemptId task,
+            IWarningCollector warningCollector, SourceLocation sourceLoc) {
+        super(jobId, evaluatorId);
+        this.sourceLoc = sourceLoc;
+        this.warningCollector = warningCollector;
+        this.task = task;
+    }
+
+    /**
+     * Resolves module, optional class and function name from the external identifiers and calls proto.init.
+     */
+    @Override
+    public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
+        List<String> idents = finfo.getExternalIdentifier();
+        String packageModule = idents.get(0);
+        String qualified = idents.get(1);
+        int dot = qualified.lastIndexOf('.');
+        String clazz = dot < 0 ? null : qualified.substring(0, dot);
+        String fn = dot < 0 ? qualified : qualified.substring(dot + 1);
+        return proto.init(packageModule, clazz, fn);
+    }
+
+    /** Invokes the UDF once; an {@link AsterixException} is reported as a warning and null is returned. */
+    @Override
+    public ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall)
+            throws IOException {
+        try {
+            return proto.call(id, argTypes, valueReferences, nullCall);
+        } catch (AsterixException e) {
+            reportAsWarning(e);
+            return null;
+        }
+    }
+
+    /** Batch variant of {@code call}: same warning-and-null handling of {@link AsterixException}. */
+    @Override
+    public ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException {
+        try {
+            return proto.callMulti(id, arguments, numTuples);
+        } catch (AsterixException e) {
+            reportAsWarning(e);
+            return null;
+        }
+    }
+
+    /** Emits the failure as an EXTERNAL_UDF_EXCEPTION warning if the collector still accepts warnings. */
+    private void reportAsWarning(AsterixException e) {
+        if (warningCollector.shouldWarn()) {
+            warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 94a4dd2cf8..fb8d761520 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.ILibraryEvaluator;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
@@ -49,7 +50,7 @@ import org.msgpack.core.buffer.ArrayBufferInput;
 
 class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
 
-    private final PythonLibraryEvaluator libraryEvaluator;
+    private final ILibraryEvaluator libraryEvaluator;
 
     private final ArrayBackedValueStorage resultBuffer = new ArrayBackedValueStorage();
     private final ByteBuffer argHolder;
@@ -115,7 +116,7 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
             return;
         }
         try {
-            ByteBuffer res = libraryEvaluator.callPython(fnId, argTypes, argValues, nullCall);
+            ByteBuffer res = libraryEvaluator.call(fnId, argTypes, argValues, nullCall);
             resultBuffer.reset();
             wrap(res, resultBuffer.getDataOutput());
         } catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java
new file mode 100644
index 0000000000..056aa9a014
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.invoke.VarHandle;
+import java.net.ProtocolFamily;
+import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.nio.channels.Channels;
+import java.nio.channels.SocketChannel;
+import java.nio.file.Path;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.PythonDomainSocketProto;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PythonLibraryDomainSocketEvaluator extends AbstractLibrarySocketEvaluator {
+
+    private final ILibraryManager libMgr;
+    private final Path sockPath;
+    SocketChannel chan;
+    ProcessHandle pid;
+    private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class);
+
+    public PythonLibraryDomainSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+            TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc, Path sockPath) {
+        super(jobId, evaluatorId, task, warningCollector, sourceLoc);
+        this.libMgr = libMgr;
+        this.sockPath = sockPath;
+    }
+
+    public void start() throws IOException, AsterixException {
+        PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+        PythonLibrary library =
+                (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+        String wd = library.getFile().getAbsolutePath();
+        MethodHandles.Lookup lookup = MethodHandles.lookup();
+        SocketAddress sockAddr;
+        try {
+            VarHandle sockEnum = lookup.in(StandardProtocolFamily.class)
+                    .findStaticVarHandle(StandardProtocolFamily.class, "UNIX", StandardProtocolFamily.class);
+            Class domainSock = Class.forName("java.net.UnixDomainSocketAddress");
+            MethodType unixDomainSockAddrType = MethodType.methodType(domainSock, Path.class);
+            MethodHandle unixDomainSockAddr = lookup.findStatic(domainSock, "of", unixDomainSockAddrType);
+            MethodType sockChanMethodType = MethodType.methodType(SocketChannel.class, ProtocolFamily.class);
+            MethodHandle sockChanOpen = lookup.findStatic(SocketChannel.class, "open", sockChanMethodType);
+            sockAddr = ((SocketAddress) unixDomainSockAddr.invoke(sockPath));
+            chan = (SocketChannel) sockChanOpen.invoke(sockEnum.get());
+        } catch (Throwable e) {
+            throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR, e);
+        }
+        chan.connect(sockAddr);
+        proto = new PythonDomainSocketProto(Channels.newOutputStream(chan), chan, wd);
+        proto.start();
+        proto.helo();
+        this.pid = ((PythonDomainSocketProto) proto).getPid();
+    }
+
+    @Override
+    public void deallocate() {
+        try {
+            if (proto != null) {
+                proto.quit();
+            }
+            if (chan != null) {
+                chan.close();
+            }
+        } catch (IOException e) {
+            LOGGER.error("Caught exception exiting Python UDF:", e);
+        }
+        if (pid != null && pid.isAlive()) {
+            LOGGER.error("Python UDF " + pid.pid() + " did not exit as expected.");
+        }
+    }
+
+    static PythonLibraryDomainSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+            IHyracksTaskContext ctx, IWarningCollector warningCollector, SourceLocation sourceLoc)
+            throws IOException, AsterixException {
+        PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+                finfo.getLibraryName(), Thread.currentThread());
+        PythonLibraryDomainSocketEvaluator evaluator =
+                (PythonLibraryDomainSocketEvaluator) ctx.getStateObject(evaluatorId);
+        if (evaluator == null) {
+            Path sockPath = Path.of(ctx.getJobletContext().getServiceContext().getAppConfig()
+                    .getString(NCConfig.Option.PYTHON_DS_PATH));
+            evaluator = new PythonLibraryDomainSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr,
+                    ctx.getTaskAttemptId(), warningCollector, sourceLoc, sockPath);
+            ctx.getJobletContext().registerDeallocatable(evaluator);
+            evaluator.start();
+            ctx.setStateObject(evaluator);
+        }
+        return evaluator;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
deleted file mode 100644
index f82b30d91d..0000000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.library;
-
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
-import static org.msgpack.core.MessagePack.Code.ARRAY16;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackUtils;
-import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-
-public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
-
-    public static final String ENTRYPOINT = "entrypoint.py";
-    public static final String SITE_PACKAGES = "site-packages";
-
-    private Process p;
-    private ILibraryManager libMgr;
-    private File pythonHome;
-    private PythonIPCProto proto;
-    private ExternalFunctionResultRouter router;
-    private IPCSystem ipcSys;
-    private String sitePkgs;
-    private List<String> pythonArgs;
-    private Map<String, String> pythonEnv;
-    private TaskAttemptId task;
-    private IWarningCollector warningCollector;
-    private SourceLocation sourceLoc;
-
-    public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
-            File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv,
-            ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
-            IWarningCollector warningCollector, SourceLocation sourceLoc) {
-        super(jobId, evaluatorId);
-        this.libMgr = libMgr;
-        this.pythonHome = pythonHome;
-        this.sitePkgs = sitePkgs;
-        this.pythonArgs = pythonArgs;
-        this.pythonEnv = pythonEnv;
-        this.router = router;
-        this.task = task;
-        this.ipcSys = ipcSys;
-        this.warningCollector = warningCollector;
-        this.sourceLoc = sourceLoc;
-    }
-
-    private void initialize() throws IOException, AsterixException {
-        PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
-        PythonLibrary library =
-                (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
-        String wd = library.getFile().getAbsolutePath();
-        int port = ipcSys.getSocketAddress().getPort();
-        List<String> args = new ArrayList<>();
-        args.add(pythonHome.getAbsolutePath());
-        args.addAll(pythonArgs);
-        args.add(ENTRYPOINT);
-        args.add(InetAddress.getLoopbackAddress().getHostAddress());
-        args.add(Integer.toString(port));
-        args.add(sitePkgs);
-        ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
-        pb.environment().putAll(pythonEnv);
-        pb.directory(new File(wd));
-        p = pb.start();
-        proto = new PythonIPCProto(p.getOutputStream(), router, p);
-        proto.start();
-        proto.helo();
-    }
-
-    public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
-        List<String> externalIdents = finfo.getExternalIdentifier();
-        String packageModule = externalIdents.get(0);
-        String clazz;
-        String fn;
-        String externalIdent1 = externalIdents.get(1);
-        int idx = externalIdent1.lastIndexOf('.');
-        if (idx >= 0) {
-            clazz = externalIdent1.substring(0, idx);
-            fn = externalIdent1.substring(idx + 1);
-        } else {
-            clazz = null;
-            fn = externalIdent1;
-        }
-        return proto.init(packageModule, clazz, fn);
-    }
-
-    public ByteBuffer callPython(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall)
-            throws IOException {
-        ByteBuffer ret = null;
-        try {
-            ret = proto.call(id, argTypes, valueReferences, nullCall);
-        } catch (AsterixException e) {
-            if (warningCollector.shouldWarn()) {
-                warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
-            }
-        }
-        return ret;
-    }
-
-    public ByteBuffer callPythonMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException {
-        ByteBuffer ret = null;
-        try {
-            ret = proto.callMulti(id, arguments, numTuples);
-        } catch (AsterixException e) {
-            if (warningCollector.shouldWarn()) {
-                warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public void deallocate() {
-        if (p != null) {
-            boolean dead = false;
-            try {
-                p.destroy();
-                dead = p.waitFor(100, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                //gonna kill it anyway
-            }
-            if (!dead) {
-                p.destroyForcibly();
-            }
-        }
-        router.removeRoute(proto.getRouteId());
-    }
-
-    public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException {
-        ATypeTag tag = type.getTypeTag();
-        if (tag == ATypeTag.ANY) {
-            TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
-            pointy.set(valueReference);
-            ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
-            IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-            return MessagePackUtils.peekUnknown(rtType);
-        } else {
-            return MessagePackUtils.peekUnknown(type);
-        }
-    }
-
-    public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
-        argHolder.getDataOutput().writeByte(ARRAY16);
-        argHolder.getDataOutput().writeShort((short) 0);
-    }
-
-    public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
-            ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
-            String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector,
-            SourceLocation sourceLoc) throws IOException, AsterixException {
-        PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
-                finfo.getLibraryName(), Thread.currentThread());
-        PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
-        if (evaluator == null) {
-            evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, pythonHome,
-                    sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
-                    sourceLoc);
-            ctx.getJobletContext().registerDeallocatable(evaluator);
-            evaluator.initialize();
-            ctx.setStateObject(evaluator);
-        }
-        return evaluator;
-    }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
index 06c9bc99a6..63a6ec37c6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
@@ -18,10 +18,12 @@
  */
 package org.apache.asterix.external.library;
 
-import static org.apache.asterix.external.library.PythonLibraryEvaluator.SITE_PACKAGES;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.SITE_PACKAGES;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,8 +33,10 @@ import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.api.ILibraryEvaluator;
 import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -40,83 +44,116 @@ import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
 public class PythonLibraryEvaluatorFactory {
-    private final ILibraryManager libraryManager;
-    private final IPCSystem ipcSys;
-    private final File pythonPath;
-    private final IHyracksTaskContext ctx;
-    private final ExternalFunctionResultRouter router;
-    private final String sitePackagesPath;
-    private final List<String> pythonArgs;
-    private final Map<String, String> pythonEnv;
+
+    private ILibraryManager libraryManager;
+    private IPCSystem ipcSys;
+    private File pythonPath;
+    private IHyracksTaskContext ctx;
+    private ExternalFunctionResultRouter router;
+    private String sitePackagesPath;
+    private List<String> pythonArgs;
+    private Map<String, String> pythonEnv;
+
+    private boolean domainSockEnable;
 
     public PythonLibraryEvaluatorFactory(IHyracksTaskContext ctx) throws AsterixException {
         this.ctx = ctx;
+        String dsPath =
+                ctx.getJobletContext().getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_DS_PATH);
+        config(dsPath == null ? null : Path.of(dsPath));
         libraryManager = ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
                 .getLibraryManager();
-        router = libraryManager.getRouter();
-        ipcSys = libraryManager.getIPCI();
-        IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
-        String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
-        boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
-        pythonArgs = new ArrayList<>();
-        if (pythonPathCmd == null) {
-            if (findPython) {
-                //if absolute path to interpreter is not specified, try to use environmental python
-                pythonPathCmd = "/usr/bin/env";
-                pythonArgs.add("python3");
-            } else {
-                throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, "Python interpreter not specified, and "
-                        + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
+        if (!domainSockEnable) {
+            router = libraryManager.getRouter();
+            ipcSys = libraryManager.getIPCI();
+            IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
+            String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
+            boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
+            pythonArgs = new ArrayList<>();
+            if (pythonPathCmd == null) {
+                if (findPython) {
+                    //if absolute path to interpreter is not specified, try to use environmental python
+                    pythonPathCmd = "/usr/bin/env";
+                    pythonArgs.add("python3");
+                } else {
+                    throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                            "Python interpreter not specified or domain socket not found, and "
+                                    + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
+                }
             }
-        }
-        pythonEnv = new HashMap<>();
-        String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV));
-        if (envRaw != null) {
-            for (String rawEnvArg : envRaw) {
-                //TODO: i think equals is shared among all unixes and windows. but it needs verification
-                if (rawEnvArg.length() < 1) {
-                    continue;
+            pythonEnv = new HashMap<>();
+            String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV));
+            if (envRaw != null) {
+                for (String rawEnvArg : envRaw) {
+                    //TODO: i think equals is shared among all unixes and windows. but it needs verification
+                    if (rawEnvArg.length() < 1) {
+                        continue;
+                    }
+                    String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2);
+                    if (rawArgSplit.length < 2) {
+                        throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                                "Invalid environment variable format detected.");
+                    }
+                    pythonEnv.put(rawArgSplit[0], rawArgSplit[1]);
                 }
-                String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2);
-                if (rawArgSplit.length < 2) {
-                    throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
-                            "Invalid environment variable format detected.");
+            }
+            pythonPath = new File(pythonPathCmd);
+            List<String> sitePkgs = new ArrayList<>();
+            sitePkgs.add(SITE_PACKAGES);
+            String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
+            for (String sitePkg : addlSitePackages) {
+                if (sitePkg.length() > 0) {
+                    sitePkgs.add(sitePkg);
                 }
-                pythonEnv.put(rawArgSplit[0], rawArgSplit[1]);
             }
-        }
-        pythonPath = new File(pythonPathCmd);
-        List<String> sitePkgs = new ArrayList<>();
-        sitePkgs.add(SITE_PACKAGES);
-        String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
-        for (String sitePkg : addlSitePackages) {
-            if (sitePkg.length() > 0) {
-                sitePkgs.add(sitePkg);
+            if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+                sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
             }
-        }
-        if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
-            sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
-        }
-        String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
-        if (pythonArgsRaw != null) {
-            for (String arg : pythonArgsRaw) {
-                if (arg.length() > 0) {
-                    pythonArgs.add(arg);
+            String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
+            if (pythonArgsRaw != null) {
+                for (String arg : pythonArgsRaw) {
+                    if (arg.length() > 0) {
+                        pythonArgs.add(arg);
+                    }
                 }
             }
+            StringBuilder sitePackagesPathBuilder = new StringBuilder();
+            for (int i = 0; i < sitePkgs.size() - 1; i++) {
+                sitePackagesPathBuilder.append(sitePkgs.get(i));
+                sitePackagesPathBuilder.append(File.pathSeparator);
+            }
+            sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
+            sitePackagesPath = sitePackagesPathBuilder.toString();
         }
-        StringBuilder sitePackagesPathBuilder = new StringBuilder();
-        for (int i = 0; i < sitePkgs.size() - 1; i++) {
-            sitePackagesPathBuilder.append(sitePkgs.get(i));
-            sitePackagesPathBuilder.append(File.pathSeparator);
-        }
-        sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
-        sitePackagesPath = sitePackagesPathBuilder.toString();
     }
 
-    public PythonLibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
+    public ILibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
             throws IOException, AsterixException {
-        return PythonLibraryEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
-                sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc);
+        if (domainSockEnable) {
+            return PythonLibraryDomainSocketEvaluator.getInstance(fnInfo, libraryManager, ctx,
+                    ctx.getWarningCollector(), sourceLoc);
+        } else {
+            return PythonLibraryTCPSocketEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
+                    sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc);
+        }
+    }
+
+    private void config(Path sockPath) throws AsterixException {
+        if (sockPath == null) {
+            domainSockEnable = false;
+            return;
+        }
+        Runtime rt = Runtime.getRuntime();
+        if (rt.version().feature() >= 17 && SystemUtils.IS_OS_LINUX) {
+            if (Files.exists(sockPath)) {
+                domainSockEnable = true;
+            } else {
+                throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                        "Domain socket was not found at specified path");
+            }
+        } else {
+            throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                    "Domain socket path specified, but Java version is below 17 or OS is not Linux");
+        }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
new file mode 100644
index 0000000000..385d738f83
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonTCPSocketProto;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+/**
+ * Library evaluator that runs Python UDFs in an interpreter process spawned by this class. {@link #start()}
+ * launches the interpreter on {@link #ENTRYPOINT} inside the library's directory and drives it through a
+ * {@link PythonTCPSocketProto}.
+ */
+public class PythonLibraryTCPSocketEvaluator extends AbstractLibrarySocketEvaluator {
+
+    public static final String ENTRYPOINT = "entrypoint.py";
+    public static final String SITE_PACKAGES = "site-packages";
+
+    // assigned by start(); remains null if the interpreter could not be launched
+    private Process p;
+    private final ILibraryManager libMgr;
+    private final File pythonHome;
+    private final ExternalFunctionResultRouter router;
+    private final IPCSystem ipcSys;
+    private final String sitePkgs;
+    private final List<String> pythonArgs;
+    private final Map<String, String> pythonEnv;
+
+    public PythonLibraryTCPSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+            File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv,
+            ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
+            IWarningCollector warningCollector, SourceLocation sourceLoc) {
+        super(jobId, evaluatorId, task, warningCollector, sourceLoc);
+        this.libMgr = libMgr;
+        this.pythonHome = pythonHome;
+        this.sitePkgs = sitePkgs;
+        this.pythonArgs = pythonArgs;
+        this.pythonEnv = pythonEnv;
+        this.router = router;
+        this.ipcSys = ipcSys;
+    }
+
+    /**
+     * Spawns the Python interpreter for this evaluator's library, then creates the protocol object and calls
+     * its start() and helo(). The command line is the interpreter path, the configured interpreter arguments,
+     * {@link #ENTRYPOINT}, the loopback address, the port of the IPC system and the site-packages path; the
+     * process uses the library's file path as working directory, with the configured environment entries added.
+     */
+    @Override
+    public void start() throws IOException, AsterixException {
+        PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+        PythonLibrary library =
+                (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+        String wd = library.getFile().getAbsolutePath();
+        int port = ipcSys.getSocketAddress().getPort();
+        List<String> args = new ArrayList<>();
+        args.add(pythonHome.getAbsolutePath());
+        args.addAll(pythonArgs);
+        args.add(ENTRYPOINT);
+        args.add(InetAddress.getLoopbackAddress().getHostAddress());
+        args.add(Integer.toString(port));
+        args.add(sitePkgs);
+        ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
+        pb.environment().putAll(pythonEnv);
+        pb.directory(new File(wd));
+        p = pb.start();
+        proto = new PythonTCPSocketProto(p.getOutputStream(), router, p);
+        proto.start();
+        proto.helo();
+    }
+
+    /**
+     * Stops the interpreter process, if one was started, and removes this evaluator's route from the router.
+     * destroy() is called first; destroyForcibly() follows if the process has not exited within 100 ms.
+     */
+    @Override
+    public void deallocate() {
+        if (p != null) {
+            boolean dead = false;
+            try {
+                p.destroy();
+                dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // the process is killed forcibly below; restore the interrupt flag for the caller
+                Thread.currentThread().interrupt();
+            }
+            if (!dead) {
+                p.destroyForcibly();
+            }
+        }
+        // getInstance() registers this evaluator for deallocation before calling start(), so proto is still
+        // null here when start() failed before creating it
+        if (proto != null) {
+            router.removeRoute(proto.getRouteId());
+        }
+    }
+
+    /**
+     * Returns the evaluator stored in the task context for the function's library and the calling thread,
+     * creating, registering for deallocation, starting and storing a new one if none is present.
+     */
+    static PythonLibraryTCPSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+            ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
+            String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector,
+            SourceLocation sourceLoc) throws IOException, AsterixException {
+        PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+                finfo.getLibraryName(), Thread.currentThread());
+        PythonLibraryTCPSocketEvaluator evaluator = (PythonLibraryTCPSocketEvaluator) ctx.getStateObject(evaluatorId);
+        if (evaluator == null) {
+            evaluator = new PythonLibraryTCPSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr,
+                    pythonHome, sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(),
+                    warningCollector, sourceLoc);
+            ctx.getJobletContext().registerDeallocatable(evaluator);
+            evaluator.start();
+            ctx.setStateObject(evaluator);
+        }
+        return evaluator;
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 741dad2f85..5f8a3f01b5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -33,12 +33,13 @@ import java.util.List;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.PythonLibraryEvaluator;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.asterix.external.api.ILibraryEvaluator;
 import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
 import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
 import org.apache.asterix.om.pointables.PointableAllocator;
 import org.apache.asterix.om.types.ATypeTag;
@@ -50,6 +51,7 @@ import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneO
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -87,7 +89,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
             private ArrayBackedValueStorage outputWrapper;
             private List<ArrayBackedValueStorage> argHolders;
             ArrayTupleBuilder tupleBuilder;
-            private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
+            private List<Pair<Long, ILibraryEvaluator>> libraryEvaluators;
             private ATypeTag[][] nullCalls;
             private int[] numCalls;
             private VoidPointable ref;
@@ -97,6 +99,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
             private MessageUnpackerToADM unpackerToADM;
             private PointableAllocator pointableAllocator;
             private MsgPackPointableVisitor pointableVisitor;
+            private TaggedValuePointable anyPointer;
 
             @Override
             public void open() throws HyracksDataException {
@@ -109,7 +112,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                 try {
                     PythonLibraryEvaluatorFactory evalFactory = new PythonLibraryEvaluatorFactory(ctx);
                     for (IExternalFunctionDescriptor fnDesc : fnDescs) {
-                        PythonLibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
+                        ILibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
                         long id = eval.initialize(fnDesc.getFunctionInfo());
                         libraryEvaluators.add(new Pair<>(id, eval));
                     }
@@ -133,6 +136,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                 unpackerToADM = new MessageUnpackerToADM();
                 pointableAllocator = new PointableAllocator();
                 pointableVisitor = new MsgPackPointableVisitor();
+                anyPointer = TaggedValuePointable.FACTORY.createPointable();
             }
 
             private void resetBuffers(int numTuples, int[] numCalls) {
@@ -177,8 +181,12 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                             int numEntries = unpacker.unpackArrayHeader();
                             for (int j = 0; j < numEntries; j++) {
                                 if (ctx.getWarningCollector().shouldWarn()) {
-                                    ctx.getWarningCollector().warn(Warning.of(sourceLoc,
-                                            ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+                                    //TODO: in domain socket mode, a NUL can appear at the end of the stacktrace strings.
+                                    //      this should probably not happen but warnings with control characters should
+                                    //      also be properly escaped
+                                    ctx.getWarningCollector()
+                                            .warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+                                                    unpacker.unpackString().replace('\0', ' ')));
                                 }
                             }
                         } catch (MessagePackException e) {
@@ -211,8 +219,8 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                                 for (int colIdx = 0; colIdx < cols.length; colIdx++) {
                                     ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
                                             tRef.getFieldLength(cols[colIdx]));
-                                    ATypeTag argumentPresence = PythonLibraryEvaluator
-                                            .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref);
+                                    ATypeTag argumentPresence = ExternalDataUtils
+                                            .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref, anyPointer);
                                     argumentStatus = handleNullMatrix(func, t, argumentPresence, argumentStatus);
                                 }
                             }
@@ -224,7 +232,7 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                                 for (int colIdx = 0; colIdx < cols.length; colIdx++) {
                                     ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
                                             tRef.getFieldLength(cols[colIdx]));
-                                    PythonIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
+                                    IExternalLangIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
                                             argHolders.get(func).getDataOutput(), ref, pointableAllocator,
                                             pointableVisitor, fnDescs[func].getFunctionInfo().getNullCall());
                                 }
@@ -232,21 +240,25 @@ public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOne
                                 numCalls[func]--;
                             }
                             if (cols.length == 0) {
-                                PythonLibraryEvaluator.setVoidArgument(argHolders.get(func));
+                                ExternalDataUtils.setVoidArgument(argHolders.get(func));
                             }
                         }
                     }
 
                     //TODO: maybe this could be done in parallel for each unique library evaluator?
                     for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) {
-                        Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
-                        ByteBuffer columnResult = fnEval.getSecond().callPythonMulti(fnEval.getFirst(),
+                        Pair<Long, ILibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
+                        ByteBuffer columnResult = fnEval.getSecond().callMulti(fnEval.getFirst(),
                                 argHolders.get(argHolderIdx), numCalls[argHolderIdx]);
                         if (columnResult != null) {
                             Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
-                            if (resultholder.getFirst().capacity() < columnResult.capacity()) {
-                                ByteBuffer realloc = ctx.reallocateFrame(resultholder.getFirst(),
-                                        columnResult.capacity() * 2, false);
+                            if (resultholder.getFirst().capacity() < columnResult.remaining()) {
+                                ByteBuffer realloc =
+                                        ctx.reallocateFrame(resultholder.getFirst(),
+                                                ctx.getInitialFrameSize()
+                                                        * ((columnResult.remaining() / ctx.getInitialFrameSize()) + 1),
+                                                false);
+                                realloc.limit(columnResult.limit());
                                 resultholder.setFirst(realloc);
                             }
                             ByteBuffer resultBuf = resultholder.getFirst();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 62dc07425f..5bf584487e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@ import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.val
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
 import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
@@ -66,11 +67,15 @@ import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.library.JavaLibrary;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.asterix.runtime.evaluators.common.NumberUtils;
 import org.apache.asterix.runtime.projection.DataProjectionInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -78,6 +83,9 @@ import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -386,7 +394,8 @@ public class ExternalDataUtils {
     /**
      * Fills the configuration of the external dataset and its adapter with default values if not provided by user.
      *
-     * @param configuration external data configuration
+     * @param configuration
+     *            external data configuration
      */
     public static void defaultConfiguration(Map<String, String> configuration) {
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -408,8 +417,10 @@ public class ExternalDataUtils {
      * Prepares the configuration of the external data and its adapter by filling the information required by
      * adapters and parsers.
      *
-     * @param adapterName   adapter name
-     * @param configuration external data configuration
+     * @param adapterName
+     *            adapter name
+     * @param configuration
+     *            external data configuration
      */
     public static void prepare(String adapterName, Map<String, String> configuration) {
         if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
@@ -431,7 +442,8 @@ public class ExternalDataUtils {
      * Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
      * the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory.
      *
-     * @param configuration external data configuration
+     * @param configuration
+     *            external data configuration
      */
     public static void normalize(Map<String, String> configuration) {
         // normalize the "format" parameter
@@ -451,8 +463,10 @@ public class ExternalDataUtils {
     /**
      * Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
      *
-     * @param configuration external data configuration
-     * @throws HyracksDataException HyracksDataException
+     * @param configuration
+     *            external data configuration
+     * @throws HyracksDataException
+     *             HyracksDataException
      */
     public static void validate(Map<String, String> configuration) throws HyracksDataException {
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -514,7 +528,8 @@ public class ExternalDataUtils {
      * Validates adapter specific external dataset properties. Specific properties for different adapters should be
      * validated here
      *
-     * @param configuration properties
+     * @param configuration
+     *            properties
      */
     public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
@@ -542,7 +557,8 @@ public class ExternalDataUtils {
     /**
      * Regex matches all the provided patterns against the provided path
      *
-     * @param path path to check against
+     * @param path
+     *            path to check against
      * @return {@code true} if all patterns match, {@code false} otherwise
      */
     public static boolean matchPatterns(List<Matcher> matchers, String path) {
@@ -557,7 +573,8 @@ public class ExternalDataUtils {
     /**
      * Converts the wildcard to proper regex
      *
-     * @param pattern wildcard pattern to convert
+     * @param pattern
+     *            wildcard pattern to convert
      * @return regex expression
      */
     public static String patternToRegex(String pattern) {
@@ -646,7 +663,8 @@ public class ExternalDataUtils {
     /**
      * Adjusts the prefix (if needed) and returns it
      *
-     * @param configuration configuration
+     * @param configuration
+     *            configuration
      */
     public static String getPrefix(Map<String, String> configuration) {
         return getPrefix(configuration, true);
@@ -661,8 +679,10 @@ public class ExternalDataUtils {
     }
 
     /**
-     * @param configuration configuration map
-     * @throws CompilationException Compilation exception
+     * @param configuration
+     *            configuration map
+     * @throws CompilationException
+     *             Compilation exception
      */
     public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
         // Ensure that include and exclude are not provided at the same time + ensure valid format or property
@@ -746,8 +766,10 @@ public class ExternalDataUtils {
     /**
      * Validate Parquet dataset's declared type and configuration
      *
-     * @param properties        external dataset configuration
-     * @param datasetRecordType dataset declared type
+     * @param properties
+     *            external dataset configuration
+     * @param datasetRecordType
+     *            dataset declared type
      */
     public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
             ARecordType datasetRecordType) throws CompilationException {
@@ -780,7 +802,8 @@ public class ExternalDataUtils {
     /**
      * Serialize {@link ARecordType} as Base64 string to pass it to {@link org.apache.hadoop.conf.Configuration}
      *
-     * @param expectedType expected type
+     * @param expectedType
+     *            expected type
      * @return the expected type as Base64 string
      */
     private static String serializeExpectedTypeToString(ARecordType expectedType) throws IOException {
@@ -799,7 +822,8 @@ public class ExternalDataUtils {
      * Serialize {@link FunctionCallInformation} map as Base64 string to pass it to
      * {@link org.apache.hadoop.conf.Configuration}
      *
-     * @param functionCallInfoMap function information map
+     * @param functionCallInfoMap
+     *            function information map
      * @return function information map as Base64 string
      */
     static String serializeFunctionCallInfoToString(Map<String, FunctionCallInformation> functionCallInfoMap)
@@ -830,4 +854,44 @@ public class ExternalDataUtils {
     public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
         return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst();
     }
+
+    /**
+     * Resolves the type tag of a function argument through {@code MessagePackUtils.peekUnknown}. When the declared
+     * type is {@code ANY}, the tag stored with the serialized value is read and mapped to its builtin type first.
+     *
+     * @param type
+     *            declared type of the argument
+     * @param valueReference
+     *            serialized argument value; only consulted when the declared type is {@code ANY}
+     * @param pointy
+     *            pointable that is set onto {@code valueReference} when the declared type is {@code ANY}
+     * @return the tag returned by {@code MessagePackUtils.peekUnknown} for the resolved type
+     * @throws HyracksDataException
+     *             NOTE(review): presumably raised by the tag-to-type lookup or by {@code peekUnknown}; confirm
+     */
+    public static ATypeTag peekArgument(IAType type, IValueReference valueReference, TaggedValuePointable pointy)
+            throws HyracksDataException {
+        if (type.getTypeTag() != ATypeTag.ANY) {
+            return MessagePackUtils.peekUnknown(type);
+        }
+        // declared as ANY: the concrete tag is read from the value itself
+        pointy.set(valueReference);
+        ATypeTag runtimeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+        return MessagePackUtils.peekUnknown(TypeTagUtil.getBuiltinTypeByTag(runtimeTag));
+    }
+
+    /**
+     * Writes the byte {@code ARRAY16} followed by a 16-bit zero to the data output of {@code argHolder}.
+     * NOTE(review): this looks like the MessagePack encoding of an empty array, standing in for "no arguments";
+     * confirm against the definition of {@code ARRAY16}.
+     *
+     * @param argHolder
+     *            storage whose data output receives the three bytes
+     * @throws IOException
+     *             if writing to the data output fails
+     */
+    public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
+        argHolder.getDataOutput().writeByte(ARRAY16);
+        argHolder.getDataOutput().writeShort((short) 0);
+    }
 }
diff --git a/asterixdb/asterix-docker/docker/.gitattributes b/asterixdb/asterix-podman/docker/.gitattributes
similarity index 100%
rename from asterixdb/asterix-docker/docker/.gitattributes
rename to asterixdb/asterix-podman/docker/.gitattributes
diff --git a/asterixdb/asterix-docker/docker/Dockerfile b/asterixdb/asterix-podman/docker/Dockerfile
similarity index 100%
rename from asterixdb/asterix-docker/docker/Dockerfile
rename to asterixdb/asterix-podman/docker/Dockerfile
diff --git a/asterixdb/asterix-docker/docker/asterix-configuration.xml b/asterixdb/asterix-podman/docker/asterix-configuration.xml
similarity index 100%
rename from asterixdb/asterix-docker/docker/asterix-configuration.xml
rename to asterixdb/asterix-podman/docker/asterix-configuration.xml
diff --git a/asterixdb/asterix-docker/docker/fbm.adm b/asterixdb/asterix-podman/docker/fbm.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/fbm.adm
rename to asterixdb/asterix-podman/docker/fbm.adm
diff --git a/asterixdb/asterix-docker/docker/fbu.adm b/asterixdb/asterix-podman/docker/fbu.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/fbu.adm
rename to asterixdb/asterix-podman/docker/fbu.adm
diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-podman/docker/supervisord.conf
similarity index 100%
copy from asterixdb/asterix-docker/docker/supervisord.conf
copy to asterixdb/asterix-podman/docker/supervisord.conf
diff --git a/asterixdb/asterix-docker/docker/twm.adm b/asterixdb/asterix-podman/docker/twm.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/twm.adm
rename to asterixdb/asterix-podman/docker/twm.adm
diff --git a/asterixdb/asterix-docker/docker/twu.adm b/asterixdb/asterix-podman/docker/twu.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/twu.adm
rename to asterixdb/asterix-podman/docker/twu.adm
diff --git a/asterixdb/asterix-podman/pom.xml b/asterixdb/asterix-podman/pom.xml
new file mode 100644
index 0000000000..3d32518587
--- /dev/null
+++ b/asterixdb/asterix-podman/pom.xml
@@ -0,0 +1,156 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>apache-asterixdb</artifactId>
+    <groupId>org.apache.asterix</groupId>
+    <version>0.9.8-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-podman</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>asterix-server</artifactId>
+            <version>${project.version}</version>
+            <type>deb</type>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-app</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-test-framework</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+  <properties>
+    <root.dir>${basedir}/..</root.dir>
+  </properties>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+   <build>
+       <plugins>
+           <plugin>
+               <groupId>org.apache.rat</groupId>
+               <artifactId>apache-rat-plugin</artifactId>
+               <configuration>
+                   <excludes combine.children="append">
+                       <exclude>src/test/resources/setup.sh</exclude>
+                       <exclude>src/test/resources/passwd</exclude>
+                       <exclude>src/test/resources/socktest/Containerfile</exclude>
+                       <exclude>src/test/resources/testenv.conf</exclude>
+                   </excludes>
+               </configuration>
+           </plugin>
+       </plugins>
+   </build>
+    <profiles>
+        <profile>
+            <id>podman.tests</id>
+            <properties>
+                <test.excludes>**/*.java</test.excludes>
+                <itest.includes>**/PodmanPythonFunctionIT.java</itest.includes>
+                <failIfNoTests>false</failIfNoTests>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>nl.lexemmens</groupId>
+                        <artifactId>podman-maven-plugin</artifactId>
+                        <version>1.8.0</version>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>build</goal>
+                                </goals>
+                                <phase>generate-test-resources</phase>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <skipAuth>true</skipAuth>
+                            <images>
+                                <image>
+                                    <name>asterixdb/socktest</name>
+                                    <build>
+                                        <pull>false</pull>
+                                        <createLatestTag>true</createLatestTag>
+                                        <containerFileDir>src/test/resources/socktest</containerFileDir>
+                                    </build>
+                                </image>
+                            </images>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <artifactId>maven-resources-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-external-data-resources</id>
+                                <phase>generate-resources</phase>
+                                <goals>
+                                    <goal>copy-resources</goal>
+                                </goals>
+                                <configuration>
+                                    <outputDirectory>target/</outputDirectory>
+                                    <overwrite>true</overwrite>
+                                    <resources>
+                                        <resource>
+                                            <directory>../asterix-server/target</directory>
+                                            <includes>
+                                                <include>asterix-server*.deb</include>
+                                            </includes>
+                                        </resource>
+                                    </resources>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java
new file mode 100644
index 0000000000..f0f89cd67f
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.DockerClient;
+
+/**
+ * Runs the Python UDF tests within a container using domain sockets.
+ */
+@RunWith(Parameterized.class)
+public class PodmanPythonFunctionIT {
+    public static final DockerImageName ASTERIX_IMAGE = DockerImageName.parse("asterixdb/socktest");
+    @ClassRule
+    public static GenericContainer<?> asterix = new GenericContainer<>(ASTERIX_IMAGE)
+            .withExposedPorts(19004, 5006, 19002)
+            .withFileSystemBind("../asterix-app/", "/var/tmp/asterix-app/", BindMode.READ_WRITE);
+    protected static final String TEST_CONFIG_FILE_NAME = "../asterix-app/src/test/resources/cc.conf";
+    private static final boolean cleanupOnStop = true;
+
+    /**
+     * Runs the setup script inside the container, initializes the test harness against the container's mapped
+     * ports and waits up to 60 seconds for the cluster to become active.
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        final TestExecutor testExecutor = new TestExecutor(
+                List.of(InetSocketAddress.createUnresolved(asterix.getHost(), asterix.getMappedPort(19002))));
+        asterix.execInContainer("/opt/setup.sh");
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false, true, new PodmanUDFLibrarian(asterix));
+        setEndpoints(testExecutor);
+        testExecutor.waitForClusterActive(60, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tears down the test harness and then force-removes the test image. The image removal sits in the
+     * {@code finally} block so that it is attempted even when the harness teardown throws.
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        try {
+            ExecutionTestUtil.tearDown(cleanupOnStop);
+        } finally {
+            DockerClient dc = DockerClientFactory.instance().client();
+            dc.removeImageCmd(ASTERIX_IMAGE.asCanonicalNameString()).withForce(true).exec();
+        }
+    }
+
+    @Parameters(name = "PodmanPythonFunctionIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_it_python.xml",
+                "../asterix-app/src/test/resources/runtimets");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public PodmanPythonFunctionIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    /**
+     * Registers the container's mapped NC API port (19004) as the only NC endpoint of the test executor.
+     */
+    private static void setEndpoints(TestExecutor testExecutor) {
+        final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        final String ip = asterix.getHost();
+        final String nodeId = "asterix_nc";
+        int apiPort = asterix.getMappedPort(19004);
+        ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+        testExecutor.setNcEndPoints(ncEndPoints);
+    }
+}
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
new file mode 100644
index 0000000000..025f607a30
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.asterix.app.external.IExternalUDFLibrarian;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * {@link IExternalUDFLibrarian} that installs and removes UDF libraries by running {@code curl} inside the test
+ * container against {@code http://localhost:19004}.
+ */
+public class PodmanUDFLibrarian implements IExternalUDFLibrarian {
+    private static final int MAX_INSTALL_ATTEMPTS = 10;
+    // configured once, at class initialization, to accept unescaped control characters in the responses
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper().configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+    final GenericContainer<?> asterix;
+
+    /**
+     * @param asterix
+     *            container in which the {@code curl} commands are executed
+     */
+    public PodmanUDFLibrarian(GenericContainer<?> asterix) {
+        this.asterix = asterix;
+    }
+
+    /**
+     * Uploads a library with an HTTP POST issued by {@code curl} inside the container. An attempt that fails with
+     * a {@link RuntimeException} is retried, for at most {@value #MAX_INSTALL_ATTEMPTS} attempts in total; any
+     * other exception is propagated immediately.
+     *
+     * @param path
+     *            URI whose raw path is appended to {@code http://localhost:19004}
+     * @param type
+     *            value sent as the {@code type} form field
+     * @param libPath
+     *            library file, relative to {@code /var/tmp/asterix-app/} inside the container
+     * @param credentials
+     *            user name and password passed to {@code curl -u}
+     */
+    @Override
+    public void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception {
+        for (int attempt = 1;; attempt++) {
+            try {
+                Container.ExecResult curlResult = asterix.execInContainer("curl", "--no-progress-meter", "-X",
+                        "POST", "-u", credentials.first + ":" + credentials.second, "-F",
+                        "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", "type=" + type,
+                        "http://localhost:19004" + path.getRawPath());
+                handleResponse(curlResult);
+                return;
+            } catch (RuntimeException e) {
+                if (attempt >= MAX_INSTALL_ATTEMPTS) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes a library with an HTTP DELETE issued by {@code curl} inside the container.
+     *
+     * @param path
+     *            URI whose raw path is appended to {@code http://localhost:19004}
+     * @param credentials
+     *            user name and password passed to {@code curl -u}
+     * @throws IOException
+     *             if the command cannot be executed, the response is not valid JSON, or the thread is interrupted
+     *             (the interrupt flag is restored before rethrowing)
+     * @throws AsterixException
+     *             if {@code curl} exits with a non-zero code or the response contains an {@code error} field
+     */
+    @Override
+    public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException {
+        try {
+            Container.ExecResult curlResult = asterix.execInContainer("curl", "-X", "DELETE", "-u",
+                    credentials.first + ":" + credentials.second, "http://localhost:19004" + path.getRawPath());
+            handleResponse(curlResult);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Fails if {@code curl} exited with a non-zero code or if the JSON it printed has an {@code error} field.
+     */
+    private static void handleResponse(Container.ExecResult result) throws AsterixException, JsonProcessingException {
+        if (result.getExitCode() != 0) {
+            throw new AsterixException(result.getStderr());
+        }
+        // NUL characters are replaced by spaces before the output is parsed
+        JsonNode resp = OBJECT_MAPPER.readTree(result.getStdout().replace('\0', ' '));
+        if (resp.has("error")) {
+            throw new AsterixException(resp.get("error").toString());
+        }
+    }
+}
diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-podman/src/test/resources/cc.conf
similarity index 55%
copy from asterixdb/asterix-docker/docker/supervisord.conf
copy to asterixdb/asterix-podman/src/test/resources/cc.conf
index 20f1797ed8..e4cbd73e48 100644
--- a/asterixdb/asterix-docker/docker/supervisord.conf
+++ b/asterixdb/asterix-podman/src/test/resources/cc.conf
@@ -15,26 +15,22 @@
 ; specific language governing permissions and limitations
 ; under the License.
 
-[supervisord]
-nodaemon=true
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
 
-[program:asterixnc1]
-command=/asterixdb/bin/asterixncservice -logdir - -config-file "/asterixdb/opt/local/conf/blue.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[nc]
+address=127.0.0.1
+command=asterixnc
+credential.file=/opt/apache-asterixdb/etc/passwd
+jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5006
+python.ds.path = /tmp/pyudf.socket
 
-[program:asterixnc2]
-command=/asterixdb/bin/asterixncservice -logdir -
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[cc]
+address = 127.0.0.1
 
-[program:asterixcc]
-command=/asterixdb/bin/asterixcc -config-file "/asterixdb/opt/local/conf/cc.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/
diff --git a/asterixdb/asterix-podman/src/test/resources/passwd b/asterixdb/asterix-podman/src/test/resources/passwd
new file mode 100644
index 0000000000..a1ea5b03a2
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/passwd
@@ -0,0 +1 @@
+admin:$2a$12$JxgDzf/uOn1NS2Y3exhrDOf7JY/eUHQH7HeH90s5Ye2gALoO0FsQy
diff --git a/asterixdb/asterix-podman/src/test/resources/setup.sh b/asterixdb/asterix-podman/src/test/resources/setup.sh
new file mode 100644
index 0000000000..e3523aaa5e
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/setup.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+# Builds the TweetSent shiv archive and copies the test data sets into the AsterixDB install tree.
+# Abort if the source tree is not present: the relative paths passed to shiv below depend on it.
+cd /var/tmp/asterix-app/ || exit 1
+shiv -o target/TweetSent.pyz --site-packages src/test/resources/TweetSent scikit-learn
+cp -a /var/tmp/asterix-app/data/classifications /opt/apache-asterixdb/data/
+cp -a /var/tmp/asterix-app/data/twitter /opt/apache-asterixdb/data/
+cp -a /var/tmp/asterix-app/data/big-object /opt/apache-asterixdb/data/
+mkdir -p /opt/apache-asterixdb/target/data/
+cp -a /var/tmp/asterix-app/target/data/big-object /opt/apache-asterixdb/target/data/
\ No newline at end of file
diff --git a/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile
new file mode 100644
index 0000000000..a7546d5a7d
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile
@@ -0,0 +1,17 @@
+FROM ubuntu:22.04
+RUN apt -y update
+RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt -y install systemd openjdk-17-jre-headless unzip wget curl python3-pip python3-venv python3-systemd
+RUN pip3 install shiv msgpack
+COPY target/asterix-server_*all.deb .
+RUN dpkg -i asterix-server*.deb
+COPY src/test/resources/cc.conf /opt/apache-asterixdb/cc.conf
+COPY src/test/resources/passwd /opt/apache-asterixdb/etc/passwd
+RUN mkdir -p /etc/systemd/system/pyudf@.service.d/
+COPY src/test/resources/testenv.conf /etc/systemd/system/pyudf@.service.d/
+COPY src/test/resources/setup.sh /opt
+RUN chmod +x /opt/setup.sh
+RUN systemctl enable asterix-nc asterix-cc pyudf.socket
+
+EXPOSE 19001 19002 19004
+
+CMD [ "/lib/systemd/systemd" ]
diff --git a/asterixdb/asterix-podman/src/test/resources/testenv.conf b/asterixdb/asterix-podman/src/test/resources/testenv.conf
new file mode 100644
index 0000000000..0c2f182c35
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/testenv.conf
@@ -0,0 +1,3 @@
+[Service]
+Environment="FOO=BAR=BAZ"
+Environment="BAR=BAZ"
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 6c2a05adc3..e1d09640ba 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -978,7 +978,7 @@
           <plugin>
             <artifactId>jdeb</artifactId>
             <groupId>org.vafer</groupId>
-            <version>1.5</version>
+            <version>1.8</version>
             <executions>
               <execution>
                 <phase>package</phase>
@@ -988,26 +988,36 @@
                 <configuration>
                   <dataSet>
                     <data>
-                      <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/</src>
-                      <excludes>bin/**</excludes>
+                      <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}</src>
                       <type>directory</type>
                       <mapper>
                         <type>perm</type>
-                        <prefix>/opt/apache-asterixdb-${project.version}/</prefix>
-                        <user>asterixdb</user>
-                        <group>asterixdb</group>
+                        <prefix>/opt/apache-asterixdb/</prefix>
+                        <user>root</user>
+                        <group>root</group>
+                        <filemode>755</filemode>
+                      </mapper>
+                    </data>
+                    <data>
+                      <type>file</type>
+                      <src>src/deb/systemd/cc.conf</src>
+                      <mapper>
+                        <prefix>/opt/apache-asterixdb/</prefix>
+                        <type>perm</type>
+                        <user>root</user>
+                        <group>root</group>
                         <filemode>644</filemode>
                       </mapper>
                     </data>
                     <data>
-                      <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/bin</src>
-                      <type>directory</type>
+                      <type>file</type>
+                      <src>src/deb/udf_listener.py</src>
                       <mapper>
+                        <prefix>/opt/apache-asterixdb/bin</prefix>
                         <type>perm</type>
-                        <prefix>/opt/apache-asterixdb-${project.version}/bin</prefix>
-                        <user>asterixdb</user>
-                        <group>asterixdb</group>
-                        <filemode>754</filemode>
+                        <user>root</user>
+                        <group>root</group>
+                        <filemode>555</filemode>
                       </mapper>
                     </data>
                     <data>
@@ -1030,6 +1040,39 @@
                         <group>root</group>
                       </mapper>
                     </data>
+                    <data>
+                      <type>file</type>
+                      <src>src/deb/systemd/pyudf.socket</src>
+                      <mapper>
+                        <prefix>/lib/systemd/system</prefix>
+                        <type>perm</type>
+                        <user>root</user>
+                        <group>root</group>
+                      </mapper>
+                    </data>
+                    <data>
+                      <type>file</type>
+                      <src>src/deb/systemd/pyudf@.service</src>
+                      <mapper>
+                        <prefix>/lib/systemd/system</prefix>
+                        <type>perm</type>
+                        <user>root</user>
+                        <group>root</group>
+                      </mapper>
+                    </data>
+                    <data>
+                      <type>template</type>
+                      <paths>
+                          <path>/opt/apache-asterixdb/logs</path>
+                          <path>/opt/apache-asterixdb/data</path>
+                      </paths>
+                      <mapper>
+                        <type>perm</type>
+                        <user>asterixdb</user>
+                        <group>asterixdb</group>
+                        <filemode>750</filemode>
+                      </mapper>
+                    </data>
                   </dataSet>
                 </configuration>
               </execution>
diff --git a/asterixdb/asterix-server/src/deb/control/control b/asterixdb/asterix-server/src/deb/control/control
index 1f6c213e95..77bbd1df78 100644
--- a/asterixdb/asterix-server/src/deb/control/control
+++ b/asterixdb/asterix-server/src/deb/control/control
@@ -17,8 +17,7 @@ Version: [[version]]
 Section: databases
 Priority: extra
 Architecture: all
-Depends: jdk (>= 1.8)
+Depends: java17-runtime-headless
 Maintainer: Ian Maxon <ia...@maxons.email>
 Description: Apache AsterixDB - a scalable, open source Big Data Management System (BDMS)
-Distribution: development
-Depends: default-jre | java8-runtime
+Distribution: development
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/postinst b/asterixdb/asterix-server/src/deb/control/postinst
index 896ca28b4e..fe5c912155 100644
--- a/asterixdb/asterix-server/src/deb/control/postinst
+++ b/asterixdb/asterix-server/src/deb/control/postinst
@@ -13,5 +13,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-adduser --system --group --quiet --home /opt/apache-asterixdb/ \
---no-create-home --disabled-login --force-badname asterixdb
+chmod -R 755 /opt/apache-asterixdb/
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/preinst b/asterixdb/asterix-server/src/deb/control/preinst
index 4509c90586..8d14847d47 100644
--- a/asterixdb/asterix-server/src/deb/control/preinst
+++ b/asterixdb/asterix-server/src/deb/control/preinst
@@ -13,3 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb-udf
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
index 9711fba6c1..2a52e2def9 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
@@ -19,8 +19,9 @@ After=network.target
 [Service]
 Type=simple
 User=asterixdb
-ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf
+ExecStart=/opt/apache-asterixdb/bin/asterixcc -config-file "/opt/apache-asterixdb/cc.conf"
 Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
 
 [Install]
 WantedBy=multi-user.target
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
index bfe62966cd..e09d8e8202 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
@@ -21,6 +21,7 @@ Type=simple
 User=asterixdb
 ExecStart=/opt/apache-asterixdb/bin/asterixncservice
 Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
 
 [Install]
 WantedBy=multi-user.target
diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-server/src/deb/systemd/cc.conf
similarity index 55%
rename from asterixdb/asterix-docker/docker/supervisord.conf
rename to asterixdb/asterix-server/src/deb/systemd/cc.conf
index 20f1797ed8..0af967a395 100644
--- a/asterixdb/asterix-docker/docker/supervisord.conf
+++ b/asterixdb/asterix-server/src/deb/systemd/cc.conf
@@ -15,26 +15,19 @@
 ; specific language governing permissions and limitations
 ; under the License.
 
-[supervisord]
-nodaemon=true
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
 
-[program:asterixnc1]
-command=/asterixdb/bin/asterixncservice -logdir - -config-file "/asterixdb/opt/local/conf/blue.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[nc]
+address=127.0.0.1
+command=asterixnc
 
-[program:asterixnc2]
-command=/asterixdb/bin/asterixncservice -logdir -
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[cc]
+address = 127.0.0.1
 
-[program:asterixcc]
-command=/asterixdb/bin/asterixcc -config-file "/asterixdb/opt/local/conf/cc.conf"
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
similarity index 78%
copy from asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
copy to asterixdb/asterix-server/src/deb/systemd/pyudf.socket
index bfe62966cd..4e731db8b4 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
@@ -13,14 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 [Unit]
-Description=Apache AsterixDB Node Controller Daemon
-After=network.target
+Description=AsterixDB UDF Domain Socket
+PartOf=asterixdb_udf.service
 
-[Service]
-Type=simple
-User=asterixdb
-ExecStart=/opt/apache-asterixdb/bin/asterixncservice
-Restart=on-abort
+[Socket]
+ListenStream=/tmp/pyudf.socket
+SocketMode=0660
+SocketUser=asterixdb-udf
+SocketGroup=asterixdb
+Accept=true
+DeferAcceptSec=1
 
 [Install]
-WantedBy=multi-user.target
+WantedBy=sockets.target
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
similarity index 75%
copy from asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
copy to asterixdb/asterix-server/src/deb/systemd/pyudf@.service
index 9711fba6c1..9856142e97 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
@@ -1,3 +1,4 @@
+
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,14 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 [Unit]
-Description=Apache AsterixDB Cluster Controller
-After=network.target
+Description=AsterixDB UDF Executor Service
+After=network.target pyudf.socket
+Requires=pyudf.socket
 
 [Service]
+User=asterixdb-udf
 Type=simple
-User=asterixdb
-ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf
-Restart=on-abort
+ExecStart=/usr/bin/python3 /opt/apache-asterixdb/bin/udf_listener.py
+TimeoutStopSec=5
+StandardError=journal
+StandardOutput=journal
 
 [Install]
-WantedBy=multi-user.target
+WantedBy=default.target
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-server/src/deb/udf_listener.py
old mode 100755
new mode 100644
similarity index 88%
copy from asterixdb/asterix-app/src/main/resources/entrypoint.py
copy to asterixdb/asterix-server/src/deb/udf_listener.py
index 7bad7ef485..03874b2136
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-server/src/deb/udf_listener.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,12 +17,10 @@
 # under the License.
 
 import sys
-from os import pathsep
-addr = str(sys.argv[1])
-port = str(sys.argv[2])
-paths = sys.argv[3]
-for p in paths.split(pathsep):
-    sys.path.append(p)
+from systemd.daemon import listen_fds
+from os import chdir
+from os import getcwd
+from os import getpid
 from struct import *
 import signal
 import msgpack
@@ -32,6 +31,7 @@ from pathlib import Path
 from enum import IntEnum
 from io import BytesIO
 
+
 PROTO_VERSION = 1
 HEADER_SZ = 8 + 8 + 1
 REAL_HEADER_SZ = 4 + 8 + 8 + 1
@@ -66,8 +66,8 @@ class Wrapper(object):
     resp = None
     unpacked_msg = None
     msg_type = None
-    packer = msgpack.Packer(autoreset=False)
-    unpacker = msgpack.Unpacker()
+    packer = msgpack.Packer(autoreset=False, use_bin_type=False)
+    unpacker = msgpack.Unpacker(raw=False)
     response_buf = BytesIO()
     stdin_buf = BytesIO()
     wrapped_fns = {}
@@ -102,6 +102,7 @@ class Wrapper(object):
         cwd = Path('.').resolve()
         module_path = Path(module.__file__).resolve()
         return cwd in module_path.parents
+        return True
 
     def read_header(self, readbuf):
         self.sz, self.mid, self.rmid, self.flag = unpack(
@@ -130,14 +131,19 @@ class Wrapper(object):
         self.send_msg()
         self.packer.reset()
 
+    def cd(self, basedir):
+        chdir(basedir + "/site-packages")
+        sys.path.insert(0,getcwd())
+
     def helo(self):
         # need to ack the connection back before sending actual HELO
-        self.init_remote_ipc()
+        #   self.init_remote_ipc()
+        self.cd(self.unpacked_msg[1][1])
         self.flag = MessageFlags.NORMAL
         self.response_buf.seek(0)
         self.packer.pack(int(MessageType.HELO))
-        self.packer.pack("HELO")
-        dlen = 5  # tag(1) + body(4)
+        self.packer.pack(int(getpid()))
+        dlen = len(self.packer.bytes())  # packed HELO tag + packed pid
         resp_len = self.write_header(self.response_buf, dlen)
         self.response_buf.write(self.packer.bytes())
         self.resp = self.response_buf.getbuffer()[0:resp_len]
@@ -198,7 +204,7 @@ class Wrapper(object):
         self.flag = MessageFlags.NORMAL
         self.packer.reset()
         self.response_buf.seek(0)
-        body = msgpack.packb(e)
+        body = msgpack.packb(str(e))
         dlen = len(body) + 1  # 1 for tag
         resp_len = self.write_header(self.response_buf, dlen)
         self.packer.pack(int(MessageType.ERROR))
@@ -217,9 +223,8 @@ class Wrapper(object):
         MessageType.CALL: handle_call
     }
 
-    def connect_sock(self, addr, port):
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.connect((addr, int(port)))
+    def connect_sock(self):
+        self.sock = socket.fromfd(listen_fds()[0], socket.AF_UNIX, socket.SOCK_STREAM)
 
     def disconnect_sock(self, *args):
         self.sock.shutdown(socket.SHUT_RDWR)
@@ -227,26 +232,26 @@ class Wrapper(object):
 
     def recv_msg(self):
         while self.alive:
-            pos = sys.stdin.buffer.readinto1(self.readbuf)
+            pos = self.sock.recv_into(self.readbuf)
             if pos <= 0:
                 self.alive = False
                 return
             try:
                 while pos < REAL_HEADER_SZ:
-                    read = sys.stdin.buffer.readinto1(self.readview[pos:])
+                    read = self.sock.recv_into(self.readview[pos:])
                     if read <= 0:
                         self.alive = False
                         return
                     pos += read
                 self.read_header(self.readview)
                 while pos < self.sz and len(self.readbuf) - pos > 0:
-                    read = sys.stdin.buffer.readinto1(self.readview[pos:])
+                    read = self.sock.recv_into(self.readview[pos:])
                     if read <= 0:
                         self.alive = False
                         return
                     pos += read
                 while pos < self.sz:
-                    vszchunk = sys.stdin.buffer.read1(FRAMESZ)
+                    vszchunk = self.sock.recv(4096)
                     if len(vszchunk) == 0:
                         self.alive = False
                         return
@@ -258,8 +263,8 @@ class Wrapper(object):
                 self.unpacked_msg = list(self.unpacker)
                 self.msg_type = MessageType(self.unpacked_msg[0])
                 self.type_handler[self.msg_type](self)
-            except BaseException:
-                self.handle_error(traceback.format_exc())
+            except BaseException as e:
+                self.handle_error(''.join(traceback.format_exc()))
 
     def send_msg(self):
         self.sock.sendall(self.resp)
@@ -273,6 +278,6 @@ class Wrapper(object):
 
 
 wrap = Wrapper()
-wrap.connect_sock(addr, port)
+wrap.connect_sock()
 signal.signal(signal.SIGTERM, wrap.disconnect_sock)
 wrap.recv_loop()
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 9fa2fc1d0e..dcd0978c38 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -931,7 +931,7 @@
     <module>asterix-test-framework</module>
     <module>asterix-maven-plugins</module>
     <module>asterix-server</module>
-    <module>asterix-docker</module>
+    <module>asterix-podman</module>
     <module>asterix-doc</module>
     <module>asterix-fuzzyjoin</module>
     <module>asterix-replication</module>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 01cb9bfc9d..bb40e2b728 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -99,6 +99,7 @@ public class NCConfig extends ControllerConfig {
         PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true),
         PYTHON_ARGS(STRING_ARRAY, (String[]) null),
         PYTHON_ENV(STRING_ARRAY, (String[]) null),
+        PYTHON_DS_PATH(STRING, (String) null),
         CREDENTIAL_FILE(
                 OptionTypes.STRING,
                 (Function<IApplicationConfig, String>) appConfig -> FileUtil
@@ -248,6 +249,8 @@ public class NCConfig extends ControllerConfig {
                     return "Whether or not to attempt to automatically set PYTHON_CMD to a usable interpreter";
                 case PYTHON_ENV:
                     return "List of environment variables to set when invoking the Python interpreter for Python UDFs. E.g. FOO=1";
+                case PYTHON_DS_PATH:
+                    return "Path to systemd socket for fenced Python UDFs. Requires JDK17+, a *nix operating system, and systemd";
                 case CREDENTIAL_FILE:
                     return "Path to HTTP basic credentials";
                 default: