You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/16 08:35:21 UTC
[pulsar] branch master updated: [go function]support log topic for
go function (#4008)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ccf4581 [go function]support log topic for go function (#4008)
ccf4581 is described below
commit ccf4581e91e1c8b60488f1fd0c8f114032c3c950
Author: 冉小龙 <rx...@qq.com>
AuthorDate: Tue Apr 16 16:35:15 2019 +0800
[go function]support log topic for go function (#4008)
### Motivation
support log topic for go function, user-friendly debugging.
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
pulsar-function-go/conf/conf.yaml | 2 +-
pulsar-function-go/go.mod | 4 +-
pulsar-function-go/go.sum | 4 +-
pulsar-function-go/pf/context.go | 1 +
pulsar-function-go/pf/instance.go | 46 ++++++++++++++++++-
pulsar-function-go/pf/logAppender.go | 87 ++++++++++++++++++++++++++++++++++++
6 files changed, 136 insertions(+), 8 deletions(-)
diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index f786cca..86cf8f2 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -29,7 +29,7 @@ killAfterIdleMs: 50000
tenant: ""
nameSpace: ""
name: "go-function"
-logTopic: ""
+logTopic: "log-topic"
processingGuarantees: 0
secretsMap: ""
runtime: 0
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
index 0762d53..f9957c2 100644
--- a/pulsar-function-go/go.mod
+++ b/pulsar-function-go/go.mod
@@ -1,11 +1,9 @@
module github.com/apache/pulsar/pulsar-function-go
require (
- github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be
- github.com/davecgh/go-spew v1.1.1
+ github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f
github.com/golang/protobuf v1.3.0
github.com/sirupsen/logrus v1.4.0
github.com/stretchr/testify v1.3.0
- gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.2
)
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index 4a17410..2187bf9 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -1,7 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be h1:BpPXJTLeqlWQPlMZxFP+/Sm1Zva7NJB2gWRfWlN8xi8=
-github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be/go.mod h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f h1:X408m6eknpEC7CoRBLzkvYzNpslP2D4/xQJNfR5Q0mk=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f/go.mod h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 4818326..9681ae7 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -27,6 +27,7 @@ type FunctionContext struct {
instanceConf *instanceConf
userConfigs map[string]interface{}
inputTopics []string
+ logAppender *LogAppender
}
func NewFuncContext() *FunctionContext {
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 9c4e670..d809932 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -63,6 +63,11 @@ func (gi *goInstance) startFunction(function function) error {
log.Errorf("setup consumer failed, error is:%v", err)
return err
}
+ err = gi.setupLogHandler()
+ if err != nil {
+ log.Errorf("setup log appender failed, error is:%v", err)
+ return err
+ }
CLOSE:
for {
@@ -75,6 +80,9 @@ CLOSE:
if autoAck && atMostOnce {
gi.ackInputMessage(msgInput)
}
+
+ gi.addLogTopicHandler()
+
output, err := gi.handlerMsg(msgInput)
if err != nil {
log.Errorf("handler message error:%v", err)
@@ -83,6 +91,7 @@ CLOSE:
}
return err
}
+
gi.processResult(msgInput, output)
case <-time.After(getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdleMs)):
@@ -91,6 +100,7 @@ CLOSE:
}
}
+ gi.closeLogTopic()
gi.close()
return nil
}
@@ -248,8 +258,7 @@ func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
}
func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
- //todo: in the current version of pulsar-client-go, we do not support this operation
- //gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+ gi.consumers[inputMessage.Topic()].Nack(inputMessage)
}
func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
@@ -259,6 +268,39 @@ func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
return timeoutMilliSecond
}
+func (gi *goInstance) setupLogHandler() error {
+ if gi.context.instanceConf.funcDetails.GetLogTopic() != "" {
+ gi.context.logAppender = NewLogAppender(
+ gi.client, //pulsar client
+ gi.context.instanceConf.funcDetails.GetLogTopic(), //log topic
+ getDefaultSubscriptionName(gi.context.instanceConf.funcDetails.Tenant, //fqn
+ gi.context.instanceConf.funcDetails.Namespace,
+ gi.context.instanceConf.funcDetails.Name),
+ )
+ return gi.context.logAppender.Start()
+ }
+ return nil
+}
+
+func (gi *goInstance) addLogTopicHandler() {
+ if gi.context.logAppender == nil {
+ panic("please init logAppender")
+ }
+
+ for _, logByte := range log.StrEntry {
+ gi.context.logAppender.Append([]byte(logByte))
+ }
+}
+
+func (gi *goInstance) closeLogTopic() {
+ log.Info("closing log topic...")
+ if gi.context.logAppender == nil {
+ return
+ }
+ gi.context.logAppender.Stop()
+ gi.context.logAppender = nil
+}
+
func (gi *goInstance) close() {
log.Info("closing go instance...")
if gi.producer != nil {
diff --git a/pulsar-function-go/pf/logAppender.go b/pulsar-function-go/pf/logAppender.go
new file mode 100644
index 0000000..f46b7c6
--- /dev/null
+++ b/pulsar-function-go/pf/logAppender.go
@@ -0,0 +1,87 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/pulsar/pulsar-client-go/pulsar"
+ "github.com/apache/pulsar/pulsar-function-go/log"
+)
+
+type LogAppender struct {
+ pulsarClient pulsar.Client
+ logTopic string
+ fqn string
+ producer pulsar.Producer
+}
+
+func NewLogAppender(client pulsar.Client, logTopic, fqn string) *LogAppender {
+ logAppender := &LogAppender{
+ pulsarClient: client,
+ logTopic: logTopic,
+ fqn: fqn,
+ }
+ return logAppender
+}
+
+func (la *LogAppender) Start() error {
+ producer, err := la.pulsarClient.CreateProducer(pulsar.ProducerOptions{
+ Topic: la.logTopic,
+ BlockIfQueueFull: false,
+ Batching: true,
+ CompressionType: pulsar.LZ4,
+ BatchingMaxPublishDelay: 100 * time.Millisecond,
+ Properties: map[string]string{
+ "function": la.fqn,
+ },
+ })
+ if err != nil {
+ log.Errorf("create producer error:%s", err.Error())
+ return err
+ }
+ la.producer = producer
+ return nil
+}
+
+func (la *LogAppender) Append(logByte []byte) {
+ ctx := context.Background()
+ asyncMsg := pulsar.ProducerMessage{
+ Payload: logByte,
+ }
+ la.producer.SendAsync(ctx, asyncMsg, func(msg pulsar.ProducerMessage, err error) {
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+}
+
+func (la *LogAppender) GetName() string {
+ return la.fqn
+}
+
+func (la *LogAppender) Stop() {
+ err := la.producer.Close()
+ if err != nil {
+ log.Errorf("close log append error:%s", err.Error())
+ }
+ la.producer = nil
+}