You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/26 02:05:36 UTC

[pulsar] branch master updated: Add ability to specify runtime flags in functions/sources/sinks (#3872)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 41d5af7  Add ability to specify runtime flags in functions/sources/sinks (#3872)
41d5af7 is described below

commit 41d5af75250855bd851026f94807efa9dc55abe7
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Mar 25 19:05:31 2019 -0700

    Add ability to specify runtime flags in functions/sources/sinks (#3872)
---
 .../java/org/apache/pulsar/common/functions/FunctionConfig.java   | 4 ++++
 .../src/main/java/org/apache/pulsar/common/io/SinkConfig.java     | 3 +++
 .../src/main/java/org/apache/pulsar/common/io/SourceConfig.java   | 2 ++
 pulsar-functions/proto/src/main/proto/Function.proto              | 1 +
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java    | 6 ++++++
 .../org/apache/pulsar/functions/utils/FunctionConfigUtils.java    | 8 ++++++++
 .../java/org/apache/pulsar/functions/utils/SinkConfigUtils.java   | 8 ++++++++
 .../java/org/apache/pulsar/functions/utils/SourceConfigUtils.java | 8 ++++++++
 .../apache/pulsar/functions/utils/FunctionConfigUtilsTest.java    | 1 +
 .../org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java    | 1 +
 .../org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java  | 1 +
 11 files changed, 43 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 68a9a7d..dc9023a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -48,6 +48,10 @@ public class FunctionConfig {
         PYTHON
     }
 
+    // Any flags that you want to pass to the runtime.
+    // note that in thread mode, these flags will have no impact
+    private String runtimeFlags;
+
     private String tenant;
     private String namespace;
     private String name;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index abb2067..d6dd92c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -69,4 +69,7 @@ public class SinkConfig {
     private String archive;
     // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
     private Boolean cleanupSubscription;
+
+    // Any flags that you want to pass to the runtime.
+    private String runtimeFlags;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 88955b8..7d557c9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -56,4 +56,6 @@ public class SourceConfig {
     private Resources resources;
 
     private String archive;
+    // Any flags that you want to pass to the runtime.
+    private String runtimeFlags;
 }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index fba37e4..4e7dfc1 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -66,6 +66,7 @@ message FunctionDetails {
     Resources resources = 13;
     string packageUrl = 14; //present only if function submitted with package-url
     RetryDetails retryDetails = 15;
+    string runtimeFlags = 17;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 37d2b04..b866f46 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -140,6 +140,9 @@ public class RuntimeUtils {
                     "%s-%s",
                     instanceConfig.getFunctionDetails().getName(),
                     shardId));
+            if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+                args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+            }
             if (instanceConfig.getFunctionDetails().getResources() != null) {
                 Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
                 if (resources.getRam() != 0) {
@@ -151,6 +154,9 @@ public class RuntimeUtils {
             args.add(originalCodeFileName);
         } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
             args.add("python");
+            if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
+                args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
+            }
             args.add(instanceFile);
             args.add("--py");
             args.add(originalCodeFileName);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d627700..1a41eb5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -221,6 +221,10 @@ public class FunctionConfigUtils {
         bldr.setDisk(resources.getDisk());
         functionDetailsBuilder.setResources(bldr);
 
+        if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
+            functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
+        }
+
         return functionDetailsBuilder.build();
     }
 
@@ -315,6 +319,10 @@ public class FunctionConfigUtils {
             functionConfig.setResources(resources);
         }
 
+        if (!isEmpty(functionDetails.getRuntimeFlags())) {
+            functionConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
+        }
+
         return functionConfig;
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index efb4133..42538a2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -189,6 +189,10 @@ public class SinkConfigUtils {
         bldr.setDisk(resources.getDisk());
         functionDetailsBuilder.setResources(bldr);
 
+        if (isNotBlank(sinkConfig.getRuntimeFlags())) {
+            functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
+        }
+
         return functionDetailsBuilder.build();
     }
 
@@ -248,6 +252,10 @@ public class SinkConfigUtils {
             resources.setDisk(functionDetails.getResources().getDisk());
         }
 
+        if (isNotBlank(functionDetails.getRuntimeFlags())) {
+            sinkConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
+        }
+
         return sinkConfig;
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 0f7ff51..f721112 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -133,6 +133,10 @@ public class SourceConfigUtils {
         bldr.setDisk(resources.getDisk());
         functionDetailsBuilder.setResources(bldr);
 
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getRuntimeFlags())) {
+            functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
+        }
+
         return functionDetailsBuilder.build();
     }
 
@@ -174,6 +178,10 @@ public class SourceConfigUtils {
             resources.setDisk(functionDetails.getResources().getDisk());
             sourceConfig.setResources(resources);
         }
+
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getRuntimeFlags())) {
+            sourceConfig .setRuntimeFlags(functionDetails.getRuntimeFlags());
+        }
         return sourceConfig;
     }
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index da2a728..df1d69a 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -62,6 +62,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);
         functionConfig.setTimeoutMs(2000l);
+        functionConfig.setRuntimeFlags("-DKerberos");
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index f147cfb..1bde64d 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -57,6 +57,7 @@ public class SinkConfigUtilsTest {
         sinkConfig.setRetainOrdering(false);
         sinkConfig.setAutoAck(true);
         sinkConfig.setTimeoutMs(2000l);
+        sinkConfig.setRuntimeFlags("-DKerberos");
         Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
         SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index cb141e8..f8c138b 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -49,6 +49,7 @@ public class SourceConfigUtilsTest {
         sourceConfig.setTopicName("test-output");
         sourceConfig.setSerdeClassName("test-serde");
         sourceConfig.setParallelism(1);
+        sourceConfig.setRuntimeFlags("-DKerberos");
         sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         sourceConfig.setConfigs(new HashMap<>());
         Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));