You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/24 00:28:10 UTC
[pulsar] branch master updated: Allow the ability to specify which artifactory to download dep from (#2824)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3db8b73 Allow the ability to specify which artifactory to download dep from (#2824)
3db8b73 is described below
commit 3db8b7392b1be22544117ea336dd46cc85c61ba0
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Oct 23 17:28:04 2018 -0700
Allow the ability to specify which artifactory to download dep from (#2824)
* Allow the ability to specify which artifactory to download dep from
* reverted earlier changes
* Made the Python dependency repository a config option of the Kubernetes runtime
* Fixed unit test
* Added extra dependency repository
* Fixed command lines
---
.../instance/src/main/python/python_instance_main.py | 11 ++++++++++-
.../apache/pulsar/functions/runtime/KubernetesRuntime.java | 5 ++++-
.../pulsar/functions/runtime/KubernetesRuntimeFactory.java | 8 ++++++++
.../org/apache/pulsar/functions/runtime/ProcessRuntime.java | 2 +-
.../org/apache/pulsar/functions/runtime/RuntimeUtils.java | 13 ++++++++++++-
.../pulsar/functions/runtime/KubernetesRuntimeTest.java | 8 +++++---
.../pulsar/functions/worker/FunctionRuntimeManager.java | 2 ++
.../org/apache/pulsar/functions/worker/WorkerConfig.java | 2 ++
8 files changed, 44 insertions(+), 7 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 5fc899a..1bfc793 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -72,6 +72,9 @@ def main():
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
+ parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
+ parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
+
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -84,7 +87,13 @@ def main():
if os.path.splitext(str(args.py))[1] == '.whl':
if args.install_usercode_dependencies:
- os.system("pip install -t %s %s" % (os.path.dirname(str(args.py)), str(args.py)))
+ cmd = "pip install -t %s" % os.path.dirname(str(args.py))
+ if args.dependency_repository:
+ cmd = cmd + " -i %s" % str(args.dependency_repository)
+ if args.extra_dependency_repository:
+ cmd = cmd + " --extra-index-url %s" % str(args.extra_dependency_repository)
+ cmd = cmd + " %s" % str(args.py)
+ os.system(cmd)
else:
zpfile = zipfile.ZipFile(str(args.py), 'r')
zpfile.extractall(os.path.dirname(str(args.py)))
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index ddfe36f..ee7125f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -105,6 +105,8 @@ class KubernetesRuntime implements Runtime {
String jobNamespace,
Map<String, String> customLabels,
Boolean installUserCodeDependencies,
+ String pythonDependencyRepository,
+ String pythonExtraDependencyRepository,
String pulsarDockerImageName,
String pulsarRootDir,
InstanceConfig instanceConfig,
@@ -129,7 +131,8 @@ class KubernetesRuntime implements Runtime {
this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
- authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies);
+ authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "kubernetes_instance_log4j2.yml", installUserCodeDependencies,
+ pythonDependencyRepository, pythonExtraDependencyRepository);
this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
running = false;
doChecks(instanceConfig.getFunctionDetails());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index b257cbf..1a180ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -46,6 +46,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String pulsarRootDir;
private final Boolean submittingInsidePod;
private final Boolean installUserCodeDependencies;
+ private final String pythonDependencyRepository;
+ private final String pythonExtraDependencyRepository;
private final Map<String, String> customLabels;
private final String pulsarAdminUri;
private final String pulsarServiceUri;
@@ -66,6 +68,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
String pulsarRootDir,
Boolean submittingInsidePod,
Boolean installUserCodeDependencies,
+ String pythonDependencyRepository,
+ String pythonExtraDependencyRepository,
Map<String, String> customLabels,
String pulsarServiceUri,
String pulsarAdminUri,
@@ -90,6 +94,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
}
this.submittingInsidePod = submittingInsidePod;
this.installUserCodeDependencies = installUserCodeDependencies;
+ this.pythonDependencyRepository = pythonDependencyRepository;
+ this.pythonExtraDependencyRepository = pythonExtraDependencyRepository;
this.customLabels = customLabels;
this.pulsarServiceUri = pulsarServiceUri;
this.pulsarAdminUri = pulsarAdminUri;
@@ -128,6 +134,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
jobNamespace,
customLabels,
installUserCodeDependencies,
+ pythonDependencyRepository,
+ pythonExtraDependencyRepository,
pulsarDockerImageName,
pulsarRootDir,
instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index bb77768..1cf0db9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -75,7 +75,7 @@ class ProcessRuntime implements Runtime {
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
- "java_instance_log4j2.yml", false);
+ "java_instance_log4j2.yml", false, null, null);
}
/**
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 0572ed6..00a04b5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import java.util.*;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
@@ -48,7 +49,9 @@ class RuntimeUtils {
Integer grpcPort,
Long expectedHealthCheckInterval,
String javaLog4jFileName,
- Boolean installUserCodeDepdendencies) throws Exception {
+ Boolean installUserCodeDepdendencies,
+ String pythonDependencyRepository,
+ String pythonExtraDependencyRepository) throws Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -90,6 +93,14 @@ class RuntimeUtils {
args.add("--install_usercode_dependencies");
args.add("True");
}
+ if (!isEmpty(pythonDependencyRepository)) {
+ args.add("--dependency_repository");
+ args.add(pythonDependencyRepository);
+ }
+ if (!isEmpty(pythonExtraDependencyRepository)) {
+ args.add("--extra_dependency_repository");
+ args.add(pythonExtraDependencyRepository);
+ }
// TODO:- Find a platform independent way of controlling memory for a python application
}
args.add("--instance_id");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 61cacca..bc2a823 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -72,7 +72,7 @@ public class KubernetesRuntimeTest {
this.stateStorageServiceUrl = "bk://localhost:4181";
this.logDirectory = "logs/functions";
this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
- false, true, null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
+ false, true, "myrepo", "anotherrepo", null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null));
doNothing().when(this.factory).setupClient();
}
@@ -145,18 +145,20 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 26);
+ assertEquals(args.size(), 30);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + pulsarRootDir + "/" + userJarFile
+ " --logging_directory " + logDirectory
+ " --logging_file " + config.getFunctionDetails().getName()
+ " --install_usercode_dependencies True"
+ + " --dependency_repository myrepo"
+ + " --extra_dependency_repository anotherrepo"
+ " --instance_id " + "$SHARD_ID"
+ " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(23)
+ + " --max_buffered_tuples 1024 --port " + args.get(27)
+ " --expected_healthcheck_interval -1";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index ee57659..f7dd4bc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -133,6 +133,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
+ workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(),
+ workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(),
workerConfig.getKubernetesContainerFactory().getCustomLabels(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index bc97969..2b3e816 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -140,6 +140,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private String pulsarServiceUrl;
private String pulsarAdminUrl;
private Boolean installUserCodeDependencies;
+ private String pythonDependencyRepository;
+ private String pythonExtraDependencyRepository;
private Map<String, String> customLabels;
private Integer expectedMetricsCollectionInterval;
}