You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/16 08:35:21 UTC

[pulsar] branch master updated: [go function]support log topic for go function (#4008)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ccf4581  [go function]support log topic for go function (#4008)
ccf4581 is described below

commit ccf4581e91e1c8b60488f1fd0c8f114032c3c950
Author: 冉小龙 <rx...@qq.com>
AuthorDate: Tue Apr 16 16:35:15 2019 +0800

    [go function]support log topic for go function (#4008)
    
    ### Motivation
    
    support log topic for go function, user-friendly debugging.
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 pulsar-function-go/conf/conf.yaml    |  2 +-
 pulsar-function-go/go.mod            |  4 +-
 pulsar-function-go/go.sum            |  4 +-
 pulsar-function-go/pf/context.go     |  1 +
 pulsar-function-go/pf/instance.go    | 46 ++++++++++++++++++-
 pulsar-function-go/pf/logAppender.go | 87 ++++++++++++++++++++++++++++++++++++
 6 files changed, 136 insertions(+), 8 deletions(-)

diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml
index f786cca..86cf8f2 100644
--- a/pulsar-function-go/conf/conf.yaml
+++ b/pulsar-function-go/conf/conf.yaml
@@ -29,7 +29,7 @@ killAfterIdleMs: 50000
 tenant: ""
 nameSpace: ""
 name: "go-function"
-logTopic: ""
+logTopic: "log-topic"
 processingGuarantees: 0
 secretsMap: ""
 runtime: 0
diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod
index 0762d53..f9957c2 100644
--- a/pulsar-function-go/go.mod
+++ b/pulsar-function-go/go.mod
@@ -1,11 +1,9 @@
 module github.com/apache/pulsar/pulsar-function-go
 
 require (
-	github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be
-	github.com/davecgh/go-spew v1.1.1
+	github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f
 	github.com/golang/protobuf v1.3.0
 	github.com/sirupsen/logrus v1.4.0
 	github.com/stretchr/testify v1.3.0
-	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/yaml.v2 v2.2.2
 )
diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum
index 4a17410..2187bf9 100644
--- a/pulsar-function-go/go.sum
+++ b/pulsar-function-go/go.sum
@@ -1,7 +1,7 @@
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be h1:BpPXJTLeqlWQPlMZxFP+/Sm1Zva7NJB2gWRfWlN8xi8=
-github.com/apache/pulsar/pulsar-client-go v0.0.0-20190312044336-ff4db8db12be/go.mod h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f h1:X408m6eknpEC7CoRBLzkvYzNpslP2D4/xQJNfR5Q0mk=
+github.com/apache/pulsar/pulsar-client-go v0.0.0-20190320194916-f03971cb5b4f/go.mod h1:Dt5jPpS2v4WlPya7e9jXrqGN+6BVzVyaBw+bixjLFFU=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go
index 4818326..9681ae7 100644
--- a/pulsar-function-go/pf/context.go
+++ b/pulsar-function-go/pf/context.go
@@ -27,6 +27,7 @@ type FunctionContext struct {
 	instanceConf *instanceConf
 	userConfigs  map[string]interface{}
 	inputTopics  []string
+	logAppender  *LogAppender
 }
 
 func NewFuncContext() *FunctionContext {
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 9c4e670..d809932 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -63,6 +63,11 @@ func (gi *goInstance) startFunction(function function) error {
 		log.Errorf("setup consumer failed, error is:%v", err)
 		return err
 	}
+	err = gi.setupLogHandler()
+	if err != nil {
+		log.Errorf("setup log appender failed, error is:%v", err)
+		return err
+	}
 
 CLOSE:
 	for {
@@ -75,6 +80,9 @@ CLOSE:
 			if autoAck && atMostOnce {
 				gi.ackInputMessage(msgInput)
 			}
+
+			gi.addLogTopicHandler()
+
 			output, err := gi.handlerMsg(msgInput)
 			if err != nil {
 				log.Errorf("handler message error:%v", err)
@@ -83,6 +91,7 @@ CLOSE:
 				}
 				return err
 			}
+
 			gi.processResult(msgInput, output)
 
 		case <-time.After(getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdleMs)):
@@ -91,6 +100,7 @@ CLOSE:
 		}
 	}
 
