You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/07 03:22:27 UTC
[pulsar] 01/05: [component/functions|component/go] Propagate
user-config parameter into instances of Golang pulsar functions (#8132)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a8fac8b435119b1a942185273feac3680dc6f638
Author: Emil Shakirov <me...@emil.sh>
AuthorDate: Tue Oct 6 00:39:22 2020 +0200
[component/functions|component/go] Propagate user-config parameter into instances of Golang pulsar functions (#8132)
### Motivation
Currently, pulsar functions written in Golang have no access to the content of the user-config parameter from admin CLI.
That's the simplest way to pass custom configuration into a go function right now because neither secretsMap nor Kubernetes runtime is available for Golang functions yet.
**Note**: For the `secretsMap` parameter, the same technique can be used, but this has to be addressed in a separate PR.
### Modifications
- Added `String GoInstanceConfig#userConfig` attribute
- Propagated user-config parameter in `RuntimeUtils.getGoInstanceCmd`
- Exposed userConfig in `pulsar-function-go` and made it available in the `FunctionContext`
- Add unit tests for the affected code
(cherry picked from commit d6f18ab993a547539c402f8732b81e69dfee58c4)
---
pulsar-function-go/conf/conf.go | 1 +
pulsar-function-go/conf/conf.yaml | 3 +-
pulsar-function-go/go.sum | 44 ++++-----
pulsar-function-go/pf/context.go | 21 ++++-
pulsar-function-go/pf/context_test.go | 23 ++++-
pulsar-function-go/pf/instanceConf.go | 2 +
pulsar-function-go/pf/instanceConf_test.go | 68 +++++++++++++-
.../functions/instance/go/GoInstanceConfig.java | 1 +
.../pulsar/functions/runtime/RuntimeUtils.java | 3 +
.../pulsar/functions/runtime/RuntimeUtilsTest.java | 103 +++++++++++++++++++++
10 files changed, 233 insertions(+), 36 deletions(-)
diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 0efa892..3beeaf3 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -72,6 +72,7 @@ type Conf struct {
MaxMessageRetries int32 `json:"maxMessageRetries" yaml:"maxMessageRetries"`
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
+ UserConfig string `json:"userConfig" yaml:"userConfig"`
}
var (
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 6652014..8aecd57 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -35,6 +35,7 @@ secretsMap: ""
runtime: 0
autoAck: true
parallelism: 0
+userConfig: '{"word-of-the-day": "hapax legomenon"}'
# source config
subscriptionType: 0
timeoutMs: 0
@@ -55,4 +56,4 @@ disk: 0
# retryDetails config
maxMessageRetries: 0
deadLetterTopic: ""
-expectedHealthCheckInterval: 3
\ No newline at end of file
+expectedHealthCheckInterval: 3
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index d0984b6..7057435 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -1,19 +1,12 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/apache/pulsar v2.5.0+incompatible h1:g7BI4EcfHFq+OwyXXPeEnTo2hL2Koq8Y/ckBkccDgmA=
-github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c h1:uqA9RBsmQz3gN045GQ6we1RfRsk+5dco60yJ695Yb0E=
-github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c/go.mod h1:2a3PacwSg4KPcGxO3bjH29xsoKSuSkq2mG0sjKtxsP4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
+github.com/apache/pulsar-client-go v0.1.0 h1:2BFZztxtNgFyOzBc+5On84CX6aIZW5xwh7KM0MWigGI=
+github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -35,14 +28,14 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
-github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
-github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
@@ -73,7 +66,6 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc=
@@ -82,6 +74,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.1.0 h1:ElTg5tNp4DqfV7UQjDqv2+RJlNzsDtvNAWccbItceIE=
github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
@@ -116,28 +109,28 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
-golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk/jwn79LUL43rES2g8o=
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
@@ -162,8 +155,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
\ No newline at end of file
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 89cae42..a29abf1 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -21,6 +21,7 @@ package pf
import (
"context"
+ "encoding/json"
"time"
)
@@ -31,9 +32,12 @@ type FunctionContext struct {
}
func NewFuncContext() *FunctionContext {
+ instanceConf := newInstanceConf()
+ userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig())
+
fc := &FunctionContext{
- instanceConf: newInstanceConf(),
- userConfigs: make(map[string]interface{}),
+ instanceConf: instanceConf,
+ userConfigs: userConfigs,
}
return fc
}
@@ -116,8 +120,9 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
// This prevents collisions with keys defined in other packages.
type key struct{}
-// contextKey is the key for user.User values in Contexts. It is
-// unexported; clients use user.NewContext and user.FromContext
+// contextKey is the key for FunctionContext values in context.Context.
+// It is unexported;
+// clients should use FunctionContext.NewContext and FunctionContext.FromContext
// instead of using this key directly.
var contextKey = &key{}
@@ -131,3 +136,11 @@ func FromContext(ctx context.Context) (*FunctionContext, bool) {
fc, ok := ctx.Value(contextKey).(*FunctionContext)
return fc, ok
}
+
+func buildUserConfig(data string) map[string]interface{} {
+ m := make(map[string]interface{})
+
+ json.Unmarshal([]byte(data), &m)
+
+ return m
+}
diff --git a/pulsar-function-go/pf/context_test.go b/pulsar-function-go/pf/context_test.go
index 379c66d..4642ec9 100644
--- a/pulsar-function-go/pf/context_test.go
+++ b/pulsar-function-go/pf/context_test.go
@@ -22,6 +22,7 @@ package pf
import (
"context"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -31,11 +32,27 @@ func TestContext(t *testing.T) {
defer cancel()
fc := NewFuncContext()
ctx = NewContext(ctx, fc)
+
if resfc, ok := FromContext(ctx); ok {
+ assert.Equal(t, 101, resfc.GetInstanceID())
assert.Equal(t, []string{"persistent://public/default/topic-01"}, resfc.GetInputTopics())
- assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
- assert.Equal(t, "pulsar-function", resfc.GetFuncID())
- assert.Equal(t, "go-function", resfc.GetFuncName())
assert.Equal(t, "persistent://public/default/topic-02", resfc.GetOutputTopic())
+ assert.Equal(t, "/", resfc.GetTenantAndNamespace())
+ assert.Equal(t, "//go-function", resfc.GetTenantAndNamespaceAndName())
+ assert.Equal(t, "", resfc.GetFuncTenant())
+ assert.Equal(t, "go-function", resfc.GetFuncName())
+ assert.Equal(t, "", resfc.GetFuncNamespace())
+ assert.Equal(t, "pulsar-function", resfc.GetFuncID())
+ assert.Equal(t, 8091, resfc.GetPort())
+ assert.Equal(t, "pulsar-function-go", resfc.GetClusterName())
+ assert.Equal(t, int32(3), resfc.GetExpectedHealthCheckInterval())
+ assert.Equal(t, time.Duration(3), resfc.GetExpectedHealthCheckIntervalAsDuration())
+ assert.Equal(t, int64(9000000000), resfc.GetMaxIdleTime())
+ assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
+ assert.Equal(t, "hapax legomenon", resfc.GetUserConfValue("word-of-the-day"))
+ assert.Equal(
+ t,
+ map[string]interface{}{"word-of-the-day": "hapax legomenon"},
+ resfc.GetUserConfMap())
}
}
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index c4e31de..72cf778 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -48,6 +48,7 @@ func newInstanceConf() *instanceConf {
if cfg == nil {
panic("config file is nil.")
}
+
instanceConf := &instanceConf{
instanceID: cfg.InstanceID,
funcID: cfg.FuncID,
@@ -96,6 +97,7 @@ func newInstanceConf() *instanceConf {
MaxMessageRetries: cfg.MaxMessageRetries,
DeadLetterTopic: cfg.DeadLetterTopic,
},
+ UserConfig: cfg.UserConfig,
},
}
return instanceConf
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index 7b71b11..5b386f3 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -22,11 +22,75 @@ package pf
import (
"testing"
+ pb "github.com/apache/pulsar/pulsar-function-go/pb"
"github.com/stretchr/testify/assert"
)
+func Test_newInstanceConf(t *testing.T) {
+ assert.Equal(
+ t,
+ &instanceConf{
+ instanceID: 101,
+ funcID: "pulsar-function",
+ funcVersion: "1.0.0",
+ maxBufTuples: 10,
+ port: 8091,
+ clusterName: "pulsar-function-go",
+ pulsarServiceURL: "pulsar://localhost:6650",
+ killAfterIdle: 50000,
+ expectedHealthCheckInterval: 3,
+ funcDetails: pb.FunctionDetails{Tenant: "",
+ Namespace: "",
+ Name: "go-function",
+ ClassName: "",
+ LogTopic: "log-topic",
+ ProcessingGuarantees: 0,
+ UserConfig: `{"word-of-the-day": "hapax legomenon"}`,
+ SecretsMap: "",
+ Runtime: 0,
+ AutoAck: true,
+ Parallelism: 0,
+ Source: &pb.SourceSpec{
+ SubscriptionType: pb.SubscriptionType(0),
+ InputSpecs: map[string]*pb.ConsumerSpec{
+ "persistent://public/default/topic-01": {
+ SchemaType: "",
+ IsRegexPattern: false,
+ ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
+ Value: 10,
+ },
+ },
+ },
+ TimeoutMs: 0,
+ SubscriptionName: "",
+ CleanupSubscription: false,
+ },
+ Sink: &pb.SinkSpec{
+ Topic: "persistent://public/default/topic-02",
+ SchemaType: "",
+ },
+ Resources: &pb.Resources{
+ Cpu: 0,
+ Ram: 0,
+ Disk: 0,
+ },
+ PackageUrl: "",
+ RetryDetails: &pb.RetryDetails{
+ MaxMessageRetries: 0,
+ DeadLetterTopic: "",
+ },
+ RuntimeFlags: "",
+ ComponentType: 0,
+ CustomRuntimeOptions: "",
+ },
+ },
+ newInstanceConf(),
+ )
+}
+
func TestInstanceConf_GetInstanceName(t *testing.T) {
instanceConf := newInstanceConf()
- str := instanceConf.getInstanceName()
- assert.Equal(t, "101", str)
+ instanceName := instanceConf.getInstanceName()
+
+ assert.Equal(t, "101", instanceName)
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 2d2d7ad..710b4fd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -41,6 +41,7 @@ public class GoInstanceConfig {
private String logTopic = "";
private int processingGuarantees;
private String secretsMap = "";
+ private String userConfig = "";
private int runtime;
private boolean autoAck;
private int parallelism;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 77d4df2..4108f78 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -162,6 +162,9 @@ public class RuntimeUtils {
if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
}
+ if (instanceConfig.getFunctionDetails().getUserConfig() != null) {
+ goInstanceConfig.setUserConfig(instanceConfig.getFunctionDetails().getUserConfig());
+ }
if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 18aa2cb..479ddbe 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -18,9 +18,17 @@
*/
package org.apache.pulsar.functions.runtime;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.jose4j.json.internal.json_simple.JSONObject;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
public class RuntimeUtilsTest {
@Test
@@ -43,4 +51,99 @@ public class RuntimeUtilsTest {
Assert.assertEquals(result[1], "-Dbar=foo");
Assert.assertEquals(result[2], "-Dfoo=\"bar foo\"");
}
+
+ @Test
+ public void getGoInstanceCmd() throws IOException {
+ InstanceConfig instanceConfig = new InstanceConfig();
+ instanceConfig.setClusterName("kluster");
+ instanceConfig.setInstanceId(3000);
+ instanceConfig.setFunctionId("func-7734");
+ instanceConfig.setFunctionVersion("1.0.0");
+ instanceConfig.setMaxBufferedTuples(5);
+ instanceConfig.setPort(1337);
+
+
+ JSONObject userConfig = new JSONObject();
+ userConfig.put("word-of-the-day", "der Weltschmerz");
+
+ JSONObject secretsMap = new JSONObject();
+ secretsMap.put("secret", "cake is a lie");
+
+ Function.SourceSpec sources = Function.SourceSpec.newBuilder()
+ .setCleanupSubscription(true)
+ .setSubscriptionName("go-func-sub")
+ .setTimeoutMs(500)
+ .putInputSpecs("go-func-input", Function.ConsumerSpec.newBuilder().setIsRegexPattern(false).build())
+ .build();
+
+ Function.RetryDetails retryDetails = Function.RetryDetails.newBuilder()
+ .setDeadLetterTopic("go-func-deadletter")
+ .setMaxMessageRetries(1)
+ .build();
+
+ Function.Resources resources = Function.Resources.newBuilder()
+ .setCpu(2)
+ .setDisk(1024)
+ .setRam(32)
+ .build();
+
+ Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder()
+ .setAutoAck(true)
+ .setTenant("public")
+ .setNamespace("default")
+ .setName("go-func")
+ .setLogTopic("go-func-log")
+ .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+ .setSecretsMap(secretsMap.toJSONString())
+ .setParallelism(1)
+ .setSource(sources)
+ .setRetryDetails(retryDetails)
+ .setResources(resources)
+ .setUserConfig(userConfig.toJSONString())
+ .build();
+
+ instanceConfig.setFunctionDetails(functionDetails);
+
+ List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650");
+
+ HashMap goInstanceConfig = new ObjectMapper().readValue(commands.get(2), HashMap.class);
+
+ Assert.assertEquals(commands.toArray().length, 3);
+ Assert.assertEquals(commands.get(0), "config");
+ Assert.assertEquals(commands.get(1), "-instance-conf");
+ Assert.assertEquals(goInstanceConfig.get("maxBufTuples"), 5);
+ Assert.assertEquals(goInstanceConfig.get("maxMessageRetries"), 1);
+ Assert.assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0);
+ Assert.assertEquals(goInstanceConfig.get("parallelism"), 1);
+ Assert.assertEquals(goInstanceConfig.get("className"), "");
+ Assert.assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "go-func-input");
+ Assert.assertEquals(goInstanceConfig.get("secretsMap"), secretsMap.toString());
+ Assert.assertEquals(goInstanceConfig.get("sourceSchemaType"), "");
+ Assert.assertEquals(goInstanceConfig.get("sinkSpecsTopic"), "");
+ Assert.assertEquals(goInstanceConfig.get("clusterName"), "kluster");
+ Assert.assertEquals(goInstanceConfig.get("nameSpace"), "default");
+ Assert.assertEquals(goInstanceConfig.get("receiverQueueSize"), 0);
+ Assert.assertEquals(goInstanceConfig.get("tenant"), "public");
+ Assert.assertEquals(goInstanceConfig.get("ram"), 32);
+ Assert.assertEquals(goInstanceConfig.get("logTopic"), "go-func-log");
+ Assert.assertEquals(goInstanceConfig.get("processingGuarantees"), 0);
+ Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
+ Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
+ Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), "pulsar://localhost:6650");
+ Assert.assertEquals(goInstanceConfig.get("runtime"), 0);
+ Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
+ Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");
+ Assert.assertEquals(goInstanceConfig.get("funcVersion"), "1.0.0");
+ Assert.assertEquals(goInstanceConfig.get("disk"), 1024);
+ Assert.assertEquals(goInstanceConfig.get("instanceID"), 3000);
+ Assert.assertEquals(goInstanceConfig.get("cleanupSubscription"), true);
+ Assert.assertEquals(goInstanceConfig.get("port"), 1337);
+ Assert.assertEquals(goInstanceConfig.get("subscriptionType"), 0);
+ Assert.assertEquals(goInstanceConfig.get("timeoutMs"), 500);
+ Assert.assertEquals(goInstanceConfig.get("subscriptionName"), "go-func-sub");
+ Assert.assertEquals(goInstanceConfig.get("name"), "go-func");
+ Assert.assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
+ Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
+ Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
+ }
}