You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/27 01:34:06 UTC

[pulsar] branch master updated: [Go Functions] Add newOuputMessage interface for Go Function (#8327)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd8e22ee [Go Functions] Add newOuputMessage interface for Go Function (#8327)
cd8e22ee is described below

commit cd8e22eea97dedb62844155b3ba63a880d3c9236
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Tue Oct 27 09:33:51 2020 +0800

    [Go Functions] Add newOuputMessage interface for Go Function (#8327)
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    Fixes #8266
    
    
    ### Motivation
    
    Implement the context object to be able to send to different topics. Like with the java SDK that allows you to return a future, which makes sending to arbitrary topics easy and fast, we may want to think of an analogous approach
    
    ### Modifications
    
    - Add `NewOuputMessage()` interface for Go Function
    - Add examples for `NewOuputMessage()`
    - Add docs for `NewOuputMessage()`
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    This change added tests and can be verified as follows:
    
    - Added unit tests for `FunctionContext#NewOutputMessage`
    
    ### Does this pull request potentially affect one of the following parts:
    
      - The public API: (yes)
      - The default values of configurations: (yes)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes )
      - If yes, how is the feature documented? (docs)
---
 pulsar-function-go/examples/go.sum                 | 30 ++++++++++++
 .../examples/publishFunc/publishFunc.go            | 54 +++++++++++++++++++++
 pulsar-function-go/pf/context.go                   | 16 +++++--
 pulsar-function-go/pf/context_test.go              | 13 +++++
 pulsar-function-go/pf/instance.go                  | 55 +++++++++++++++-------
 pulsar-function-go/pf/mockMessage_test.go          | 30 ++++++++++++
 site2/docs/functions-develop.md                    |  4 ++
 7 files changed, 182 insertions(+), 20 deletions(-)

diff --git a/pulsar-function-go/examples/go.sum b/pulsar-function-go/examples/go.sum
index 6a89359..8160ae2 100644
--- a/pulsar-function-go/examples/go.sum
+++ b/pulsar-function-go/examples/go.sum
@@ -1,7 +1,9 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
 github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -11,25 +13,33 @@ github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:
 github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
 github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
+github.com/apache/pulsar-client-go v0.2.0 h1:7teu0FaXzzKPjDdUNjA7dVYKFjCy6OVX5as6nUww4qk=
 github.com/apache/pulsar-client-go v0.2.0/go.mod h1:POSPPmXv1RuoM7FzHaS3NurCSOopwin2ekGK2PcOgVM=
+github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I=
 github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
 github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c/go.mod h1:2a3PacwSg4KPcGxO3bjH29xsoKSuSkq2mG0sjKtxsP4=
+github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
 github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -42,6 +52,7 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
 github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -54,6 +65,7 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x
 github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
 github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -71,11 +83,13 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
 github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
 github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/compress v1.9.2 h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
 github.com/klauspost/compress v1.9.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
 github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -83,12 +97,15 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
 github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
 github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
@@ -102,23 +119,28 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
+github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
 github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
+github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
@@ -134,9 +156,11 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -154,8 +178,10 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -179,9 +205,11 @@ golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -206,6 +234,7 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
 google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
 google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -222,6 +251,7 @@ gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/pulsar-function-go/examples/publishFunc/publishFunc.go b/pulsar-function-go/examples/publishFunc/publishFunc.go
new file mode 100644
index 0000000..bd0fc4b
--- /dev/null
+++ b/pulsar-function-go/examples/publishFunc/publishFunc.go
@@ -0,0 +1,54 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package main
+
+import (
+	"context"
+	"errors"
+	"log"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+	"github.com/apache/pulsar/pulsar-function-go/pf"
+)
+
+func PublishFunc(ctx context.Context, in []byte) error {
+	fctx, ok := pf.FromContext(ctx)
+	if !ok {
+		return errors.New("get Go Functions Context error")
+	}
+
+	publishTopic := "publish-topic"
+	output := append(in, 110)
+
+	producer := fctx.NewOutputMessage(publishTopic)
+	msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
+		Payload: output,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	log.Printf("The output message ID is: %+v", msgID)
+	return nil
+}
+
+func main() {
+	pf.Start(PublishFunc)
+}
\ No newline at end of file
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index b8414ce..ef27471 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -28,10 +28,11 @@ import (
 )
 
 type FunctionContext struct {
-	instanceConf *instanceConf
-	userConfigs  map[string]interface{}
-	logAppender  *LogAppender
-	record       pulsar.Message
+	instanceConf  *instanceConf
+	userConfigs   map[string]interface{}
+	logAppender   *LogAppender
+	outputMessage func(topic string) pulsar.Producer
+	record        pulsar.Message
 }
 
 func NewFuncContext() *FunctionContext {
@@ -119,6 +120,12 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
 	return c.userConfigs
 }
 
+// NewOutputMessage send message to the topic
+// @param topicName: The name of the topic for output message
+func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {
+	return c.outputMessage(topicName)
+}
+
 // SetCurrentRecord sets the current message into the function context
 // called for each message before executing a handler function
 func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
@@ -128,6 +135,7 @@ func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
 // GetCurrentRecord gets the current message from the function context
 func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
 	return c.record
+
 }
 
 // An unexported type to be used as the key for types in this package.
diff --git a/pulsar-function-go/pf/context_test.go b/pulsar-function-go/pf/context_test.go
index 6da6e78..52a8abc 100644
--- a/pulsar-function-go/pf/context_test.go
+++ b/pulsar-function-go/pf/context_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -69,3 +70,15 @@ func TestFunctionContext_setCurrentRecord(t *testing.T) {
 
 	assert.IsType(t, &MockMessage{}, fc.record)
 }