+	gi.closeLogTopic()
 	gi.close()
 	return nil
 }
@@ -248,8 +258,7 @@ func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
 }
 
 func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
-	//todo: in the current version of pulsar-client-go, we do not support this operation
-	//gi.consumers[inputMessage.Topic()].Nack(inputMessage)
+	gi.consumers[inputMessage.Topic()].Nack(inputMessage)
 }
 
 func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
@@ -259,6 +268,39 @@ func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
 	return timeoutMilliSecond
 }
 
+func (gi *goInstance) setupLogHandler() error {
+	if gi.context.instanceConf.funcDetails.GetLogTopic() != "" {
+		gi.context.logAppender = NewLogAppender(
+			gi.client,                                         //pulsar client
+			gi.context.instanceConf.funcDetails.GetLogTopic(), //log topic
+			getDefaultSubscriptionName(gi.context.instanceConf.funcDetails.Tenant, //fqn
+				gi.context.instanceConf.funcDetails.Namespace,
+				gi.context.instanceConf.funcDetails.Name),
+		)
+		return gi.context.logAppender.Start()
+	}
+	return nil
+}
+
+func (gi *goInstance) addLogTopicHandler() {
+	if gi.context.logAppender == nil {
+		panic("please init logAppender")
+	}
+
+	for _, logByte := range log.StrEntry {
+		gi.context.logAppender.Append([]byte(logByte))
+	}
+}
+
+func (gi *goInstance) closeLogTopic() {
+	log.Info("closing log topic...")
+	if gi.context.logAppender == nil {
+		return
+	}
+	gi.context.logAppender.Stop()
+	gi.context.logAppender = nil
+}
+
 func (gi *goInstance) close() {
 	log.Info("closing go instance...")
 	if gi.producer != nil {
diff --git a/pulsar-function-go/pf/logAppender.go b/pulsar-function-go/pf/logAppender.go
new file mode 100644
index 0000000..f46b7c6
--- /dev/null
+++ b/pulsar-function-go/pf/logAppender.go
@@ -0,0 +1,87 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pf
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/pulsar/pulsar-client-go/pulsar"
+	"github.com/apache/pulsar/pulsar-function-go/log"
+)
+
+type LogAppender struct {
+	pulsarClient pulsar.Client
+	logTopic     string
+	fqn          string
+	producer     pulsar.Producer
+}
+
+func NewLogAppender(client pulsar.Client, logTopic, fqn string) *LogAppender {
+	logAppender := &LogAppender{
+		pulsarClient: client,
+		logTopic:     logTopic,
+		fqn:          fqn,
+	}
+	return logAppender
+}
+
+func (la *LogAppender) Start() error {
+	producer, err := la.pulsarClient.CreateProducer(pulsar.ProducerOptions{
+		Topic:                   la.logTopic,
+		BlockIfQueueFull:        false,
+		Batching:                true,
+		CompressionType:         pulsar.LZ4,
+		BatchingMaxPublishDelay: 100 * time.Millisecond,
+		Properties: map[string]string{
+			"function": la.fqn,
+		},
+	})
+	if err != nil {
+		log.Errorf("create producer error:%s", err.Error())
+		return err
+	}
+	la.producer = producer
+	return nil
+}
+
+func (la *LogAppender) Append(logByte []byte) {
+	ctx := context.Background()
+	asyncMsg := pulsar.ProducerMessage{
+		Payload: logByte,
+	}
+	la.producer.SendAsync(ctx, asyncMsg, func(msg pulsar.ProducerMessage, err error) {
+		if err != nil {
+			log.Fatal(err)
+		}
+	})
+}
+
+func (la *LogAppender) GetName() string {
+	return la.fqn
+}
+
+func (la *LogAppender) Stop() {
+	err := la.producer.Close()
+	if err != nil {
+		log.Errorf("close log append error:%s", err.Error())
+	}
+	la.producer = nil
+}