You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/26 02:05:36 UTC
[pulsar] branch master updated: Add ability to specify runtime flags in functions/sources/sinks (#3872)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 41d5af7 Add ability to specify runtime flags in functions/sources/sinks (#3872)
41d5af7 is described below
commit 41d5af75250855bd851026f94807efa9dc55abe7
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Mar 25 19:05:31 2019 -0700
Add ability to specify runtime flags in functions/sources/sinks (#3872)
---
.../java/org/apache/pulsar/common/functions/FunctionConfig.java | 4 ++++
.../src/main/java/org/apache/pulsar/common/io/SinkConfig.java | 3 +++
.../src/main/java/org/apache/pulsar/common/io/SourceConfig.java | 2 ++
pulsar-functions/proto/src/main/proto/Function.proto | 1 +
.../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java | 6 ++++++
.../org/apache/pulsar/functions/utils/FunctionConfigUtils.java | 8 ++++++++
.../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java | 8 ++++++++
.../java/org/apache/pulsar/functions/utils/SourceConfigUtils.java | 8 ++++++++
.../apache/pulsar/functions/utils/FunctionConfigUtilsTest.java | 1 +
.../org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 1 +
.../org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java | 1 +
11 files changed, 43 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 68a9a7d..dc9023a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -48,6 +48,10 @@ public class FunctionConfig {
PYTHON
}
+ // Any flags that you want to pass to the runtime.
+ // note that in thread mode, these flags will have no impact
+ private String runtimeFlags;
+
private String tenant;
private String namespace;
private String name;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index abb2067..d6dd92c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -69,4 +69,7 @@ public class SinkConfig {
private String archive;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Boolean cleanupSubscription;
+
+ // Any flags that you want to pass to the runtime.
+ private String runtimeFlags;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 88955b8..7d557c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -56,4 +56,6 @@ public class SourceConfig {
private Resources resources;
private String archive;
+ // Any flags that you want to pass to the runtime.
+ private String runtimeFlags;
}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index fba37e4..4e7dfc1 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -66,6 +66,7 @@ message FunctionDetails {
Resources resources = 13;
string packageUrl = 14; //present only if function submitted with package-url
RetryDetails retryDetails = 15;
+ string runtimeFlags = 17;
}
message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 37d2b04..b866f46 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -140,6 +140,9 @@ public class RuntimeUtils {
"%s-%s",
instanceConfig.getFunctionDetails().getName(),
shardId));
+ if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+ args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+ }
if (instanceConfig.getFunctionDetails().getResources() != null) {
Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
if (resources.getRam() != 0) {
@@ -151,6 +154,9 @@ public class RuntimeUtils {
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
args.add("python");
+ if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+ args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+ }
args.add(instanceFile);
args.add("--py");
args.add(originalCodeFileName);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d627700..1a41eb5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -221,6 +221,10 @@ public class FunctionConfigUtils {
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);
+ if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
+ functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
+ }
+
return functionDetailsBuilder.build();
}
@@ -315,6 +319,10 @@ public class FunctionConfigUtils {
functionConfig.setResources(resources);
}
+ if (!isEmpty(functionDetails.getRuntimeFlags())) {
+ functionConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
+ }
+
return functionConfig;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index efb4133..42538a2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -189,6 +189,10 @@ public class SinkConfigUtils {
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);
+ if (isNotBlank(sinkConfig.getRuntimeFlags())) {
+ functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
+ }
+
return functionDetailsBuilder.build();
}
@@ -248,6 +252,10 @@ public class SinkConfigUtils {
resources.setDisk(functionDetails.getResources().getDisk());
}
+ if (isNotBlank(functionDetails.getRuntimeFlags())) {
+ sinkConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
+ }
+
return sinkConfig;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 0f7ff51..f721112 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -133,6 +133,10 @@ public class SourceConfigUtils {
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);
+ if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getRuntimeFlags())) {
+ functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
+ }
+
return functionDetailsBuilder.build();
}
@@ -174,6 +178,10 @@ public class SourceConfigUtils {
resources.setDisk(functionDetails.getResources().getDisk());
sourceConfig.setResources(resources);
}
+
+ if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getRuntimeFlags())) {
+ sourceConfig .setRuntimeFlags(functionDetails.getRuntimeFlags());
+ }
return sourceConfig;
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index da2a728..df1d69a 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -62,6 +62,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
+ functionConfig.setRuntimeFlags("-DKerberos");
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index f147cfb..1bde64d 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -57,6 +57,7 @@ public class SinkConfigUtilsTest {
sinkConfig.setRetainOrdering(false);
sinkConfig.setAutoAck(true);
sinkConfig.setTimeoutMs(2000l);
+ sinkConfig.setRuntimeFlags("-DKerberos");
Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index cb141e8..f8c138b 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -49,6 +49,7 @@ public class SourceConfigUtilsTest {
sourceConfig.setTopicName("test-output");
sourceConfig.setSerdeClassName("test-serde");
sourceConfig.setParallelism(1);
+ sourceConfig.setRuntimeFlags("-DKerberos");
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sourceConfig.setConfigs(new HashMap<>());
Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));