+
+func TestFunctionContext_NewOutputMessage(t *testing.T) {
+	fc := NewFuncContext()
+	publishTopic := "publish-topic"
+
+	fc.outputMessage = func(topic string) pulsar.Producer {
+		return &MockPulsarProducer{}
+	}
+
+	actualProducer := fc.NewOutputMessage(publishTopic)
+	assert.IsType(t, &MockPulsarProducer{}, actualProducer)
+}
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 45ced39..949ab5c 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -66,6 +66,15 @@ func newGoInstance() *goInstance {
 		consumers: make(map[string]pulsar.Consumer),
 	}
 	now := time.Now()
+
+	goInstance.context.outputMessage = func(topic string) pulsar.Producer {
+		producer, err := goInstance.getProducer(topic)
+		if err != nil {
+			log.Fatal(err)
+		}
+		return producer
+	}
+
 	goInstance.lastHealthCheckTs = now.UnixNano()
 	goInstance.properties = make(map[string]string)
 	goInstance.stats = NewStatWithLabelValues(goInstance.getMetricsLabels()...)
@@ -191,30 +200,44 @@ func (gi *goInstance) setupClient() error {
 	return nil
 }
 
-func (gi *goInstance) setupProducer() (err error) {
+func (gi *goInstance) setupProducer() error {
 	if gi.context.instanceConf.funcDetails.Sink.Topic != "" && len(gi.context.instanceConf.funcDetails.Sink.Topic) > 0 {
 		log.Debugf("Setting up producer for topic %s", gi.context.instanceConf.funcDetails.Sink.Topic)
-		properties := getProperties(getDefaultSubscriptionName(
-			gi.context.instanceConf.funcDetails.Tenant,
-			gi.context.instanceConf.funcDetails.Namespace,
-			gi.context.instanceConf.funcDetails.Name), gi.context.instanceConf.instanceID)
-		gi.producer, err = gi.client.CreateProducer(pulsar.ProducerOptions{
-			Topic:                   gi.context.instanceConf.funcDetails.Sink.Topic,
-			Properties:              properties,
-			CompressionType:         pulsar.LZ4,
-			BatchingMaxPublishDelay: time.Millisecond * 10,
-			// Set send timeout to be infinity to prevent potential deadlock with consumer
-			// that might happen when consumer is blocked due to unacked messages
-		})
+		producer, err := gi.getProducer(gi.context.instanceConf.funcDetails.Sink.Topic)
 		if err != nil {
-			gi.stats.incrTotalSysExceptions(err)
-			log.Errorf("create producer error:%s", err.Error())
-			return err
+			log.Fatal(err)
 		}
+
+		gi.producer = producer
+		return nil
 	}
+
 	return nil
 }
 
+func (gi *goInstance) getProducer(topicName string) (pulsar.Producer, error) {
+	properties := getProperties(getDefaultSubscriptionName(
+		gi.context.instanceConf.funcDetails.Tenant,
+		gi.context.instanceConf.funcDetails.Namespace,
+		gi.context.instanceConf.funcDetails.Name), gi.context.instanceConf.instanceID)
+
+	producer, err := gi.client.CreateProducer(pulsar.ProducerOptions{
+		Topic:                   topicName,
+		Properties:              properties,
+		CompressionType:         pulsar.LZ4,
+		BatchingMaxPublishDelay: time.Millisecond * 10,
+		// Set send timeout to be infinity to prevent potential deadlock with consumer
+		// that might happen when consumer is blocked due to unacked messages
+	})
+	if err != nil {
+		gi.stats.incrTotalSysExceptions(err)
+		log.Errorf("create producer error:%s", err.Error())
+		return nil, err
+	}
+
+	return producer, err
+}
+
 func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 	subscriptionType := pulsar.Shared
 	if int32(gi.context.instanceConf.funcDetails.Source.SubscriptionType) == pb.SubscriptionType_value["FAILOVER"] {
diff --git a/pulsar-function-go/pf/mockMessage_test.go b/pulsar-function-go/pf/mockMessage_test.go
index c708553..2e1cade 100644
--- a/pulsar-function-go/pf/mockMessage_test.go
+++ b/pulsar-function-go/pf/mockMessage_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+	"context"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar"
@@ -80,3 +81,32 @@ type MockMessageID struct{}
 func (m *MockMessageID) Serialize() []byte {
 	return []byte(`message-id`)
 }
+
+type MockPulsarProducer struct{}
+
+func (producer *MockPulsarProducer) Topic() string {
+	return "publish-topic"
+}
+
+func (producer *MockPulsarProducer) Name() string {
+	return "publish-producer"
+}
+
+func (producer *MockPulsarProducer) Send(context.Context, *pulsar.ProducerMessage) (pulsar.MessageID, error) {
+	return nil, nil
+}
+
+func (producer *MockPulsarProducer) SendAsync(context.Context, *pulsar.ProducerMessage,
+	func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
+}
+
+func (producer *MockPulsarProducer) LastSequenceID() int64 {
+	return int64(10)
+}
+
+func (producer *MockPulsarProducer) Flush() error {
+	return nil
+}
+
+func (producer *MockPulsarProducer) Close() {
+}
diff --git a/site2/docs/functions-develop.md b/site2/docs/functions-develop.md
index bfee5c6..de059b8 100644
--- a/site2/docs/functions-develop.md
+++ b/site2/docs/functions-develop.md
@@ -455,6 +455,10 @@ func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
 func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
   return c.record
 }
+
+func (c *FunctionContext) NewOutputMessage(topic string) pulsar.Producer {
+	return c.outputMessage(topic)
+}
 ```
 
 The following example uses several methods available via the `Context` object.