You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2020/08/21 20:27:34 UTC

[asterixdb] branch master updated: [ASTERIXDB-2176][RT] Flexible python pathing

This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ae092dd  [ASTERIXDB-2176][RT] Flexible python pathing
ae092dd is described below

commit ae092dd7cb3eec160950703edbf2967ba2b31e34
Author: Ian Maxon <ia...@couchbase.com>
AuthorDate: Thu Aug 20 13:51:02 2020 -0700

    [ASTERIXDB-2176][RT] Flexible python pathing
    
    - Allow python path and args to be specified in config
    
    Change-Id: I6f75b29be59090250b2ff9992279187babd97fff
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7423
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 asterixdb/asterix-app/pom.xml                      | 14 ++---
 .../asterix/api/http/server/BasicAuthServlet.java  | 51 +++++++++++++---
 .../asterix/api/http/server/ServletConstants.java  |  1 +
 .../asterix/api/http/server/UdfApiServlet.java     | 70 +++++++++-------------
 .../asterix/hyracks/bootstrap/CCApplication.java   |  8 ++-
 .../asterix-app/src/main/resources/entrypoint.py   | 10 ++--
 .../ExternalScalarPythonFunctionEvaluator.java     | 64 ++++++++++++++++----
 .../LibraryDeployPrepareOperatorDescriptor.java    | 30 ++++++----
 .../control/common/controllers/NCConfig.java       | 16 ++++-
 9 files changed, 175 insertions(+), 89 deletions(-)

diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 27c0215..ef443ea 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -37,8 +37,8 @@
     <root.dir>${basedir}/..</root.dir>
     <appendedResourcesDirectory>${basedir}/src/main/appended-resources</appendedResourcesDirectory>
     <sonar.sources>pom.xml,src/main/java,src/main/resources</sonar.sources>
-    <pip.path>${project.build.directory}${file.separator}bin${file.separator}pip3</pip.path>
-    <shiv.path>${project.build.directory}${file.separator}bin${file.separator}shiv</shiv.path>
+    <pip.path>${project.build.directory}/bin/pip3</pip.path>
+    <shiv.path>${project.build.directory}/bin/shiv</shiv.path>
   </properties>
   <build>
     <plugins>
@@ -382,8 +382,8 @@
         </os>
       </activation>
       <properties>
-        <pip.path>${project.build.directory}${file.separator}Scripts${file.separator}pip3.exe</pip.path>
-        <shiv.path>${project.build.directory}${file.separator}Scripts${file.separator}shiv.exe</shiv.path>
+        <pip.path>${project.build.directory}\Scripts\pip3.exe</pip.path>
+        <shiv.path>${project.build.directory}\Scripts\shiv.exe</shiv.path>
       </properties>
     </profile>
     <profile>
@@ -827,9 +827,9 @@
       <artifactId>commons-csv</artifactId>
     </dependency>
     <dependency>
-        <groupId>org.apache.httpcomponents</groupId>
-        <artifactId>httpmime</artifactId>
-        <scope>test</scope>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpmime</artifactId>
+      <scope>test</scope>
     </dependency>
     <!-- AWS -->
     <dependency>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
index d8e167f..f25d223 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
@@ -18,35 +18,68 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static org.apache.asterix.api.http.server.ServletConstants.CREDENTIAL_MAP;
+import static org.apache.asterix.api.http.server.ServletConstants.*;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hyracks.http.api.IServlet;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.mindrot.jbcrypt.BCrypt;
 
 import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpResponseStatus;
 
