You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/24 00:28:10 UTC

[pulsar] branch master updated: Allow the ability to specify which artifactory to download dep from (#2824)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3db8b73  Allow the ability to specify which artifactory to download dep from (#2824)
3db8b73 is described below

commit 3db8b7392b1be22544117ea336dd46cc85c61ba0
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Oct 23 17:28:04 2018 -0700

    Allow the ability to specify which artifactory to download dep from (#2824)
    
    * Allow specifying which package repository to download dependencies from
    
    * Reverted earlier changes
    
    * Made the Python dependency repository a configuration option of the Kubernetes runtime
    
    * Fixed unit test
    
    * Added an extra dependency repository option
    
    * Fixed command lines
---
 .../instance/src/main/python/python_instance_main.py        | 11 ++++++++++-
 .../apache/pulsar/functions/runtime/KubernetesRuntime.java  |  5 ++++-
 .../pulsar/functions/runtime/KubernetesRuntimeFactory.java  |  8 ++++++++
 .../org/apache/pulsar/functions/runtime/ProcessRuntime.java |  2 +-
 .../org/apache/pulsar/functions/runtime/RuntimeUtils.java   | 13 ++++++++++++-
 .../pulsar/functions/runtime/KubernetesRuntimeTest.java     |  8 +++++---
 .../pulsar/functions/worker/FunctionRuntimeManager.java     |  2 ++
 .../org/apache/pulsar/functions/worker/WorkerConfig.java    |  2 ++
 8 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 5fc899a..1bfc793 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -72,6 +72,9 @@ def main():
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
+  parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
+  parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
+
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -84,7 +87,13 @@ def main():
 
   if os.path.splitext(str(args.py))[1] == '.whl':
     if args.install_usercode_dependencies:
-      os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py)))
+      cmd = "pip install -t %s" % os.path.dirname(str(args.py))
+      if args.dependency_repository:
+        cmd = cmd + " -i %s" % str(args.dependency_repository)
+      if args.extra_dependency_repository:
+        cmd = cmd + " --extra-index-url %s" % str(args.extra_dependency_repository)
+      cmd = cmd + " %s" % str(args.py)
+      os.system(cmd)
     else:
       zpfile = zipfile.ZipFile(str(args.py), 'r')
       zpfile.extractall(os.path.dirname(str(args.py)))
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index ddfe36f..ee7125f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -105,6 +105,8 @@ class KubernetesRuntime implements Runtime {
                       String jobNamespace,
                       Map<String, String> customLabels,
                       Boolean installUserCodeDependencies,
+                      String pythonDependencyRepository,
+                      String pythonExtraDependencyRepository,
                       String pulsarDockerImageName,
                       String pulsarRootDir,
                       InstanceConfig instanceConfig,
@@ -129,7 +131,8 @@ class KubernetesRuntime implements Runtime {
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies);
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies,
+                pythonDependencyRepository, pythonExtraDependencyRepository);
         this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index b257cbf..1a180ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -46,6 +46,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private final String pulsarRootDir;
     private final Boolean submittingInsidePod;
     private final Boolean installUserCodeDependencies;
+    private final String pythonDependencyRepository;
+    private final String pythonExtraDependencyRepository;
     private final Map<String, String> customLabels;
     private final String pulsarAdminUri;
     private final String pulsarServiceUri;
@@ -66,6 +68,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                                     String pulsarRootDir,
                                     Boolean submittingInsidePod,
                                     Boolean installUserCodeDependencies,
+                                    String pythonDependencyRepository,
+                                    String pythonExtraDependencyRepository,
                                     Map<String, String> customLabels,
                                     String pulsarServiceUri,
                                     String pulsarAdminUri,
@@ -90,6 +94,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         }
         this.submittingInsidePod = submittingInsidePod;
         this.installUserCodeDependencies = installUserCodeDependencies;
+        this.pythonDependencyRepository = pythonDependencyRepository;
+        this.pythonExtraDependencyRepository = pythonExtraDependencyRepository;
         this.customLabels = customLabels;
         this.pulsarServiceUri = pulsarServiceUri;
         this.pulsarAdminUri = pulsarAdminUri;
@@ -128,6 +134,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             jobNamespace,
             customLabels,
             installUserCodeDependencies,
+            pythonDependencyRepository,
+            pythonExtraDependencyRepository,
             pulsarDockerImageName,
             pulsarRootDir,
             instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index bb77768..1cf0db9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -75,7 +75,7 @@ class ProcessRuntime implements Runtime {
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                "java_instance_log4j2.yml", false);
+                "java_instance_log4j2.yml", false, null, null);
     }
 
     /**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 0572ed6..00a04b5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.util.*;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 /**
@@ -48,7 +49,9 @@ class RuntimeUtils {
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
                                            String javaLog4jFileName,
-                                           Boolean installUserCodeDepdendencies) throws Exception {
+                                           Boolean installUserCodeDepdendencies,
+                                           String pythonDependencyRepository,
+                                           String pythonExtraDependencyRepository) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -90,6 +93,14 @@ class RuntimeUtils {
                 args.add("--install_usercode_dependencies");
                 args.add("True");
             }
+            if (!isEmpty(pythonDependencyRepository)) {
+                args.add("--dependency_repository");
+                args.add(pythonDependencyRepository);
+            }
+            if (!isEmpty(pythonExtraDependencyRepository)) {
+                args.add("--extra_dependency_repository");
+                args.add(pythonExtraDependencyRepository);
+            }
             // TODO:- Find a platform independent way of controlling memory for a python application
         }
         args.add("--instance_id");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 61cacca..bc2a823 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
-            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
+            false, true, "myrepo", "anotherrepo",  null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
         doNothing().when(this.factory).setupClient();
     }
 
@@ -145,18 +145,20 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 26);
+        assertEquals(args.size(), 30);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
                 + " --logging_file " + config.getFunctionDetails().getName()
                 + " --install_usercode_dependencies True"
+                + " --dependency_repository myrepo"
+                + " --extra_dependency_repository anotherrepo"
                 + " --instance_id " + "$SHARD_ID"
                 + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(23)
+                + " --max_buffered_tuples 1024 --port " + args.get(27)
                 + " --expected_healthcheck_interval -1";
         assertEquals(String.join(" ", args), expectedArgs);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index ee57659..f7dd4bc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -133,6 +133,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
                     workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
                     workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
+                    workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(),
+                    workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(),
                     workerConfig.getKubernetesContainerFactory().getCustomLabels(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index bc97969..2b3e816 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -140,6 +140,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
         private String pulsarServiceUrl;
         private String pulsarAdminUrl;
         private Boolean installUserCodeDependencies;
+        private String pythonDependencyRepository;
+        private String pythonExtraDependencyRepository;
         private Map<String, String> customLabels;
         private Integer expectedMetricsCollectionInterval;
     }