You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/05 17:14:42 UTC
[pulsar] branch master updated: Added ability to download
dependencies in Kubernetes (#2729)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new da83322 Added ability to download dependencies in Kubernetes (#2729)
da83322 is described below
commit da833223404e6ea4ce2a13f65e9f74bb0db783a1
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 5 10:14:23 2018 -0700
Added ability to download dependencies in Kubernetes (#2729)
* Add a config to allow user code dependencies to be installed at runtime.
Enable it only for Kubernetes runtime
* Fix bug
* Fixed indentation issue
* Specify the right cmd line
* Install dep only in the temp dir
* The Queue module is named queue in Python 3, so add a try/except import fallback
* Fixed unittest
---
pulsar-functions/instance/src/main/python/python_instance.py | 7 +++++--
.../instance/src/main/python/python_instance_main.py | 8 ++++++--
.../org/apache/pulsar/functions/runtime/JavaInstanceMain.java | 3 +++
.../org/apache/pulsar/functions/runtime/KubernetesRuntime.java | 3 ++-
.../pulsar/functions/runtime/KubernetesRuntimeFactory.java | 4 ++++
.../org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 +-
.../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java | 7 ++++++-
.../apache/pulsar/functions/runtime/KubernetesRuntimeTest.java | 10 +++++-----
.../apache/pulsar/functions/worker/FunctionRuntimeManager.java | 1 +
.../java/org/apache/pulsar/functions/worker/WorkerConfig.java | 1 +
10 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index f8a14ff..2aae5f6 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -26,7 +26,10 @@ import base64
import os
import signal
import time
-import Queue
+try:
+ import Queue as queue
+except:
+ import queue
import threading
from functools import partial
from collections import namedtuple
@@ -117,7 +120,7 @@ class PythonInstance(object):
def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
- self.queue = Queue.Queue(max_buffered_tuples)
+ self.queue = queue.Queue(max_buffered_tuples)
self.log_topic_handler = None
if function_details.logTopic is not None and function_details.logTopic != "":
self.log_topic_handler = log.LogTopicHandler(str(function_details.logTopic), pulsar_client)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 6ef74c2..5fc899a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -71,6 +71,7 @@ def main():
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
+ parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -82,8 +83,11 @@ def main():
json_format.Parse(args.function_details, function_details)
if os.path.splitext(str(args.py))[1] == '.whl':
- zpfile = zipfile.ZipFile(str(args.py), 'r')
- zpfile.extractall(os.path.dirname(str(args.py)))
+ if args.install_usercode_dependencies:
+ os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py)))
+ else:
+ zpfile = zipfile.ZipFile(str(args.py), 'r')
+ zpfile.extractall(os.path.dirname(str(args.py)))
sys.path.insert(0, os.path.dirname(str(args.py)))
log_file = os.path.join(args.logging_directory,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f..157de9e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -95,6 +95,9 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
protected int expectedHealthCheckInterval;
+ @Parameter(names = "--install_usercode_dependencies", description = "Do we need to explictly install any user code dependencies(Does not apply to Java", required = false)
+ protected Boolean installUsercodeDependencies;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 43f5a5d..631b910 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -96,6 +96,7 @@ class KubernetesRuntime implements Runtime {
CoreV1Api coreClient,
String jobNamespace,
Map<String, String> customLabels,
+ Boolean installUserCodeDependencies,
String pulsarDockerImageName,
String pulsarRootDir,
InstanceConfig instanceConfig,
@@ -118,7 +119,7 @@ class KubernetesRuntime implements Runtime {
this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
- authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml");
+ authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml", installUserCodeDependencies);
running = false;
doChecks(instanceConfig.getFunctionDetails());
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index cba9ebf..dee265f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -45,6 +45,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String pulsarDockerImageName;
private final String pulsarRootDir;
private final Boolean submittingInsidePod;
+ private final Boolean installUserCodeDependencies;
private final Map<String, String> customLabels;
private final String pulsarAdminUri;
private final String pulsarServiceUri;
@@ -62,6 +63,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
String pulsarDockerImageName,
String pulsarRootDir,
Boolean submittingInsidePod,
+ Boolean installUserCodeDependencies,
Map<String, String> customLabels,
String pulsarServiceUri,
String pulsarAdminUri,
@@ -84,6 +86,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.pulsarRootDir = "/pulsar";
}
this.submittingInsidePod = submittingInsidePod;
+ this.installUserCodeDependencies = installUserCodeDependencies;
this.customLabels = customLabels;
this.pulsarServiceUri = pulsarServiceUri;
this.pulsarAdminUri = pulsarAdminUri;
@@ -119,6 +122,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
coreClient,
jobNamespace,
customLabels,
+ installUserCodeDependencies,
pulsarDockerImageName,
pulsarRootDir,
instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2146376..3534915 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -74,7 +74,7 @@ class ProcessRuntime implements Runtime {
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
- "java_instance_log4j2.yml");
+ "java_instance_log4j2.yml", false);
}
/**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index fe2a88e..3342482 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,7 +47,8 @@ class RuntimeUtils {
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
- String javaLog4jFileName) throws Exception {
+ String javaLog4jFileName,
+ Boolean installUserCodeDepdendencies) throws Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -130,6 +131,10 @@ class RuntimeUtils {
}
args.add("--expected_healthcheck_interval");
args.add(String.valueOf(expectedHealthCheckInterval));
+ if (installUserCodeDepdendencies != null && installUserCodeDepdendencies) {
+ args.add("--install_usercode_dependencies");
+ args.add("True");
+ }
return args;
}
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index f4aeafc..6d2fc23 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
this.stateStorageServiceUrl = "bk://localhost:4181";
this.logDirectory = "logs/functions";
this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
- false, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
+ false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null));
doNothing().when(this.factory).setupClient();
}
@@ -121,7 +121,7 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 28);
+ assertEquals(args.size(), 30);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ " -Dlog4j.configurationFile=conf/log4j2.yaml "
@@ -135,7 +135,7 @@ public class KubernetesRuntimeTest {
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
- + " --expected_healthcheck_interval -1";
+ + " --expected_healthcheck_interval -1 --install_usercode_dependencies True";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -145,7 +145,7 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 24);
+ assertEquals(args.size(), 26);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + pulsarRootDir + "/" + userJarFile + " --logging_directory "
+ logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -154,7 +154,7 @@ public class KubernetesRuntimeTest {
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(21)
- + " --expected_healthcheck_interval -1";
+ + " --expected_healthcheck_interval -1 --install_usercode_dependencies True";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e5b4a6b..1b8a590 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -125,6 +125,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
+ workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
workerConfig.getKubernetesContainerFactory().getCustomLabels(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a3c393f..c18f824 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -139,6 +139,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private Boolean submittingInsidePod;
private String pulsarServiceUrl;
private String pulsarAdminUrl;
+ private Boolean installUserCodeDependencies;
private Map<String, String> customLabels;
}
private KubernetesContainerFactory kubernetesContainerFactory;