You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/24 09:54:19 UTC

[pulsar] 05/06: [pulsar-functions-go] support set subscription position (#11990)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d5d40a2068485c341a72f223e7f1db76d1bedd30
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Sep 24 10:27:40 2021 +0800

    [pulsar-functions-go] support set subscription position (#11990)
    
    * stash
    
    * set default value
    
    (cherry picked from commit 652d1546569bd7bc2eabb0c258d7e5dc038bb334)
---
 pulsar-function-go/conf/conf.go                                  | 9 +++++----
 pulsar-function-go/conf/conf.yaml                                | 1 +
 pulsar-function-go/pf/instanceConf.go                            | 7 ++++---
 pulsar-function-go/pf/instanceConf_test.go                       | 7 ++++---
 .../apache/pulsar/functions/instance/go/GoInstanceConfig.java    | 2 ++
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java   | 2 ++
 6 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c0d1262..dfbf542 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -52,10 +52,11 @@ type Conf struct {
 	AutoACK              bool   `json:"autoAck" yaml:"autoAck"`
 	Parallelism          int32  `json:"parallelism" yaml:"parallelism"`
 	//source config
-	SubscriptionType    int32  `json:"subscriptionType" yaml:"subscriptionType"`
-	TimeoutMs           uint64 `json:"timeoutMs" yaml:"timeoutMs"`
-	SubscriptionName    string `json:"subscriptionName" yaml:"subscriptionName"`
-	CleanupSubscription bool   `json:"cleanupSubscription"  yaml:"cleanupSubscription"`
+	SubscriptionType     int32  `json:"subscriptionType" yaml:"subscriptionType"`
+	TimeoutMs            uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+	SubscriptionName     string `json:"subscriptionName" yaml:"subscriptionName"`
+	CleanupSubscription  bool   `json:"cleanupSubscription"  yaml:"cleanupSubscription"`
+	SubscriptionPosition int32  `json:"subscriptionPosition" yaml:"subscriptionPosition"`
 	//source input specs
 	SourceSpecTopic            string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
 	SourceSchemaType           string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 7809ab5..59ac9bb 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -41,6 +41,7 @@ subscriptionType: 0
 timeoutMs: 0
 subscriptionName: ""
 cleanupSubscription: false
+subscriptionPosition: 1
 # source input specs
 sourceSpecsTopic: persistent://public/default/topic-01
 sourceSchemaType: ""
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index 1227024..2fc8d95 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf {
 						},
 					},
 				},
-				TimeoutMs:           cfg.TimeoutMs,
-				SubscriptionName:    cfg.SubscriptionName,
-				CleanupSubscription: cfg.CleanupSubscription,
+				TimeoutMs:            cfg.TimeoutMs,
+				SubscriptionName:     cfg.SubscriptionName,
+				CleanupSubscription:  cfg.CleanupSubscription,
+				SubscriptionPosition: pb.SubscriptionPosition(cfg.SubscriptionPosition),
 			},
 			Sink: &pb.SinkSpec{
 				Topic:      cfg.SinkSpecTopic,
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index be93239..fa87002 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) {
 							},
 						},
 					},
-					TimeoutMs:           0,
-					SubscriptionName:    "",
-					CleanupSubscription: false,
+					TimeoutMs:            0,
+					SubscriptionName:     "",
+					CleanupSubscription:  false,
+					SubscriptionPosition: pb.SubscriptionPosition_EARLIEST,
 				},
 				Sink: &pb.SinkSpec{
 					Topic:      "persistent://public/default/topic-02",
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 0cb1de2..85e73d3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.go;
 
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.pulsar.functions.proto.Function;
 
 @Setter
 @Getter
@@ -50,6 +51,7 @@ public class GoInstanceConfig {
     private long timeoutMs;
     private String subscriptionName = "";
     private boolean cleanupSubscription;
+    private int subscriptionPosition = Function.SubscriptionPosition.LATEST.getNumber();
 
     private String sourceSpecsTopic = "";
     private String sourceSchemaType = "";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 592cd55..107d5cf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -196,6 +196,8 @@ public class RuntimeUtils {
         if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
             goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
         }
+        goInstanceConfig.setSubscriptionPosition(
+                instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
 
         if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
             for (String inputTopic : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {