Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/05/12 06:13:13 UTC

[pulsar] branch master updated: [pulsar-function-go] Add statistics and Prometheus to Go Function instances for production readiness (#6105)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 25344c7  [pulsar-function-go] Add statistics and Prometheus to Go Function instances for production readiness (#6105)
25344c7 is described below

commit 25344c70aa0a44202026bed6e993735989910c17
Author: Devin Bost <de...@users.noreply.github.com>
AuthorDate: Tue May 12 00:13:04 2020 -0600

    [pulsar-function-go] Add statistics and Prometheus to Go Function instances for production readiness (#6105)
    
    * Enabled grpc plugin to gRPC generate.sh script to fix issues causing omission of methods for gRPC server registration in generated gRPC files for Go. (#4175)
    
    Generated updated gRPC files that contain service registration methods for creating gRPC service in Go. Also, upgraded proto version to 3. (#4175)
    
    Fixed build errors by prefixing pulsar-function-go/pb with pb alias. (#4175).
    
    Added instanceControlServicer.go as the servicer responsible for serving the gRPC service for the Go Function instances (#4175). Rough draft right now.
    
    Added changes to show intent behind passing port value to Start in function.go. Also, added some code to support healthcheck and added methods to support instanceControlServicer. Just needed to commit changes to allow reproducible test errors. (#4175).
    
    Updated function.go Start method to make it more clear where we need to provide a port value (#4175).
    
    Added port and expectedHealthCheckInterval to use of function context. Updated all references. (#4175)
    
    Added Apache license to gRPC-generated files in attempt to get license check test to pass (#4175).
    
    Created instanceControlServicer_test.go to test gRPC server and validate that HealthCheck method returns true as expected (#4175).
    
    Fixed bug in FunctionContext (and context_test.go) where the inputTopics field was being referenced when it wasn't getting populated. Updated GetInputTopics method to get input topics from the source location (#4175).
    
    Fixed bug in FunctionContext (and context_test.go) where the inputTopics field was being referenced when it wasn't getting populated. Updated GetInputTopics method to get input topics from the source location. (Should have been part of previous commit.) Also, added expectedHealthCheckInterval to conf.yaml for testing. (#4175).
    
    Fixed license formatting by running mvn license:format (#4175).
    
    Added logic and tests to allow healthCheck to kill instances that aren't receiving their regular health checks. Still needs an end-to-end test involving FunctionManager to check for possible issues that could kill instances incorrectly (#4175).
    
    Removed inputTopics field from FunctionContext (#4175).
    
    Adding the progress I've made so far on migrating the Prometheus code to Go... currently blocked due to missing methods from the Go client. Waiting for information from the Prometheus maintainers to find a workaround. (#4175).
    
    Fixed license check. (#4175)
    
    Reverting the last two commits since they should go into a separate PR. (#4174).
    
    Re-added test file that was accidentally deleted (#4175).
    
    Added a few comments to make review easier (#4175).
    
    Made minor (non-functional) changes as per PR review (#4175).
    
    Fixed print statements (#4175).
    
    Re-added comment after getting maven license formatting correct (#4175).
    
    Removed comment that I forgot to remove (#4175).
    
    Fixed formatting issues for style check (#4175).
    
    Updated gRPC test to no longer use deprecated method (#4175).
    
    Fixed more formatting issues by using goimports (#4175).
    
    Fixed even more formatting issues (#4175).
    
    Fixed yet even more formatting issues (#4175).
    
    Added statistics functionality for supporting Prometheus and stats and status commands on Go functions. Needs testing. Also, needs review of specific locations of stats method calls to ensure we're collecting data in the right places. Also, still needs the 1m interval stats to be created. Upstream Prometheus changes prevented us from using the existing approaches for collecting these stats.
    
    * Improved formatting of Go code. Also, added some required comments to get golint to pass. #6105
    
    * Fixed more Go formatting issues. #6105
    
    * Fixed more formatting issues. #6105
    
    * Ran 'gofmt -s -w .' #6105
    
    Co-authored-by: Devin Bost <db...@overstock.com>
---
 pulsar-function-go/examples/go.sum  |  49 ++++++
 pulsar-function-go/go.mod           |   3 +
 pulsar-function-go/go.sum           |  82 +++++++++-
 pulsar-function-go/pf/context.go    |   8 +
 pulsar-function-go/pf/instance.go   | 235 ++++++++++++++++++++++++++-
 pulsar-function-go/pf/stats.go      | 316 ++++++++++++++++++++++++++++++++++++
 pulsar-function-go/pf/stats_test.go | 222 +++++++++++++++++++++++++
 7 files changed, 903 insertions(+), 12 deletions(-)

diff --git a/pulsar-function-go/examples/go.sum b/pulsar-function-go/examples/go.sum
index 583ae21..338a6fe 100644
--- a/pulsar-function-go/examples/go.sum
+++ b/pulsar-function-go/examples/go.sum
@@ -1,18 +1,32 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
 github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c/go.mod h1:2a3PacwSg4KPcGxO3bjH29xsoKSuSkq2mG0sjKtxsP4=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -21,16 +35,41 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/klauspost/compress v1.9.2 h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
 github.com/klauspost/compress v1.9.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -43,6 +82,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -50,18 +90,25 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx
 golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -77,7 +124,9 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
 google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
index 4a2e8f3..6352535 100644
--- a/pulsar-function-go/go.mod
+++ b/pulsar-function-go/go.mod
@@ -5,8 +5,11 @@ go 1.13
 require (
 	github.com/apache/pulsar-client-go v0.1.0
 	github.com/golang/protobuf v1.3.2
+	github.com/prometheus/client_golang v1.3.0
+	github.com/prometheus/client_model v0.1.0
 	github.com/sirupsen/logrus v1.4.2
 	github.com/stretchr/testify v1.4.0
+	golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
 	google.golang.org/grpc v1.26.0
 	gopkg.in/yaml.v2 v2.2.8
 )
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index 02c922f..d0984b6 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -1,4 +1,5 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/apache/pulsar v2.5.0+incompatible h1:g7BI4EcfHFq+OwyXXPeEnTo2hL2Koq8Y/ckBkccDgmA=
 github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
@@ -7,37 +8,91 @@ github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:
 github.com/apache/pulsar-client-go v0.1.0/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c h1:uqA9RBsmQz3gN045GQ6we1RfRsk+5dco60yJ695Yb0E=
 github.com/apache/pulsar/pulsar-function-go v0.0.0-20200124033432-ec122ed9562c/go.mod h1:2a3PacwSg4KPcGxO3bjH29xsoKSuSkq2mG0sjKtxsP4=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA=
+github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
-github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
-github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
-github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/klauspost/compress v1.9.2 h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
 github.com/klauspost/compress v1.9.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc=
+github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.1.0 h1:ElTg5tNp4DqfV7UQjDqv2+RJlNzsDtvNAWccbItceIE=
+github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY=
+github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
+github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
+github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -48,9 +103,12 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -70,11 +128,24 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
+golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk/jwn79LUL43rES2g8o=
+golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -85,11 +156,14 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
 google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
-honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
\ No newline at end of file
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index d8e7dbf..89cae42 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -57,6 +57,14 @@ func (c *FunctionContext) GetOutputTopic() string {
 	return c.instanceConf.funcDetails.GetSink().Topic
 }
 
+func (c *FunctionContext) GetTenantAndNamespace() string {
+	return c.GetFuncTenant() + "/" + c.GetFuncNamespace()
+}
+
+func (c *FunctionContext) GetTenantAndNamespaceAndName() string {
+	return c.GetFuncTenant() + "/" + c.GetFuncNamespace() + "/" + c.GetFuncName()
+}
+
 func (c *FunctionContext) GetFuncTenant() string {
 	return c.instanceConf.funcDetails.Tenant
 }
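
A minimal standalone sketch of how the two context helpers above compose the Prometheus label values, using only the standard library. The funcIdentity type and its fields are hypothetical stand-ins for what FunctionContext reads from the instance configuration; the ordering follows the label names declared in stats.go (tenant, namespace, name, instance_id, cluster, fqfn).

```go
package main

import "fmt"

// funcIdentity is a hypothetical stand-in for the values that
// FunctionContext exposes through its getters.
type funcIdentity struct {
	Tenant, Namespace, Name string
	InstanceID, Cluster     string
}

// tenantAndNamespace mirrors GetTenantAndNamespace: "tenant/namespace".
func (f funcIdentity) tenantAndNamespace() string {
	return f.Tenant + "/" + f.Namespace
}

// fqfn mirrors GetTenantAndNamespaceAndName: "tenant/namespace/name".
func (f funcIdentity) fqfn() string {
	return f.tenantAndNamespace() + "/" + f.Name
}

// labelValues returns the values in the same order as the label names
// tenant, namespace, name, instance_id, cluster, fqfn.
func (f funcIdentity) labelValues() []string {
	return []string{
		f.Tenant,
		f.tenantAndNamespace(),
		f.Name,
		f.InstanceID,
		f.Cluster,
		f.fqfn(),
	}
}

func main() {
	id := funcIdentity{
		Tenant:     "test-tenant",
		Namespace:  "test-namespace",
		Name:       "test-name",
		InstanceID: "1234",
		Cluster:    "test-cluster",
	}
	fmt.Println(id.fqfn())
	fmt.Println(len(id.labelValues()))
}
```

Note that the "namespace" label carries the tenant-qualified form ("test-tenant/test-namespace"), not the bare namespace, which matches the example comment in getMetricsLabels.
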
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index e039490..f1c600c 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -22,6 +22,7 @@ package pf
 import (
 	"context"
 	"math"
+	"strconv"
 	"time"
 
 	"github.com/golang/protobuf/ptypes/empty"
@@ -29,6 +30,7 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar"
 	log "github.com/apache/pulsar/pulsar-function-go/logutil"
 	pb "github.com/apache/pulsar/pulsar-function-go/pb"
+	io_prometheus_client "github.com/prometheus/client_model/go"
 )
 
 type goInstance struct {
@@ -39,6 +41,21 @@ type goInstance struct {
 	client            pulsar.Client
 	lastHealthCheckTs int64
 	properties        map[string]string
+	stats             StatWithLabelValues
+}
+
+func (gi *goInstance) getMetricsLabels() []string {
+	// e.g. metrics_labels = []string{"test-tenant","test-tenant/test-namespace", "test-name", "1234", "test-cluster",
+	//	"test-tenant/test-namespace/test-name"}
+	metricsLabels := []string{
+		gi.context.GetFuncTenant(),
+		gi.context.GetTenantAndNamespace(),
+		gi.context.GetFuncName(),
+		gi.context.GetFuncID(),
+		gi.context.GetClusterName(),
+		gi.context.GetTenantAndNamespaceAndName(),
+	}
+	return metricsLabels
 }
 
 // newGoInstance init goInstance and init function context
@@ -50,6 +67,7 @@ func newGoInstance() *goInstance {
 	now := time.Now()
 	goInstance.lastHealthCheckTs = now.UnixNano()
 	goInstance.properties = make(map[string]string)
+	goInstance.stats = NewStatWithLabelValues(goInstance.getMetricsLabels()...)
 	return goInstance
 }
 
@@ -128,20 +146,27 @@ CLOSE:
 			if autoAck && atMostOnce {
 				gi.ackInputMessage(msgInput)
 			}
-
+			gi.stats.incrTotalReceived()
 			gi.addLogTopicHandler()
 
+			gi.stats.setLastInvocation()
+			gi.stats.processTimeStart()
+
 			output, err := gi.handlerMsg(msgInput)
 			if err != nil {
 				log.Errorf("handler message error:%v", err)
 				if autoAck && atLeastOnce {
 					gi.nackInputMessage(msgInput)
 				}
+				gi.stats.incrTotalUserExceptions(err)
 				return err
 			}
 
 			gi.processResult(msgInput, output)
 
+			gi.stats.processTimeEnd() // Should this be called here or before processResult(..)?
+			gi.stats.incrTotalProcessedSuccessfully()
+
 		case <-idleTimer.C:
 			close(channel)
 			break CLOSE
@@ -160,6 +185,7 @@ func (gi *goInstance) setupClient() error {
 	})
 	if err != nil {
 		log.Errorf("create client error:%v", err)
+		gi.stats.incrTotalSysExceptions(err)
 		return err
 	}
 	gi.client = client
@@ -182,6 +208,7 @@ func (gi *goInstance) setupProducer() (err error) {
 			// that might happen when consumer is blocked due to unacked messages
 		})
 		if err != nil {
+			gi.stats.incrTotalSysExceptions(err)
 			log.Errorf("create producer error:%s", err.Error())
 			return err
 		}
@@ -255,6 +282,7 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
 
 		if err != nil {
 			log.Errorf("create consumer error:%s", err.Error())
+			gi.stats.incrTotalSysExceptions(err)
 			return nil, err
 		}
 		gi.consumers[topic] = consumer
@@ -286,13 +314,20 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
 				if autoAck && atLeastOnce {
 					gi.nackInputMessage(msgInput)
 				}
+				gi.stats.incrTotalSysExceptions(err)
 				log.Fatal(err)
-			} else if autoAck && !atMostOnce {
-				gi.ackInputMessage(msgInput)
+			} else {
+				if autoAck && !atMostOnce {
+					gi.ackInputMessage(msgInput)
+				}
+				gi.stats.incrTotalProcessedSuccessfully()
 			}
 		})
 	} else if autoAck && atLeastOnce {
 		gi.ackInputMessage(msgInput)
+		// Report that we processed successfully even though it's not going to an output topic?
+		// We probably shouldn't...
+		// gi.stats.incrTotalProcessedSuccessfully()
 	}
 }
 
@@ -374,17 +409,201 @@ func (gi *goInstance) healthCheck() *pb.HealthCheckResult {
 }
 
 func (gi *goInstance) getFunctionStatus() *pb.FunctionStatus {
-	return nil // Not implemented until we add the statistics features
+	status := pb.FunctionStatus{}
+	status.Running = true
+	totalReceived := gi.getTotalReceived()
+	totalProcessedSuccessfully := gi.getTotalProcessedSuccessfully()
+	totalUserExceptions := gi.getTotalUserExceptions()
+	totalSysExceptions := gi.getTotalSysExceptions()
+	avgProcessLatencyMs := gi.getAvgProcessLatency()
+	lastInvocation := gi.getLastInvocation()
+
+	status.NumReceived = int64(totalReceived)
+	status.NumSuccessfullyProcessed = int64(totalProcessedSuccessfully)
+	status.NumUserExceptions = int64(totalUserExceptions)
+	status.InstanceId = strconv.Itoa(gi.context.instanceConf.instanceID)
+
+	status.NumUserExceptions = int64(totalUserExceptions)
+	for _, exPair := range gi.stats.latestUserException {
+		toAdd := pb.FunctionStatus_ExceptionInformation{}
+		toAdd.ExceptionString = exPair.exception.Error()
+		toAdd.MsSinceEpoch = exPair.timestamp / int64(time.Millisecond) // timestamp is recorded in nanoseconds
+		status.LatestUserExceptions = append(status.LatestUserExceptions, &toAdd)
+	}
+
+	status.NumSystemExceptions = int64(totalSysExceptions)
+	for _, exPair := range gi.stats.latestSysException {
+		toAdd := pb.FunctionStatus_ExceptionInformation{}
+		toAdd.ExceptionString = exPair.exception.Error()
+		toAdd.MsSinceEpoch = exPair.timestamp / int64(time.Millisecond) // timestamp is recorded in nanoseconds
+		status.LatestSystemExceptions = append(status.LatestSystemExceptions, &toAdd)
+	}
+	status.AverageLatency = float64(avgProcessLatencyMs)
+	status.LastInvocationTime = int64(lastInvocation)
+	return &status
+}
+
+func (gi *goInstance) getMetrics() *pb.MetricsData {
+	totalReceived := gi.getTotalReceived()
+	totalProcessedSuccessfully := gi.getTotalProcessedSuccessfully()
+	totalUserExceptions := gi.getTotalUserExceptions()
+	totalSysExceptions := gi.getTotalSysExceptions()
+	avgProcessLatencyMs := gi.getAvgProcessLatency()
+	lastInvocation := gi.getLastInvocation()
+
+	totalReceived1min := gi.getTotalReceived1min()
+	totalProcessedSuccessfully1min := gi.getTotalProcessedSuccessfully1min()
+	totalUserExceptions1min := gi.getTotalUserExceptions1min()
+	totalSysExceptions1min := gi.getTotalSysExceptions1min()
+	//avg_process_latency_ms_1min := gi.get_avg_process_latency_1min()
+
+	metricsData := pb.MetricsData{}
+	// total metrics
+	metricsData.ReceivedTotal = int64(totalReceived)
+	metricsData.ProcessedSuccessfullyTotal = int64(totalProcessedSuccessfully)
+	metricsData.SystemExceptionsTotal = int64(totalSysExceptions)
+	metricsData.UserExceptionsTotal = int64(totalUserExceptions)
+	metricsData.AvgProcessLatency = float64(avgProcessLatencyMs)
+	metricsData.LastInvocation = int64(lastInvocation)
+	// 1min metrics
+	metricsData.ReceivedTotal_1Min = int64(totalReceived1min)
+	metricsData.ProcessedSuccessfullyTotal_1Min = int64(totalProcessedSuccessfully1min)
+	metricsData.SystemExceptionsTotal_1Min = int64(totalSysExceptions1min)
+	metricsData.UserExceptionsTotal_1Min = int64(totalUserExceptions1min)
+	//metrics_data.AvgProcessLatency_1Min = avg_process_latency_ms_1min
+
+	// get any user metrics
+	// Not sure yet where these are stored.
+	/*
+	   user_metrics := self.contextimpl.get_metrics()
+	   for metric_name, value in user_metrics.items():
+	     metrics_data.userMetrics[metric_name] = value
+	*/
+
+	return &metricsData
 }
 
 func (gi *goInstance) getAndResetMetrics() *pb.MetricsData {
-	return nil // Not implemented until we add the statistics features
+	metricsData := gi.getMetrics()
+	gi.resetMetrics()
+	return metricsData
 }
 
 func (gi *goInstance) resetMetrics() *empty.Empty {
-	return nil // Not implemented until we add the statistics features
+	gi.stats.reset()
+	return &empty.Empty{}
 }
 
-func (gi *goInstance) getMetrics() *pb.MetricsData {
-	return nil // Not implemented until we add the statistics features
+// This method is used to get the required metrics for Prometheus.
+// Note that this doesn't distinguish between parallel function instances!
+func (gi *goInstance) getMatchingMetricFunc() func(lbl *io_prometheus_client.LabelPair) bool {
+	matchMetricFunc := func(lbl *io_prometheus_client.LabelPair) bool {
+		return *lbl.Name == "fqfn" && *lbl.Value == gi.context.GetTenantAndNamespaceAndName()
+	}
+	return matchMetricFunc
+}
+
+// e.g. metricName = "pulsar_function_process_latency_ms"
+func (gi *goInstance) getMatchingMetricFromRegistry(metricName string) io_prometheus_client.Metric {
+	metricFamilies, err := reg.Gather()
+	if err != nil {
+		log.Error("Something went wrong when calling reg.Gather() in getMatchingMetricFromRegistry(..) for " + metricName)
+	}
+	matchFamilyFunc := func(vect *io_prometheus_client.MetricFamily) bool {
+		return *vect.Name == metricName
+	}
+	filteredMetricFamilies := filter(metricFamilies, matchFamilyFunc)
+	if len(filteredMetricFamilies) > 1 {
+		// handle this.
+		log.Error("Too many metric families for metricName = " + metricName)
+		// Should we panic here instead of reporting an error, since it reflects a code problem, not a user problem?
+	}
+	metricFunc := gi.getMatchingMetricFunc()
+	matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, metricFunc)
+	return *matchingMetric
+}
+
+func (gi *goInstance) getTotalReceived() float32 {
+	// "pulsar_function_" + "received_total", NewGaugeVec.
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalReceived)
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+func (gi *goInstance) getTotalProcessedSuccessfully() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed)
+	// "pulsar_function_" + "processed_successfully_total", NewGaugeVec.
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+func (gi *goInstance) getTotalSysExceptions() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSystemExceptions)
+	// "pulsar_function_"+ "system_exceptions_total", NewGaugeVec.
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+func (gi *goInstance) getTotalUserExceptions() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalUserExceptions)
+	// "pulsar_function_" + "user_exceptions_total", NewGaugeVec
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+func (gi *goInstance) getAvgProcessLatency() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + ProcessLatencyMs)
+	// "pulsar_function_" + "process_latency_ms", SummaryVec.
+	count := metric.GetSummary().SampleCount
+	sum := metric.GetSummary().SampleSum
+	if *count <= 0.0 {
+		return 0.0
+	}
+	return float32(*sum) / float32(*count)
+}
+
+func (gi *goInstance) getLastInvocation() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + LastInvocation)
+	// "pulsar_function_" + "last_invocation", GaugeVec.
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+func (gi *goInstance) getTotalProcessedSuccessfully1min() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed1min)
+	// "pulsar_function_" + "processed_successfully_total_1min", GaugeVec.
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+func (gi *goInstance) getTotalSysExceptions1min() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalSystemExceptions1min)
+	// "pulsar_function_" + "system_exceptions_total_1min", GaugeVec
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+func (gi *goInstance) getTotalUserExceptions1min() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalUserExceptions1min)
+	// "pulsar_function_" + "user_exceptions_total_1min", GaugeVec
+	val := metric.GetGauge().Value
+	return float32(*val)
+}
+
+/*
+func (gi *goInstance) get_avg_process_latency_1min() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS_1min)
+	// "pulsar_function_" + "process_latency_ms_1min", SummaryVec
+	count := metric.GetSummary().SampleCount
+	sum := metric.GetSummary().SampleSum
+	if *count <= 0.0 {
+		return 0.0
+	} else {
+		return float32(*sum) / float32(*count)
+	}
+}*/
+
+func (gi *goInstance) getTotalReceived1min() float32 {
+	metric := gi.getMatchingMetricFromRegistry(PulsarFunctionMetricsPrefix + TotalReceived1min)
+	// "pulsar_function_" +  "received_total_1min", GaugeVec
+	val := metric.GetGauge().Value
+	return float32(*val)
 }
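
The lookup in getMatchingMetricFromRegistry indexes the first filtered family and dereferences the first matching metric without checking that either exists. The following is a sketch of a defensive variant of the same two-step lookup (family by name, then metric by label), written against simplified, hypothetical stand-in types rather than the real client_model types so that it runs with the standard library alone. It also shows the guarded sum/count average used by getAvgProcessLatency.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the Prometheus client_model types.
type labelPair struct{ Name, Value string }

type metric struct {
	Labels      []labelPair
	SampleSum   float64
	SampleCount uint64
}

type metricFamily struct {
	Name    string
	Metrics []metric
}

// findMetric returns the first metric in the named family that carries the
// given label pair. The boolean is false when the family or metric is absent,
// so callers never index an empty slice or dereference a nil pointer.
func findMetric(families []metricFamily, familyName, labelName, labelValue string) (metric, bool) {
	for _, fam := range families {
		if fam.Name != familyName {
			continue
		}
		for _, m := range fam.Metrics {
			for _, l := range m.Labels {
				if l.Name == labelName && l.Value == labelValue {
					return m, true
				}
			}
		}
	}
	return metric{}, false
}

// avgLatency divides the summary sum by its count, returning 0 when no
// samples have been observed.
func avgLatency(m metric) float64 {
	if m.SampleCount == 0 {
		return 0
	}
	return m.SampleSum / float64(m.SampleCount)
}

func main() {
	families := []metricFamily{{
		Name: "pulsar_function_process_latency_ms",
		Metrics: []metric{{
			Labels:      []labelPair{{"fqfn", "test-tenant/test-namespace/test-name"}},
			SampleSum:   30,
			SampleCount: 4,
		}},
	}}

	m, ok := findMetric(families, "pulsar_function_process_latency_ms",
		"fqfn", "test-tenant/test-namespace/test-name")
	fmt.Println(ok, avgLatency(m))

	_, ok = findMetric(families, "pulsar_function_received_total", "fqfn", "unknown")
	fmt.Println(ok)
}
```

Returning a found flag lets each getter fall back to zero when a metric has not been created yet, instead of panicking inside the gRPC handler.
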
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
new file mode 100644
index 0000000..fde52f8
--- /dev/null
+++ b/pulsar-function-go/pf/stats.go
@@ -0,0 +1,316 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	io_prometheus_client "github.com/prometheus/client_model/go"
+	//"strings"
+	//"github.com/prometheus/common/expfmt"
+	//"time"
+)
+
+var metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
+var exceptionLabelNames = []string{"error", "ts"}
+var exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
+
+const (
+	PulsarFunctionMetricsPrefix = "pulsar_function_"
+
+	TotalSuccessfullyProcessed = "processed_successfully_total"
+	TotalSystemExceptions      = "system_exceptions_total"
+	TotalUserExceptions        = "user_exceptions_total"
+	ProcessLatencyMs           = "process_latency_ms"
+	LastInvocation             = "last_invocation"
+	TotalReceived              = "received_total"
+
+	TotalSuccessfullyProcessed1min = "processed_successfully_total_1min"
+	TotalSystemExceptions1min      = "system_exceptions_total_1min"
+	TotalUserExceptions1min        = "user_exceptions_total_1min"
+	ProcessLatencyMs1min           = "process_latency_ms_1min"
+	TotalReceived1min              = "received_total_1min"
+)
+
+// Declare Prometheus metrics
+var statTotalProcessedSuccessfully = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed,
+		Help: "Total number of messages processed successfully."},
+	metricsLabelNames)
+var statTotalSysExceptions = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions,
+		Help: "Total number of system exceptions."},
+	metricsLabelNames)
+var statTotalUserExceptions = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalUserExceptions,
+		Help: "Total number of user exceptions."},
+	metricsLabelNames)
+
+var statProcessLatencyMs = prometheus.NewSummaryVec(
+	prometheus.SummaryOpts{
+		Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs,
+		Help: "Process latency in milliseconds."}, metricsLabelNames)
+
+var statLastInvocation = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + LastInvocation,
+		Help: "The timestamp of the last invocation of the function."}, metricsLabelNames)
+
+var statTotalReceived = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalReceived,
+		Help: "Total number of messages received from source."}, metricsLabelNames)
+
+// 1min windowed metrics
+var statTotalProcessedSuccessfully1min = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalSuccessfullyProcessed1min,
+		Help: "Total number of messages processed successfully in the last 1 minute."}, metricsLabelNames)
+var statTotalSysExceptions1min = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalSystemExceptions1min,
+		Help: "Total number of system exceptions in the last 1 minute."},
+	metricsLabelNames)
+var statTotalUserExceptions1min = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalUserExceptions1min,
+		Help: "Total number of user exceptions in the last 1 minute."},
+	metricsLabelNames)
+
+var statProcessLatencyMs1min = prometheus.NewSummaryVec(
+	prometheus.SummaryOpts{
+		Name: PulsarFunctionMetricsPrefix + ProcessLatencyMs1min,
+		Help: "Process latency in milliseconds in the last 1 minute."}, metricsLabelNames)
+
+var statTotalReceived1min = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + TotalReceived1min,
+		Help: "Total number of messages received from source in the last 1 minute."}, metricsLabelNames)
+
+// exceptions
+var userExceptions = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + "user_exception",
+		Help: "Exception from user code."}, exceptionMetricsLabelNames)
+
+var systemExceptions = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Name: PulsarFunctionMetricsPrefix + "system_exception",
+		Help: "Exception from system code."}, exceptionMetricsLabelNames)
+
+var reg *prometheus.Registry
+
+func init() {
+	reg = prometheus.NewRegistry()
+	reg.MustRegister(statTotalProcessedSuccessfully)
+	reg.MustRegister(statTotalSysExceptions)
+	reg.MustRegister(statTotalUserExceptions)
+	reg.MustRegister(statProcessLatencyMs)
+	reg.MustRegister(statLastInvocation)
+	reg.MustRegister(statTotalReceived)
+	reg.MustRegister(statTotalProcessedSuccessfully1min)
+	reg.MustRegister(statTotalSysExceptions1min)
+	reg.MustRegister(statTotalUserExceptions1min)
+	reg.MustRegister(statProcessLatencyMs1min)
+	reg.MustRegister(statTotalReceived1min)
+	reg.MustRegister(userExceptions)
+	reg.MustRegister(systemExceptions)
+
+}
+
+type LatestException struct {
+	exception error
+	timestamp int64
+}
+
+// Be sure to use the constructor method: NewStatWithLabelValues
+type StatWithLabelValues struct {
+	statTotalProcessedSuccessfully     prometheus.Gauge
+	statTotalSysExceptions             prometheus.Gauge
+	statTotalUserExceptions            prometheus.Gauge
+	statProcessLatencyMs               prometheus.Observer
+	statLastInvocation                 prometheus.Gauge
+	statTotalReceived                  prometheus.Gauge
+	statTotalProcessedSuccessfully1min prometheus.Gauge
+	statTotalSysExceptions1min         prometheus.Gauge
+	statTotalUserExceptions1min        prometheus.Gauge
+	//_stat_process_latency_ms_1min prometheus.Observer
+	statTotalReceived1min prometheus.Gauge
+	latestUserException   []LatestException
+	latestSysException    []LatestException
+	processStartTime      int64
+	metricsLabels         []string
+}
+
+func NewStatWithLabelValues(metricsLabels ...string) StatWithLabelValues {
+	// As an optimization, resolve the labeled child metrics once up front.
+	var statTotalProcessedSuccessfully = statTotalProcessedSuccessfully.WithLabelValues(metricsLabels...)
+	var statTotalSysExceptions = statTotalSysExceptions.WithLabelValues(metricsLabels...)
+	var statTotalUserExceptions = statTotalUserExceptions.WithLabelValues(metricsLabels...)
+	var statProcessLatencyMs = statProcessLatencyMs.WithLabelValues(metricsLabels...)
+	var statLastInvocation = statLastInvocation.WithLabelValues(metricsLabels...)
+	var statTotalReceived = statTotalReceived.WithLabelValues(metricsLabels...)
+	var statTotalProcessedSuccessfully1min = statTotalProcessedSuccessfully1min.WithLabelValues(metricsLabels...)
+	var statTotalSysExceptions1min = statTotalSysExceptions1min.WithLabelValues(metricsLabels...)
+	var statTotalUserExceptions1min = statTotalUserExceptions1min.WithLabelValues(metricsLabels...)
+	//var _stat_process_latency_ms_1min = stat_process_latency_ms_1min.WithLabelValues(metrics_labels...)
+	var statTotalReceived1min = statTotalReceived1min.WithLabelValues(metricsLabels...)
+
+	statObj := StatWithLabelValues{
+		statTotalProcessedSuccessfully,
+		statTotalSysExceptions,
+		statTotalUserExceptions,
+		statProcessLatencyMs,
+		statLastInvocation,
+		statTotalReceived,
+		statTotalProcessedSuccessfully1min,
+		statTotalSysExceptions1min,
+		statTotalUserExceptions1min,
+		//_stat_process_latency_ms_1min,
+		statTotalReceived1min,
+		[]LatestException{},
+		[]LatestException{},
+		0,
+		metricsLabels,
+	}
+	return statObj
+}
+
+func filter(
+	ss []*io_prometheus_client.MetricFamily,
+	test func(*io_prometheus_client.MetricFamily) bool) (ret []*io_prometheus_client.MetricFamily) {
+	for _, s := range ss {
+		if test(s) {
+			ret = append(ret, s)
+		}
+	}
+	return
+}
+
+func getFirstMatch(
+	metrics []*io_prometheus_client.Metric,
+	test func(*io_prometheus_client.LabelPair) bool) *io_prometheus_client.Metric {
+	for _, met := range metrics {
+		for _, lbl := range met.Label {
+			if test(lbl) {
+				return met
+			}
+		}
+	}
+	return nil
+}
+
+func (stat *StatWithLabelValues) setLastInvocation() {
+	now := time.Now()
+	stat.statLastInvocation.Set(float64(now.UnixNano()))
+}
+
+func (stat *StatWithLabelValues) processTimeStart() {
+	now := time.Now()
+	stat.processStartTime = now.UnixNano()
+}
+
+func (stat *StatWithLabelValues) processTimeEnd() {
+	if stat.processStartTime != 0 {
+		now := time.Now()
+		duration := now.UnixNano() - stat.processStartTime
+		stat.statProcessLatencyMs.Observe(float64(duration) / 1e6) // nanoseconds to milliseconds
+		//stat._stat_process_latency_ms_1min.Observe(float64(duration))
+	}
+}
+
+func (stat *StatWithLabelValues) incrTotalUserExceptions(err error) {
+	stat.statTotalUserExceptions.Inc()
+	stat.statTotalUserExceptions1min.Inc()
+	stat.addUserException(err)
+}
+
+func (stat *StatWithLabelValues) addUserException(err error) {
+	now := time.Now()
+	ts := now.UnixNano()
+	errorTs := LatestException{err, ts}
+	stat.latestUserException = append(stat.latestUserException, errorTs)
+	if len(stat.latestUserException) > 10 {
+		stat.latestUserException = stat.latestUserException[1:]
+	}
+	// report exception via prometheus
+	stat.reportUserExceptionPrometheus(err, ts)
+}
+
+//@limits(calls=5, period=60)
+func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) {
+	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
+	userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
+}
+
+func (stat *StatWithLabelValues) incrTotalProcessedSuccessfully() {
+	stat.statTotalProcessedSuccessfully.Inc()
+	stat.statTotalProcessedSuccessfully1min.Inc()
+}
+
+func (stat *StatWithLabelValues) incrTotalSysExceptions(exception error) {
+	stat.statTotalSysExceptions.Inc()
+	stat.statTotalSysExceptions1min.Inc()
+	stat.addSysException(exception)
+}
+
+func (stat *StatWithLabelValues) addSysException(exception error) {
+	now := time.Now()
+	ts := now.UnixNano()
+	errorTs := LatestException{exception, ts}
+	stat.latestSysException = append(stat.latestSysException, errorTs)
+	if len(stat.latestSysException) > 10 {
+		stat.latestSysException = stat.latestSysException[1:]
+	}
+	// report exception via prometheus
+	stat.reportSystemExceptionPrometheus(exception, ts)
+}
+
+//@limits(calls=5, period=60)
+func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) {
+	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
+	systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
+}
+
+func (stat *StatWithLabelValues) incrTotalReceived() {
+	stat.statTotalReceived.Inc()
+	stat.statTotalReceived1min.Inc()
+}
+
+func (stat *StatWithLabelValues) reset() {
+	stat.statTotalProcessedSuccessfully1min.Set(0.0)
+	stat.statTotalUserExceptions1min.Set(0.0)
+	stat.statTotalSysExceptions1min.Set(0.0)
+	//stat._stat_process_latency_ms_1min._sum.set(0.0)
+	//stat._stat_process_latency_ms_1min._count.set(0.0)
+	stat.statTotalReceived1min.Set(0.0)
+}
+
+/*
+// start time for windowed metrics
+util.FixedTimer(60, reset, name="windowed-metrics-timer").start()
+*/
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
new file mode 100644
index 0000000..c79c494
--- /dev/null
+++ b/pulsar-function-go/pf/stats_test.go
@@ -0,0 +1,222 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+	"fmt"
+	"math"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/prometheus/client_golang/prometheus"
+	io_prometheus_client "github.com/prometheus/client_model/go"
+	"github.com/stretchr/testify/assert"
+)
+
+/*func test(){
+	var metrics_label_names = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
+	var exception_label_names = []string{"error", "ts"}
+	var exception_metrics_label_names = append(metrics_label_names, exception_label_names...)
+	var stat_process_latency_ms = prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Name: PULSAR_FUNCTION_METRICS_PREFIX + PROCESS_LATENCY_MS,
+			Help: "Process latency in milliseconds."}, metrics_label_names)
+	var reg *prometheus.Registry
+	reg = prometheus.NewRegistry()
+	reg.MustRegister(stat_process_latency_ms)
+	metrics_labels := []string{"test-tenant","test-tenant/test-namespace", "test-name", "1234", "test-cluster",
+		"test-tenant/test-namespace/test-name"}
+	// 1234 is instanceId
+	// ['test-tenant', 'test-tenant/test-namespace', 'test-name',1234,
+    //    'test-cluster', 'test-tenant/test-namespace/test-name']
+	//var _stat_process_latency_ms = stat_process_latency_ms.WithLabelValues(metrics_labels...)
+	//process_latency_ms_count := stat._stat_process_latency_ms._count.get()
+	//process_latency_ms_sum := stat._stat_process_latency_ms._sum.get()
+}
+func  (stat *StatWithLabelValues) getTotalReceived() float32 {
+	gathering, _ := reg.Gather()
+	out := &bytes.Buffer{}
+	for _, mf := range gathering {
+		if _, err := expfmt.MetricFamilyToText(out, mf); err != nil {
+			panic(err)
+		}
+	}
+	fmt.Print(out.String())
+	fmt.Println("----------")
+}
+*/
+func TestExampleSummaryVec(t *testing.T) {
+
+	temps := prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Name:       "pond_temperature_celsius",
+			Help:       "The temperature of the frog pond.",
+			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+		},
+		[]string{"species"},
+	)
+	// Simulate some observations.
+	for i := 0; i < 1000; i++ {
+		temps.WithLabelValues("litoria-caerulea").Observe(30 + math.Floor(120*math.Sin(float64(i)*0.1))/10)
+		temps.WithLabelValues("lithobates-catesbeianus").Observe(32 + math.Floor(100*math.Cos(float64(i)*0.11))/10)
+	}
+
+	// Create a Summary without any observations.
+	temps.WithLabelValues("leiopelma-hochstetteri")
+
+	// Just for demonstration, let's check the state of the summary vector
+	// by registering it with a custom registry and then let it collect the
+	// metrics.
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(temps)
+
+	metricFamilies, err := reg.Gather()
+	if err != nil || len(metricFamilies) != 1 {
+		panic("unexpected behavior of custom test registry")
+	}
+	match := func(vect *io_prometheus_client.MetricFamily) bool {
+		return *vect.Name == "pond_temperature_celsius"
+	}
+	filteredMetricFamilies := filter(metricFamilies, match)
+
+	if len(filteredMetricFamilies) > 1 {
+		panic("Too many metric families")
+	}
+	// Then, we need to filter the metrics in the family to one that matches our label.
+
+	fmt.Println(proto.MarshalTextString(metricFamilies[0]))
+
+	// Output:
+	// name: "pond_temperature_celsius"
+	// help: "The temperature of the frog pond."
+	// type: SUMMARY
+	// metric: <
+	//   label: <
+	//     name: "species"
+	//     value: "leiopelma-hochstetteri"
+	//   >
+	//   summary: <
+	//     sample_count: 0
+	//     sample_sum: 0
+	//     quantile: <
+	//       quantile: 0.5
+	//       value: nan
+	//     >
+	//     quantile: <
+	//       quantile: 0.9
+	//       value: nan
+	//     >
+	//     quantile: <
+	//       quantile: 0.99
+	//       value: nan
+	//     >
+	//   >
+	// >
+	// metric: <
+	//   label: <
+	//     name: "species"
+	//     value: "lithobates-catesbeianus"
+	//   >
+	//   summary: <
+	//     sample_count: 1000
+	//     sample_sum: 31956.100000000017
+	//     quantile: <
+	//       quantile: 0.5
+	//       value: 32.4
+	//     >
+	//     quantile: <
+	//       quantile: 0.9
+	//       value: 41.4
+	//     >
+	//     quantile: <
+	//       quantile: 0.99
+	//       value: 41.9
+	//     >
+	//   >
+	// >
+	// metric: <
+	//   label: <
+	//     name: "species"
+	//     value: "litoria-caerulea"
+	//   >
+	//   summary: <
+	//     sample_count: 1000
+	//     sample_sum: 29969.50000000001
+	//     quantile: <
+	//       quantile: 0.5
+	//       value: 31.1
+	//     >
+	//     quantile: <
+	//       quantile: 0.9
+	//       value: 41.3
+	//     >
+	//     quantile: <
+	//       quantile: 0.99
+	//       value: 41.9
+	//     >
+	//   >
+	// >
+}
+
+func TestExampleSummaryVec_Pulsar(t *testing.T) {
+	_statProcessLatencyMs1 := prometheus.NewSummaryVec(
+		prometheus.SummaryOpts{
+			Name: "pulsar_function_process_latency_ms",
+			Help: "Process latency in milliseconds."}, metricsLabelNames)
+
+	// Label values must follow the order of metricsLabelNames; "1234" is the instance ID.
+	metricsLabels := []string{"test-tenant", "test-tenant/test-namespace", "test-name", "1234", "test-cluster",
+		"test-tenant/test-namespace/test-name"}
+	statProcessLatencyMs := _statProcessLatencyMs1.WithLabelValues(metricsLabels...)
+
+	// Simulate some observations.
+	for i := 0; i < 1000; i++ {
+		statProcessLatencyMs.Observe(30 + math.Floor(120*math.Sin(float64(i)*0.1))/10)
+		statProcessLatencyMs.Observe(32 + math.Floor(100*math.Cos(float64(i)*0.11))/10)
+	}
+
+	// Just for demonstration, let's check the state of the summary vector
+	// by registering it with a custom registry and then let it collect the
+	// metrics.
+	reg := prometheus.NewRegistry()
+	reg.MustRegister(_statProcessLatencyMs1)
+
+	metricFamilies, err := reg.Gather()
+	if err != nil || len(metricFamilies) != 1 {
+		t.Fatalf("unexpected behavior of custom test registry: err=%v, families=%d", err, len(metricFamilies))
+	}
+	matchFamilyFunc := func(vect *io_prometheus_client.MetricFamily) bool {
+		return *vect.Name == "pulsar_function_process_latency_ms"
+	}
+	filteredMetricFamilies := filter(metricFamilies, matchFamilyFunc)
+	// Indexing below requires exactly one matching family, so reject zero as well as many.
+	if len(filteredMetricFamilies) != 1 {
+		t.Fatalf("expected exactly one matching metric family, got %d", len(filteredMetricFamilies))
+	}
+	// Then, select the metric in the family whose "fqfn" label matches our function.
+	matchMetricFunc := func(lbl *io_prometheus_client.LabelPair) bool {
+		return *lbl.Name == "fqfn" && *lbl.Value == "test-tenant/test-namespace/test-name"
+	}
+	matchingMetric := getFirstMatch(filteredMetricFamilies[0].Metric, matchMetricFunc)
+	// Use the generated getters, which are nil-safe, instead of dereferencing the raw pointer fields.
+	summary := matchingMetric.GetSummary()
+	assert.Equal(t, 61925, int(summary.GetSampleSum()))
+	assert.Equal(t, 2000, int(summary.GetSampleCount()))
+}