-public abstract class BasicAuthServlet extends AbstractServlet {
+public class BasicAuthServlet implements IServlet {
 
     private static final Logger LOGGER = LogManager.getLogger();
     public static String BASIC_AUTH_METHOD_NAME = "Basic";
     private Base64.Decoder b64Decoder;
     Map<String, String> storedCredentials;
+    Map<String, String> ephemeralCredentials;
+    private String sysAuthHeader;
+    private final IServlet delegate;
+    private ConcurrentMap<String, Object> ctx;
 
-    protected BasicAuthServlet(ConcurrentMap<String, Object> ctx, String... paths) {
-        super(ctx, paths);
+    public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate) {
+        this.delegate = delegate;
         b64Decoder = Base64.getDecoder();
         storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP);
+        this.ctx = ctx;
+        // generate internal user
+        String sysUser;
+        do {
+            sysUser = generateRandomString(32);
+        } while (storedCredentials.containsKey(sysUser));
+        String sysPassword = generateRandomString(128);
+        ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
+        sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+        ctx.put(SYS_AUTH_HEADER, sysAuthHeader);
+    }
+
+    @Override
+    public String[] getPaths() {
+        return delegate.getPaths();
+    }
+
+    @Override
+    public void init() throws IOException {
+        delegate.init();
+    }
+
+    @Override
+    public ConcurrentMap<String, Object> ctx() {
+        return ctx;
     }
 
     @Override
@@ -56,7 +89,7 @@ public abstract class BasicAuthServlet extends AbstractServlet {
             if (!authorized) {
                 response.setStatus(HttpResponseStatus.UNAUTHORIZED);
             } else {
-                super.handle(request, response);
+                delegate.handle(request, response);
             }
         } catch (Exception e) {
             LOGGER.log(Level.WARN, "Unhandled exception", e);
@@ -104,7 +137,7 @@ public abstract class BasicAuthServlet extends AbstractServlet {
     }
 
     protected Map<String, String> getStoredCredentials(IServletRequest request) {
-        return storedCredentials;
+        return request.getHttpRequest().method().equals(HttpMethod.GET) ? ephemeralCredentials : storedCredentials;
     }
 
     public static String hashPassword(String password) {
@@ -116,4 +149,8 @@ public abstract class BasicAuthServlet extends AbstractServlet {
         byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
         return "Basic " + new String(encodedAuth);
     }
+
+    private static String generateRandomString(int size) {
+        return RandomStringUtils.randomAlphanumeric(size);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
index 857faf0..0eac474 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -26,6 +26,7 @@ public class ServletConstants {
     public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
     public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT";
     public static final String CREDENTIAL_MAP = "org.apache.asterix.CREDENTIAL_MAP";
+    public static final String SYS_AUTH_HEADER = "org.apache.asterix.SYS_AUTH_HEADER";
 
     private ServletConstants() {
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index d5bec4f..18139f6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.api.http.server;
 
 import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
 import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
 import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
 
@@ -63,7 +64,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.output.NullWriter;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.application.ICCServiceContext;
@@ -74,12 +74,12 @@ import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpScheme;
@@ -87,22 +87,21 @@ import io.netty.handler.codec.http.multipart.FileUpload;
 import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
 import io.netty.handler.codec.http.multipart.InterfaceHttpData;
 
-public class UdfApiServlet extends BasicAuthServlet {
+public class UdfApiServlet extends AbstractServlet {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private final ICcApplicationContext appCtx;
+    protected final ICcApplicationContext appCtx;
     private final ClusterControllerService ccs;
     private final HttpScheme httpServerProtocol;
     private final int httpServerPort;
 
-    private final ILangCompilationProvider compilationProvider;
-    private final IStatementExecutorFactory statementExecutorFactory;
-    private final IStorageComponentProvider componentProvider;
-    private final IReceptionist receptionist;
-    private final Path workingDir;
-    private Map<String, String> sysCredentials;
-    private String sysAuthHeader;
+    protected final ILangCompilationProvider compilationProvider;
+    protected final IStatementExecutorFactory statementExecutorFactory;
+    protected final IStorageComponentProvider componentProvider;
+    protected final IReceptionist receptionist;
+    protected final Path workingDir;
+    protected String sysAuthHeader;
 
     public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
             ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
@@ -124,23 +123,15 @@ public class UdfApiServlet extends BasicAuthServlet {
 
     @Override
     public void init() throws IOException {
-        super.init();
         initAuth();
         initStorage();
     }
 
-    private void initAuth() {
-        // generate internal user
-        String sysUser;
-        do {
-            sysUser = generateRandomString(32);
-        } while (storedCredentials.containsKey(sysUser));
-        String sysPassword = generateRandomString(128);
-        this.sysCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
-        this.sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+    protected void initAuth() {
+        sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER);
     }
 
-    private void initStorage() throws IOException {
+    protected void initStorage() throws IOException {
         // prepare working directory
         if (Files.isDirectory(workingDir)) {
             try {
@@ -154,6 +145,10 @@ public class UdfApiServlet extends BasicAuthServlet {
         }
     }
 
+    protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) {
+        return Collections.emptyMap();
+    }
+
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
         IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
@@ -197,7 +192,7 @@ public class UdfApiServlet extends BasicAuthServlet {
                 URI downloadURI = createDownloadURI(libraryTempFile);
                 CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second,
                         language, downloadURI, true, sysAuthHeader);
-                executeStatement(stmt, requestReference);
+                executeStatement(stmt, requestReference, request);
                 response.setStatus(HttpResponseStatus.OK);
             } catch (Exception e) {
                 response.setStatus(toHttpErrorStatus(e));
@@ -218,13 +213,12 @@ public class UdfApiServlet extends BasicAuthServlet {
         }
     }
 
-    private URI createDownloadURI(Path file) throws Exception {
+    protected URI createDownloadURI(Path file) throws Exception {
         String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
         String host = getHyracksClientConnection().getHost();
         return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
     }
 
-    @Override
     protected void delete(IServletRequest request, IServletResponse response) {
         IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -239,7 +233,7 @@ public class UdfApiServlet extends BasicAuthServlet {
         try {
             IRequestReference requestReference = receptionist.welcome(request);
             LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second, false);
-            executeStatement(stmt, requestReference);
+            executeStatement(stmt, requestReference, request);
             response.setStatus(HttpResponseStatus.OK);
         } catch (Exception e) {
             response.setStatus(toHttpErrorStatus(e));
@@ -250,21 +244,21 @@ public class UdfApiServlet extends BasicAuthServlet {
         }
     }
 
-    private void executeStatement(Statement statement, IRequestReference requestReference) throws Exception {
+    protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request)
+            throws Exception {
         SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
                 new PrintWriter(NullWriter.NULL_WRITER));
         ResponsePrinter printer = new ResponsePrinter(sessionOutput);
         ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1);
         IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties,
                 new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
-                Collections.emptyMap(), Collections.emptyMap(), false);
+                additionalHttpHeadersFromRequest(request), Collections.emptyMap(), false);
         MetadataManager.INSTANCE.init();
         IStatementExecutor translator = statementExecutorFactory.create(appCtx, Collections.singletonList(statement),
                 sessionOutput, compilationProvider, componentProvider, printer);
         translator.compileAndExecute(getHyracksClientConnection(), requestParams);
     }
 
-    @Override
     protected void get(IServletRequest request, IServletResponse response) throws Exception {
         IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -287,7 +281,7 @@ public class UdfApiServlet extends BasicAuthServlet {
         readFromFile(filePath, response);
     }
 
-    private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
+    protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
         IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
         if (hcc == null) {
             throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
@@ -295,13 +289,7 @@ public class UdfApiServlet extends BasicAuthServlet {
         return hcc;
     }
 
-    @Override
-    protected Map<String, String> getStoredCredentials(IServletRequest request) {
-        return request.getHttpRequest().method().equals(HttpMethod.GET) ? sysCredentials
-                : super.getStoredCredentials(request);
-    }
-
-    private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
+    protected Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
         String[] path = StringUtils.split(localPath(request), '/');
         int ln = path.length;
         if (ln < 2) {
@@ -312,7 +300,7 @@ public class UdfApiServlet extends BasicAuthServlet {
         return new Pair<>(dataverseName, libraryName);
     }
 
-    private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
+    protected static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
         switch (fileExtension) {
             case LibraryDescriptor.FILE_EXT_ZIP:
                 return JAVA;
@@ -323,7 +311,7 @@ public class UdfApiServlet extends BasicAuthServlet {
         }
     }
 
-    private HttpResponseStatus toHttpErrorStatus(Exception e) {
+    protected HttpResponseStatus toHttpErrorStatus(Exception e) {
         if (e instanceof IFormattedException) {
             IFormattedException fe = (IFormattedException) e;
             if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
@@ -337,10 +325,6 @@ public class UdfApiServlet extends BasicAuthServlet {
         return HttpResponseStatus.INTERNAL_SERVER_ERROR;
     }
 
-    private static String generateRandomString(int size) {
-        return RandomStringUtils.randomAlphanumeric(size);
-    }
-
     protected void readFromFile(Path filePath, IServletResponse response) throws Exception {
         class InputStreamGetter extends SynchronizableWork {
             private InputStream is;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e80995c..17cf664 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.asterix.api.http.IQueryWebServerRegistrant;
 import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.BasicAuthServlet;
 import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -350,9 +351,10 @@ public class CCApplication extends BaseCCApplication {
             case Servlets.ACTIVE_STATS:
                 return new ActiveStatsApiServlet(appCtx, ctx, paths);
             case Servlets.UDF:
-                return new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
-                        getStatementExecutorFactory(), componentProvider, server.getScheme(),
-                        server.getAddress().getPort());
+                return new BasicAuthServlet(ctx,
+                        new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
+                                getStatementExecutorFactory(), componentProvider, server.getScheme(),
+                                server.getAddress().getPort()));
             default:
                 throw new IllegalStateException(key);
         }
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index bdb68e2..0917f49 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -16,8 +16,12 @@
 # under the License.
 
 import sys
-sys.path.insert(0, './site-packages/')
-sys.path.insert(len(sys.path)-1, './ipc/site-packages')
+from os import pathsep
+addr = str(sys.argv[1])
+port = str(sys.argv[2])
+paths = sys.argv[3]
+for p in paths.split(pathsep):
+    sys.path.append(p)
 from struct import *
 import signal
 import msgpack
@@ -239,8 +243,6 @@ class Wrapper(object):
         self.disconnect_sock()
 
 
-addr = str(sys.argv[1])
-port = str(sys.argv[2])
 wrap = Wrapper()
 wrap.connect_sock(addr, port)
 signal.signal(signal.SIGTERM, wrap.disconnect_sock)
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index c47145b..a5eccce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -24,6 +24,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.WarningUtil;
-import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
@@ -45,6 +46,7 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -71,18 +73,47 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
     private final ByteBuffer outputWrapper;
     private final IEvaluatorContext evaluatorContext;
     private static final String ENTRYPOINT = "entrypoint.py";
+    private static final String SITE_PACKAGES = "site-packages";
 
     private final IPointable[] argValues;
 
     ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
             IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
         super(finfo, args, argTypes, ctx);
+        IApplicationConfig cfg = ctx.getServiceContext().getAppConfig();
+        String pythonPathCmd = cfg.getString(NCConfig.Option.PYTHON_CMD);
+        List<String> pythonArgs = new ArrayList<>();
+        if (pythonPathCmd == null) {
+            //if absolute path to interpreter is not specified, use environmental python
+            pythonPathCmd = "/usr/bin/env";
+            pythonArgs.add("python3");
+        }
+        File pythonPath = new File(pythonPathCmd);
+        List<String> sitePkgs = new ArrayList<>();
+        sitePkgs.add(SITE_PACKAGES);
+        String addlSitePackagesRaw =
+                ctx.getServiceContext().getAppConfig().getString((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
+        if (addlSitePackagesRaw != null) {
+            sitePkgs.addAll(Arrays.asList(addlSitePackagesRaw.split(File.pathSeparator)));
+        }
+        if (cfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+            sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
+        }
+        String[] pythonArgsRaw = ctx.getServiceContext().getAppConfig().getStringArray(NCConfig.Option.PYTHON_ARGS);
+        if (pythonArgsRaw != null) {
+            pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
+        }
+        StringBuilder sitePackagesPathBuilder = new StringBuilder();
+        for (int i = 0; i < sitePkgs.size() - 1; i++) {
+            sitePackagesPathBuilder.append(sitePkgs.get(i));
+            sitePackagesPathBuilder.append(File.pathSeparator);
+        }
+        sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
 
-        File pythonPath = new File(ctx.getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_HOME));
-        DataverseName dataverseName = FunctionSignature.getDataverseName(finfo.getFunctionIdentifier());
         try {
             libraryEvaluator = PythonLibraryEvaluator.getInstance(finfo, libraryManager, router, ipcSys, pythonPath,
-                    ctx.getTaskContext(), ctx.getWarningCollector(), sourceLoc);
+                    ctx.getTaskContext(), sitePackagesPathBuilder.toString(), pythonArgs, ctx.getWarningCollector(),
+                    sourceLoc);
         } catch (IOException | AsterixException e) {
             throw new HyracksDataException("Failed to initialize Python", e);
         }
@@ -128,17 +159,22 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
         String module;
         String clazz;
         String fn;
+        String sitePkgs;
+        List<String> pythonArgs;
         TaskAttemptId task;
         IWarningCollector warningCollector;
         SourceLocation sourceLoc;
 
         private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
-                ILibraryManager libMgr, File pythonHome, ExternalFunctionResultRouter router, IPCSystem ipcSys,
-                TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
+                ILibraryManager libMgr, File pythonHome, String sitePkgs, List<String> pythonArgs,
+                ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
+                IWarningCollector warningCollector, SourceLocation sourceLoc) {
             super(jobId, evaluatorId);
             this.finfo = finfo;
             this.libMgr = libMgr;
             this.pythonHome = pythonHome;
+            this.sitePkgs = sitePkgs;
+            this.pythonArgs = pythonArgs;
             this.router = router;
             this.task = task;
             this.ipcSys = ipcSys;
@@ -168,8 +204,14 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
             this.clazz = clazz;
             this.module = packageModule;
             int port = ipcSys.getSocketAddress().getPort();
-            ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), ENTRYPOINT,
-                    InetAddress.getLoopbackAddress().getHostAddress(), Integer.toString(port));
+            List<String> args = new ArrayList<>();
+            args.add(pythonHome.getAbsolutePath());
+            args.addAll(pythonArgs);
+            args.add(ENTRYPOINT);
+            args.add(InetAddress.getLoopbackAddress().getHostAddress());
+            args.add(Integer.toString(port));
+            args.add(sitePkgs);
+            ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
             pb.directory(new File(wd));
             p = pb.start();
             proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
@@ -207,13 +249,15 @@ class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvalua
 
         private static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
                 ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
-                IWarningCollector warningCollector, SourceLocation sourceLoc) throws IOException, AsterixException {
+                String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
+                throws IOException, AsterixException {
             PythonLibraryEvaluatorId evaluatorId =
                     new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), finfo.getLibraryName());
             PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
             if (evaluator == null) {
                 evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
-                        pythonHome, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
+                        pythonHome, sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
+                        sourceLoc);
                 ctx.registerDeallocatable(evaluator);
                 evaluator.initialize();
                 ctx.setStateObject(evaluator);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index 8929a60..ddf3d66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.external.operators;
 
 import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.hyracks.control.common.controllers.NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -161,7 +162,9 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera
                             // shouldn't happen
                             throw new IOException("Unexpected file type: " + fileExt);
                         }
-                        shiv(targetFile, stageDir, contentsDir);
+                        boolean extractMsgPack = ctx.getJobletContext().getServiceContext().getAppConfig()
+                                .getBoolean(PYTHON_USE_BUNDLED_MSGPACK);
+                        shiv(targetFile, stageDir, contentsDir, extractMsgPack);
                         break;
                     default:
                         // shouldn't happen
@@ -285,20 +288,22 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera
                 }
             }
 
-            private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
-                    throws IOException {
+            private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir,
+                    boolean writeMsgpack) throws IOException {
                 FileReference msgpack = stageDir.getChild("msgpack.pyz");
-                writeShim(msgpack);
+                if (writeMsgpack) {
+                    writeShim(msgpack, writeMsgpack);
+                    File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
+                    FileReference msgPackFolderRef =
+                            new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
+                    unzip(msgpack, msgPackFolderRef);
+                    Files.delete(msgpack.getFile().toPath());
+                }
                 unzip(sourceFile, contentsDir);
-                File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
-                FileReference msgPackFolderRef =
-                        new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
-                unzip(msgpack, msgPackFolderRef);
-                writeShim(contentsDir.getChild("entrypoint.py"));
-                Files.delete(msgpack.getFile().toPath());
+                writeShim(contentsDir.getChild("entrypoint.py"), false);
             }
 
-            private void writeShim(FileReference outputFile) throws IOException {
+            private boolean writeShim(FileReference outputFile, boolean optional) throws IOException {
                 InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
                 if (is == null) {
                     throw new IOException("Classpath does not contain necessary Python resources!");
@@ -308,6 +313,7 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera
                 } finally {
                     is.close();
                 }
+                return true;
             }
 
             private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
@@ -338,4 +344,4 @@ public class LibraryDeployPrepareOperatorDescriptor extends AbstractLibraryOpera
             }
         };
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 510db51..ce299b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
@@ -91,7 +92,10 @@ public class NCConfig extends ControllerConfig {
         KEY_STORE_PASSWORD(STRING, (String) null),
         IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2),
         IO_QUEUE_SIZE(POSITIVE_INTEGER, 10),
-        PYTHON_HOME(STRING, "/usr/bin/python3");
+        PYTHON_CMD(STRING, (String) null),
+        PYTHON_ADDITIONAL_PACKAGES(STRING, (String) null),
+        PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true),
+        PYTHON_ARGS(STRING_ARRAY, (String[]) null);
 
         private final IOptionType parser;
         private final String defaultValueDescription;
@@ -224,8 +228,14 @@ public class NCConfig extends ControllerConfig {
                     return "Number of threads per partition used to write and read from storage";
                 case IO_QUEUE_SIZE:
                     return "Length of the queue used for requests to write and read";
-                case PYTHON_HOME:
-                    return "Path to python interpreter";
+                case PYTHON_CMD:
+                    return "Absolute path to python interpreter. Defaults to environmental Python3";
+                case PYTHON_ADDITIONAL_PACKAGES:
+                    return "List of additional paths, separated by a path separator character, to add to sys.path behind msgpack and library package paths";
+                case PYTHON_USE_BUNDLED_MSGPACK:
+                    return "True to include bundled msgpack on Python sys.path, false to use system-provided msgpack";
+                case PYTHON_ARGS:
+                    return "Python args to pass to Python interpreter";
                 default:
                     throw new IllegalStateException("Not yet implemented: " + this);
             }