You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/24 09:54:19 UTC
[pulsar] 05/06: [pulsar-functions-go] support set subscription position (#11990)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d5d40a2068485c341a72f223e7f1db76d1bedd30
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Sep 24 10:27:40 2021 +0800
[pulsar-functions-go] support set subscription position (#11990)
* stash
* set default value
(cherry picked from commit 652d1546569bd7bc2eabb0c258d7e5dc038bb334)
---
pulsar-function-go/conf/conf.go | 9 +++++----
pulsar-function-go/conf/conf.yaml | 1 +
pulsar-function-go/pf/instanceConf.go | 7 ++++---
pulsar-function-go/pf/instanceConf_test.go | 7 ++++---
.../apache/pulsar/functions/instance/go/GoInstanceConfig.java | 2 ++
.../java/org/apache/pulsar/functions/runtime/RuntimeUtils.java | 2 ++
6 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index c0d1262..dfbf542 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -52,10 +52,11 @@ type Conf struct {
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
- SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
- TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
- SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
- CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
+ SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
+ TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
+ SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
+ CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
+ SubscriptionPosition int32 `json:"subscriptionPosition" yaml:"subscriptionPosition"`
//source input specs
SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 7809ab5..59ac9bb 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -41,6 +41,7 @@ subscriptionType: 0
timeoutMs: 0
subscriptionName: ""
cleanupSubscription: false
+subscriptionPosition: 1
# source input specs
sourceSpecsTopic: persistent://public/default/topic-01
sourceSchemaType: ""
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index 1227024..2fc8d95 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf {
},
},
},
- TimeoutMs: cfg.TimeoutMs,
- SubscriptionName: cfg.SubscriptionName,
- CleanupSubscription: cfg.CleanupSubscription,
+ TimeoutMs: cfg.TimeoutMs,
+ SubscriptionName: cfg.SubscriptionName,
+ CleanupSubscription: cfg.CleanupSubscription,
+ SubscriptionPosition: pb.SubscriptionPosition(cfg.SubscriptionPosition),
},
Sink: &pb.SinkSpec{
Topic: cfg.SinkSpecTopic,
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index be93239..fa87002 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) {
},
},
},
- TimeoutMs: 0,
- SubscriptionName: "",
- CleanupSubscription: false,
+ TimeoutMs: 0,
+ SubscriptionName: "",
+ CleanupSubscription: false,
+ SubscriptionPosition: pb.SubscriptionPosition_EARLIEST,
},
Sink: &pb.SinkSpec{
Topic: "persistent://public/default/topic-02",
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 0cb1de2..85e73d3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.instance.go;
import lombok.Getter;
import lombok.Setter;
+import org.apache.pulsar.functions.proto.Function;
@Setter
@Getter
@@ -50,6 +51,7 @@ public class GoInstanceConfig {
private long timeoutMs;
private String subscriptionName = "";
private boolean cleanupSubscription;
+ private int subscriptionPosition = Function.SubscriptionPosition.LATEST.getNumber();
private String sourceSpecsTopic = "";
private String sourceSchemaType = "";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 592cd55..107d5cf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -196,6 +196,8 @@ public class RuntimeUtils {
if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
}
+ goInstanceConfig.setSubscriptionPosition(
+ instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
for (String inputTopic : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {