You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/05 17:14:25 UTC

[GitHub] srkukarni closed pull request #2729: Added ability to download dependencies in Kubernetes

srkukarni closed pull request #2729: Added ability to download dependencies in Kubernetes
URL: https://github.com/apache/pulsar/pull/2729
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it once the request is merged):

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index f8a14ffc1a..2aae5f64c6 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -26,7 +26,10 @@
 import os
 import signal
 import time
-import Queue
+try:
+  import Queue as queue
+except:
+  import queue
 import threading
 from functools import partial
 from collections import namedtuple
@@ -117,7 +120,7 @@ class PythonInstance(object):
   def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
-    self.queue = Queue.Queue(max_buffered_tuples)
+    self.queue = queue.Queue(max_buffered_tuples)
     self.log_topic_handler = None
     if function_details.logTopic is not None and function_details.logTopic != "":
       self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 6ef74c2fba..5fc899ad84 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -71,6 +71,7 @@ def main():
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
+  parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -82,8 +83,11 @@ def main():
   json_format.Parse(args.function_details, function_details)
 
   if os.path.splitext(str(args.py))[1] == '.whl':
-    zpfile = zipfile.ZipFile(str(args.py), 'r')
-    zpfile.extractall(os.path.dirname(str(args.py)))
+    if args.install_usercode_dependencies:
+      os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py)))
+    else:
+      zpfile = zipfile.ZipFile(str(args.py), 'r')
+      zpfile.extractall(os.path.dirname(str(args.py)))
     sys.path.insert(0, os.path.dirname(str(args.py)))
 
   log_file = os.path.join(args.logging_directory,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f345..157de9e37d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -95,6 +95,9 @@
     @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
     protected int expectedHealthCheckInterval;
 
+    @Parameter(names = "--install_usercode_dependencies", description = "Do we need to explictly install any user code dependencies(Does not apply to Java", required = false)
+    protected Boolean installUsercodeDependencies;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 43f5a5d386..631b910a80 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -96,6 +96,7 @@
                       CoreV1Api coreClient,
                       String jobNamespace,
                       Map<String, String> customLabels,
+                      Boolean installUserCodeDependencies,
                       String pulsarDockerImageName,
                       String pulsarRootDir,
                       InstanceConfig instanceConfig,
@@ -118,7 +119,7 @@
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml");
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml", installUserCodeDependencies);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index cba9ebf2d6..dee265f346 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -45,6 +45,7 @@
     private final String pulsarDockerImageName;
     private final String pulsarRootDir;
     private final Boolean submittingInsidePod;
+    private final Boolean installUserCodeDependencies;
     private final Map<String, String> customLabels;
     private final String pulsarAdminUri;
     private final String pulsarServiceUri;
@@ -62,6 +63,7 @@ public KubernetesRuntimeFactory(String k8Uri,
                                     String pulsarDockerImageName,
                                     String pulsarRootDir,
                                     Boolean submittingInsidePod,
+                                    Boolean installUserCodeDependencies,
                                     Map<String, String> customLabels,
                                     String pulsarServiceUri,
                                     String pulsarAdminUri,
@@ -84,6 +86,7 @@ public KubernetesRuntimeFactory(String k8Uri,
             this.pulsarRootDir = "/pulsar";
         }
         this.submittingInsidePod = submittingInsidePod;
+        this.installUserCodeDependencies = installUserCodeDependencies;
         this.customLabels = customLabels;
         this.pulsarServiceUri = pulsarServiceUri;
         this.pulsarAdminUri = pulsarAdminUri;
@@ -119,6 +122,7 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
             coreClient,
             jobNamespace,
             customLabels,
+            installUserCodeDependencies,
             pulsarDockerImageName,
             pulsarRootDir,
             instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2146376d7a..35349159a3 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -74,7 +74,7 @@
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                "java_instance_log4j2.yml");
+                "java_instance_log4j2.yml", false);
     }
 
     /**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index fe2a88ee8e..33424820a2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,7 +47,8 @@
                                            String shardId,
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
-                                           String javaLog4jFileName) throws Exception {
+                                           String javaLog4jFileName,
+                                           Boolean installUserCodeDepdendencies) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -130,6 +131,10 @@
         }
         args.add("--expected_healthcheck_interval");
         args.add(String.valueOf(expectedHealthCheckInterval));
+        if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) {
+            args.add("--install_usercode_dependencies");
+            args.add("True");
+        }
         return args;
     }
 }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index f4aeafc6a1..6d2fc23eed 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public KubernetesRuntimeTest() throws Exception {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
-            false, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
+            false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
         doNothing().when(this.factory).setupClient();
     }
 
@@ -121,7 +121,7 @@ public void testJavaConstructor() throws Exception {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 30);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=conf/log4j2.yaml "
@@ -135,7 +135,7 @@ public void testJavaConstructor() throws Exception {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1 --install_usercode_dependencies True";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -145,7 +145,7 @@ public void testPythonConstructor() throws Exception {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 24);
+        assertEquals(args.size(), 26);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory "
                 + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -154,7 +154,7 @@ public void testPythonConstructor() throws Exception {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(21)
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1 --install_usercode_dependencies True";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e5b4a6b8d1..1b8a590baf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -125,6 +125,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
                     workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
                     workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
+                    workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
                     workerConfig.getKubernetesContainerFactory().getCustomLabels(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
                     StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a3c393f510..c18f824714 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -139,6 +139,7 @@
         private Boolean submittingInsidePod;
         private String pulsarServiceUrl;
         private String pulsarAdminUrl;
+        private Boolean installUserCodeDependencies;
         private Map<String, String> customLabels;
     }
     private KubernetesContainerFactory kubernetesContainerFactory;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services