You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/07 03:22:27 UTC

[pulsar] 01/05: [component/functions|component/go] Propagate user-config parameter into instances of Golang pulsar functions (#8132)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a8fac8b435119b1a942185273feac3680dc6f638
Author: Emil Shakirov <me...@emil.sh>
AuthorDate: Tue Oct 6 00:39:22 2020 +0200

    [component/functions|component/go] Propagate user-config parameter into instances of Golang pulsar functions (#8132)
    
    ### Motivation
    
    Currently, pulsar functions written in Golang have no access to the content of the user-config parameter from admin CLI.
    That's the simplest way to pass custom configuration into a go function right now because neither secretsMap nor Kubernetes runtime is available for Golang functions yet.
    
    **Note**: For the `secretsMap` parameter, the same technique can be used, but this has to be addressed in a separate PR.
    
    ### Modifications
    
    - Added `String GoInstanceConfig#userConfig` attribute
    - Propagated user-config parameter in `RuntimeUtils.getGoInstanceCmd`
    - Exposed userConfig in `pulsar-function-go` and made it available in the `FunctionContext`
    - Add unit tests for the affected code
    
    (cherry picked from commit d6f18ab993a547539c402f8732b81e69dfee58c4)
---
 pulsar-function-go/conf/conf.go                    |   1 +
 pulsar-function-go/conf/conf.yaml                  |   3 +-
 pulsar-function-go/go.sum                          |  44 ++++-----
 pulsar-function-go/pf/context.go                   |  21 ++++-
 pulsar-function-go/pf/context_test.go              |  23 ++++-
 pulsar-function-go/pf/instanceConf.go              |   2 +
 pulsar-function-go/pf/instanceConf_test.go         |  68 +++++++++++++-
 .../functions/instance/go/GoInstanceConfig.java    |   1 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   3 +
 .../pulsar/functions/runtime/RuntimeUtilsTest.java | 103 +++++++++++++++++++++
 10 files changed, 233 insertions(+), 36 deletions(-)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 0efa892..3beeaf3 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -72,6 +72,7 @@ type Conf struct {
 	MaxMessageRetries           int32  `json:"maxMessageRetries" yaml:"maxMessageRetries"`
 	DeadLetterTopic             string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
 	ExpectedHealthCheckInterval int32  `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
+	UserConfig                  string `json:"userConfig" yaml:"userConfig"`
 }
 
 var (
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index 6652014..8aecd57 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -35,6 +35,7 @@ secretsMap: ""
 runtime: 0
 autoAck: true
 parallelism: 0
+userConfig: '{"word-of-the-day": "hapax legomenon"}'
 # source config
 subscriptionType: 0
 timeoutMs: 0
@@ -55,4 +56,4 @@ disk: 0
 # retryDetails config
 maxMessageRetries: 0
 deadLetterTopic: ""
-expectedHealthCheckInterval: 3
\ No newline at end of file
+expectedHealthCheckInterval: 3
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index d0984b6..7057435 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -1,19 +1,12 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/apache/pulsar v2.5.0+incompatible h1:g7BI4EcfHFq+OwyXXPeEnTo2hL2Koq8Y/ckBkccDgmA=
-github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
-github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c h1:uqA9RBsmQz3gN045GQ6we1RfRsk+5dco60yJ695Yb0E=
-github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c/go.mod h1:2a3PacwSg4KPcGxO3bjH29xsoKSuSkq2mG0sjKtxsP4=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
-github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
+github.com/apache/pulsar-client-go v0.1.0 h1:2BFZztxtNgFyOzBc+5On84CX6aIZW5xwh7KM0MWigGI=
+github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@@ -35,14 +28,14 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
-github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
-github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
@@ -73,7 +66,6 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc=
@@ -82,6 +74,7 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.1.0 h1:ElTg5tNp4DqfV7UQjDqv2+RJlNzsDtvNAWccbItceIE=
 github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
@@ -116,28 +109,28 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
-golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk/jwn79LUL43rES2g8o=
 golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
@@ -162,8 +155,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
\ No newline at end of file
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 89cae42..a29abf1 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -21,6 +21,7 @@ package pf
 
 import (
 	"context"
+	"encoding/json"
 	"time"
 )
 
@@ -31,9 +32,12 @@ type FunctionContext struct {
 }
 
 func NewFuncContext() *FunctionContext {
+	instanceConf := newInstanceConf()
+	userConfigs := buildUserConfig(instanceConf.funcDetails.GetUserConfig())
+
 	fc := &FunctionContext{
-		instanceConf: newInstanceConf(),
-		userConfigs:  make(map[string]interface{}),
+		instanceConf: instanceConf,
+		userConfigs:  userConfigs,
 	}
 	return fc
 }
@@ -116,8 +120,9 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
 // This prevents collisions with keys defined in other packages.
 type key struct{}
 
-// contextKey is the key for user.User values in Contexts. It is
-// unexported; clients use user.NewContext and user.FromContext
+// contextKey is the key for FunctionContext values in context.Context.
+// It is unexported;
+// clients should use FunctionContext.NewContext and FunctionContext.FromContext
 // instead of using this key directly.
 var contextKey = &key{}
 
@@ -131,3 +136,11 @@ func FromContext(ctx context.Context) (*FunctionContext, bool) {
 	fc, ok := ctx.Value(contextKey).(*FunctionContext)
 	return fc, ok
 }
+
+func buildUserConfig(data string) map[string]interface{} {
+	m := make(map[string]interface{})
+
+	json.Unmarshal([]byte(data), &m)
+
+	return m
+}
diff --git a/pulsar-function-go/pf/context_test.go b/pulsar-function-go/pf/context_test.go
index 379c66d..4642ec9 100644
--- a/pulsar-function-go/pf/context_test.go
+++ b/pulsar-function-go/pf/context_test.go
@@ -22,6 +22,7 @@ package pf
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -31,11 +32,27 @@ func TestContext(t *testing.T) {
 	defer cancel()
 	fc := NewFuncContext()
 	ctx = NewContext(ctx, fc)
+
 	if resfc, ok := FromContext(ctx); ok {
+		assert.Equal(t, 101, resfc.GetInstanceID())
 		assert.Equal(t, []string{"persistent://public/default/topic-01"}, resfc.GetInputTopics())
-		assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
-		assert.Equal(t, "pulsar-function", resfc.GetFuncID())
-		assert.Equal(t, "go-function", resfc.GetFuncName())
 		assert.Equal(t, "persistent://public/default/topic-02", resfc.GetOutputTopic())
+		assert.Equal(t, "/", resfc.GetTenantAndNamespace())
+		assert.Equal(t, "//go-function", resfc.GetTenantAndNamespaceAndName())
+		assert.Equal(t, "", resfc.GetFuncTenant())
+		assert.Equal(t, "go-function", resfc.GetFuncName())
+		assert.Equal(t, "", resfc.GetFuncNamespace())
+		assert.Equal(t, "pulsar-function", resfc.GetFuncID())
+		assert.Equal(t, 8091, resfc.GetPort())
+		assert.Equal(t, "pulsar-function-go", resfc.GetClusterName())
+		assert.Equal(t, int32(3), resfc.GetExpectedHealthCheckInterval())
+		assert.Equal(t, time.Duration(3), resfc.GetExpectedHealthCheckIntervalAsDuration())
+		assert.Equal(t, int64(9000000000), resfc.GetMaxIdleTime())
+		assert.Equal(t, "1.0.0", resfc.GetFuncVersion())
+		assert.Equal(t, "hapax legomenon", resfc.GetUserConfValue("word-of-the-day"))
+		assert.Equal(
+			t,
+			map[string]interface{}{"word-of-the-day": "hapax legomenon"},
+			resfc.GetUserConfMap())
 	}
 }
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index c4e31de..72cf778 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -48,6 +48,7 @@ func newInstanceConf() *instanceConf {
 	if cfg == nil {
 		panic("config file is nil.")
 	}
+
 	instanceConf := &instanceConf{
 		instanceID:                  cfg.InstanceID,
 		funcID:                      cfg.FuncID,
@@ -96,6 +97,7 @@ func newInstanceConf() *instanceConf {
 				MaxMessageRetries: cfg.MaxMessageRetries,
 				DeadLetterTopic:   cfg.DeadLetterTopic,
 			},
+			UserConfig: cfg.UserConfig,
 		},
 	}
 	return instanceConf
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index 7b71b11..5b386f3 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -22,11 +22,75 @@ package pf
 import (
 	"testing"
 
+	pb "github.com/apache/pulsar/pulsar-function-go/pb"
 	"github.com/stretchr/testify/assert"
 )
 
+func Test_newInstanceConf(t *testing.T) {
+	assert.Equal(
+		t,
+		&instanceConf{
+			instanceID:                  101,
+			funcID:                      "pulsar-function",
+			funcVersion:                 "1.0.0",
+			maxBufTuples:                10,
+			port:                        8091,
+			clusterName:                 "pulsar-function-go",
+			pulsarServiceURL:            "pulsar://localhost:6650",
+			killAfterIdle:               50000,
+			expectedHealthCheckInterval: 3,
+			funcDetails: pb.FunctionDetails{Tenant: "",
+				Namespace:            "",
+				Name:                 "go-function",
+				ClassName:            "",
+				LogTopic:             "log-topic",
+				ProcessingGuarantees: 0,
+				UserConfig:           `{"word-of-the-day": "hapax legomenon"}`,
+				SecretsMap:           "",
+				Runtime:              0,
+				AutoAck:              true,
+				Parallelism:          0,
+				Source: &pb.SourceSpec{
+					SubscriptionType: pb.SubscriptionType(0),
+					InputSpecs: map[string]*pb.ConsumerSpec{
+						"persistent://public/default/topic-01": {
+							SchemaType:     "",
+							IsRegexPattern: false,
+							ReceiverQueueSize: &pb.ConsumerSpec_ReceiverQueueSize{
+								Value: 10,
+							},
+						},
+					},
+					TimeoutMs:           0,
+					SubscriptionName:    "",
+					CleanupSubscription: false,
+				},
+				Sink: &pb.SinkSpec{
+					Topic:      "persistent://public/default/topic-02",
+					SchemaType: "",
+				},
+				Resources: &pb.Resources{
+					Cpu:  0,
+					Ram:  0,
+					Disk: 0,
+				},
+				PackageUrl: "",
+				RetryDetails: &pb.RetryDetails{
+					MaxMessageRetries: 0,
+					DeadLetterTopic:   "",
+				},
+				RuntimeFlags:         "",
+				ComponentType:        0,
+				CustomRuntimeOptions: "",
+			},
+		},
+		newInstanceConf(),
+	)
+}
+
 func TestInstanceConf_GetInstanceName(t *testing.T) {
 	instanceConf := newInstanceConf()
-	str := instanceConf.getInstanceName()
-	assert.Equal(t, "101", str)
+	instanceName := instanceConf.getInstanceName()
+
+	assert.Equal(t, "101", instanceName)
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
index 2d2d7ad..710b4fd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/go/GoInstanceConfig.java
@@ -41,6 +41,7 @@ public class GoInstanceConfig {
     private String logTopic = "";
     private int processingGuarantees;
     private String secretsMap = "";
+    private String userConfig = "";
     private int runtime;
     private boolean autoAck;
     private int parallelism;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 77d4df2..4108f78 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -162,6 +162,9 @@ public class RuntimeUtils {
         if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
             goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
         }
+        if (instanceConfig.getFunctionDetails().getUserConfig() != null) {
+            goInstanceConfig.setUserConfig(instanceConfig.getFunctionDetails().getUserConfig());
+        }
         if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
             goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 18aa2cb..479ddbe 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -18,9 +18,17 @@
  */
 package org.apache.pulsar.functions.runtime;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.jose4j.json.internal.json_simple.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
 public class RuntimeUtilsTest {
 
     @Test
@@ -43,4 +51,99 @@ public class RuntimeUtilsTest {
         Assert.assertEquals(result[1], "-Dbar=foo");
         Assert.assertEquals(result[2], "-Dfoo=\"bar foo\"");
     }
+
+    @Test
+    public void getGoInstanceCmd() throws IOException {
+        InstanceConfig instanceConfig = new InstanceConfig();
+        instanceConfig.setClusterName("kluster");
+        instanceConfig.setInstanceId(3000);
+        instanceConfig.setFunctionId("func-7734");
+        instanceConfig.setFunctionVersion("1.0.0");
+        instanceConfig.setMaxBufferedTuples(5);
+        instanceConfig.setPort(1337);
+
+
+        JSONObject userConfig = new JSONObject();
+        userConfig.put("word-of-the-day", "der Weltschmerz");
+
+        JSONObject secretsMap = new JSONObject();
+        secretsMap.put("secret", "cake is a lie");
+
+        Function.SourceSpec sources = Function.SourceSpec.newBuilder()
+                .setCleanupSubscription(true)
+                .setSubscriptionName("go-func-sub")
+                .setTimeoutMs(500)
+                .putInputSpecs("go-func-input", Function.ConsumerSpec.newBuilder().setIsRegexPattern(false).build())
+                .build();
+
+        Function.RetryDetails retryDetails = Function.RetryDetails.newBuilder()
+                .setDeadLetterTopic("go-func-deadletter")
+                .setMaxMessageRetries(1)
+                .build();
+
+        Function.Resources resources = Function.Resources.newBuilder()
+                .setCpu(2)
+                .setDisk(1024)
+                .setRam(32)
+                .build();
+
+        Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder()
+                .setAutoAck(true)
+                .setTenant("public")
+                .setNamespace("default")
+                .setName("go-func")
+                .setLogTopic("go-func-log")
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+                .setSecretsMap(secretsMap.toJSONString())
+                .setParallelism(1)
+                .setSource(sources)
+                .setRetryDetails(retryDetails)
+                .setResources(resources)
+                .setUserConfig(userConfig.toJSONString())
+                .build();
+
+        instanceConfig.setFunctionDetails(functionDetails);
+
+        List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650");
+
+        HashMap goInstanceConfig = new ObjectMapper().readValue(commands.get(2), HashMap.class);
+
+        Assert.assertEquals(commands.toArray().length, 3);
+        Assert.assertEquals(commands.get(0), "config");
+        Assert.assertEquals(commands.get(1), "-instance-conf");
+        Assert.assertEquals(goInstanceConfig.get("maxBufTuples"), 5);
+        Assert.assertEquals(goInstanceConfig.get("maxMessageRetries"), 1);
+        Assert.assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0);
+        Assert.assertEquals(goInstanceConfig.get("parallelism"), 1);
+        Assert.assertEquals(goInstanceConfig.get("className"), "");
+        Assert.assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "go-func-input");
+        Assert.assertEquals(goInstanceConfig.get("secretsMap"), secretsMap.toString());
+        Assert.assertEquals(goInstanceConfig.get("sourceSchemaType"), "");
+        Assert.assertEquals(goInstanceConfig.get("sinkSpecsTopic"), "");
+        Assert.assertEquals(goInstanceConfig.get("clusterName"), "kluster");
+        Assert.assertEquals(goInstanceConfig.get("nameSpace"), "default");
+        Assert.assertEquals(goInstanceConfig.get("receiverQueueSize"), 0);
+        Assert.assertEquals(goInstanceConfig.get("tenant"), "public");
+        Assert.assertEquals(goInstanceConfig.get("ram"), 32);
+        Assert.assertEquals(goInstanceConfig.get("logTopic"), "go-func-log");
+        Assert.assertEquals(goInstanceConfig.get("processingGuarantees"), 0);
+        Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
+        Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
+        Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), "pulsar://localhost:6650");
+        Assert.assertEquals(goInstanceConfig.get("runtime"), 0);
+        Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
+        Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");
+        Assert.assertEquals(goInstanceConfig.get("funcVersion"), "1.0.0");
+        Assert.assertEquals(goInstanceConfig.get("disk"), 1024);
+        Assert.assertEquals(goInstanceConfig.get("instanceID"), 3000);
+        Assert.assertEquals(goInstanceConfig.get("cleanupSubscription"), true);
+        Assert.assertEquals(goInstanceConfig.get("port"), 1337);
+        Assert.assertEquals(goInstanceConfig.get("subscriptionType"), 0);
+        Assert.assertEquals(goInstanceConfig.get("timeoutMs"), 500);
+        Assert.assertEquals(goInstanceConfig.get("subscriptionName"), "go-func-sub");
+        Assert.assertEquals(goInstanceConfig.get("name"), "go-func");
+        Assert.assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
+        Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
+        Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
+    }
 }