You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by zt...@apache.org on 2021/10/25 11:24:15 UTC

[dubbo-go-pixiu] branch eventmesh updated: polish kafka init (#287)

This is an automated email from the ASF dual-hosted git repository.

ztelur pushed a commit to branch eventmesh
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/eventmesh by this push:
     new 1e75e1e  polish kafka init (#287)
1e75e1e is described below

commit 1e75e1ec87140fc838d661210d35a13b432627d5
Author: YuDong Tang <58...@qq.com>
AuthorDate: Mon Oct 25 19:24:08 2021 +0800

    polish kafka init (#287)
---
 go.sum                        |  1 +
 pkg/client/mq/kafka_facade.go | 36 +++++++++++++++++++----
 pkg/client/mq/mq.go           | 20 +++++--------
 pkg/filter/event/event.go     | 68 ++++++++++++++++++-------------------------
 4 files changed, 69 insertions(+), 56 deletions(-)

diff --git a/go.sum b/go.sum
index 8eb4eea..1f5ba1e 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ github.com/RoaringBitmap/roaring v0.6.1 h1:O36Tdaj1Fi/zyr25shTHwlQPGdq53+u4WkM08
 github.com/RoaringBitmap/roaring v0.6.1/go.mod h1:WZ83fjBF/7uBHi6QoFyfGL4+xuV4Qn+xFkm4+vSzrhE=
 github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
 github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
+github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
 github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go
index f2abc21..99b3661 100644
--- a/pkg/client/mq/kafka_facade.go
+++ b/pkg/client/mq/kafka_facade.go
@@ -37,13 +37,24 @@ import (
 	perrors "github.com/pkg/errors"
 )
 
-func NewKafkaConsumerFacade(addrs []string, config *sarama.Config) (*KafkaConsumerFacade, error) {
-	consumer, err := sarama.NewConsumer(addrs, config)
+func NewKafkaConsumerFacade(config event.KafkaConsumerConfig) (*KafkaConsumerFacade, error) {
+	c := sarama.NewConfig()
+	c.ClientID = config.ClientID
+	c.Metadata.Full = config.Metadata.Full
+	c.Metadata.Retry.Max = config.Metadata.Retry.Max
+	c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+	if config.ProtocolVersion != "" {
+		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+		if err != nil {
+			return nil, err
+		}
+		c.Version = version
+	}
+	consumer, err := sarama.NewConsumer(config.Brokers, c)
 	if err != nil {
 		return nil, err
 	}
 
-	// does not set up cookiejar may cause some problem
 	return &KafkaConsumerFacade{consumer: consumer, httpClient: &http.Client{Timeout: 5 * time.Second}, done: make(chan struct{})}, nil
 }
 
@@ -186,8 +197,23 @@ func (f *KafkaConsumerFacade) Stop() {
 	f.wg.Wait()
 }
 
-func NewKafkaProviderFacade(addrs []string, config *sarama.Config) (*KafkaProducerFacade, error) {
-	producer, err := sarama.NewSyncProducer(addrs, config)
+func NewKafkaProviderFacade(config event.KafkaProducerConfig) (*KafkaProducerFacade, error) {
+	c := sarama.NewConfig()
+	c.Producer.Return.Successes = true
+	c.Producer.Return.Errors = true
+	c.Producer.RequiredAcks = sarama.WaitForLocal
+	c.Metadata.Full = config.Metadata.Full
+	c.Metadata.Retry.Max = config.Metadata.Retry.Max
+	c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+	c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
+	if config.ProtocolVersion != "" {
+		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+		if err != nil {
+			return nil, err
+		}
+		c.Version = version
+	}
+	producer, err := sarama.NewSyncProducer(config.Brokers, c)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go
index d05e51f..de57170 100644
--- a/pkg/client/mq/mq.go
+++ b/pkg/client/mq/mq.go
@@ -57,20 +57,16 @@ func NewSingletonMQClient(config event.Config) *Client {
 func NewMQClient(config event.Config) (*Client, error) {
 	var c *Client
 	ctx := context.Background()
-
-	sc := config.ToSaramaConfig()
-	addrs := strings.Split(config.Endpoints, ",")
-	cf, err := NewKafkaConsumerFacade(addrs, sc)
-	if err != nil {
-		return nil, err
-	}
-	pf, err := NewKafkaProviderFacade(addrs, sc)
-	if err != nil {
-		return nil, err
-	}
-
 	switch config.MqType {
 	case constant.MQTypeKafka:
+		cf, err := NewKafkaConsumerFacade(config.KafkaConsumerConfig)
+		if err != nil {
+			return nil, err
+		}
+		pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
+		if err != nil {
+			return nil, err
+		}
 		c = &Client{
 			ctx:            ctx,
 			consumerFacade: cf,
diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go
index 56bfb0e..e367819 100644
--- a/pkg/filter/event/event.go
+++ b/pkg/filter/event/event.go
@@ -25,11 +25,6 @@ import (
 	"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
 	"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
 	"github.com/apache/dubbo-go-pixiu/pkg/context/http"
-	"github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-import (
-	"github.com/Shopify/sarama"
 )
 
 const (
@@ -52,48 +47,43 @@ type (
 	}
 
 	Config struct {
-		ClientId     string        `yaml:"client_id" json:"client_id"`
-		Endpoints    string        `yaml:"endpoints" json:"endpoints"`
-		MqType       MQType        `yaml:"type" json:"type"`
-		KafkaVersion string        `yaml:"kafka_version" json:"kafka_version"`
-		Retry        int           `yaml:"retry" json:"retry" default:"5"`
-		Timeout      time.Duration `yaml:"timeout" json:"timeout" default:"2s"`
-		Offset       string        `yaml:"offset" json:"offset" default:"oldest"` // Offset newest or oldest
+		ClientId            string              `yaml:"client_id" json:"client_id"`
+		Endpoints           string              `yaml:"endpoints" json:"endpoints"`
+		MqType              MQType              `yaml:"type" json:"type"`
+		Retry               int                 `yaml:"retry" json:"retry" default:"5"`
+		Timeout             time.Duration       `yaml:"timeout" json:"timeout" default:"2s"`
+		KafkaConsumerConfig KafkaConsumerConfig `yaml:"kafka_consumer_config" json:"kafka_consumer_config"`
+		KafkaProducerConfig KafkaProducerConfig `yaml:"kafka_producer_config" json:"kafka_producer_config"`
 	}
-)
-
-func (c *Config) ToSaramaConfig() *sarama.Config {
-	config := sarama.NewConfig()
 
-	version, err := sarama.ParseKafkaVersion(c.KafkaVersion)
-	if err != nil {
-		version = sarama.V2_0_0_0
-		logger.Warnf("kafka version is invalid, use sarama.V2_0_0_0 instead, err: %s", err.Error())
+	KafkaConsumerConfig struct {
+		Brokers         []string `yaml:"brokers" json:"brokers"`
+		ProtocolVersion string   `yaml:"protocol_version" json:"protocol_version"`
+		ClientID        string   `yaml:"client_id" json:"client_id"`
+		Metadata        Metadata `yaml:"metadata" json:"metadata"`
 	}
-	config.Version = version
-
-	offset := sarama.OffsetNewest
-	switch c.Offset {
-	case "newest":
-		offset = sarama.OffsetNewest
-	case "oldest":
-		offset = sarama.OffsetOldest
-	default:
-		logger.Warn("offset is invalid, use oldest instead")
+
+	KafkaProducerConfig struct {
+		Brokers         []string `yaml:"brokers" json:"brokers"`
+		ProtocolVersion string   `yaml:"protocol_version" json:"protocol_version"`
+		Metadata        Metadata `yaml:"metadata" json:"metadata"`
+		Producer        Producer `yaml:"producer" json:"producer"`
 	}
-	config.Consumer.Offsets.Initial = offset
 
-	config.ClientID = "pixiu-kafka"
-	if c.ClientId != "" {
-		config.ClientID = c.ClientId
+	Metadata struct {
+		Full  bool          `yaml:"full" json:"full"`
+		Retry MetadataRetry `yaml:"retry" json:"retry"`
 	}
-	logger.Infof("kafka client id is %s", config.ClientID)
 
-	config.Producer.Retry.Max = c.Retry
-	config.Producer.Timeout = c.Timeout
+	MetadataRetry struct {
+		Max     int           `yaml:"max" json:"max"`
+		Backoff time.Duration `yaml:"backoff" json:"backoff"`
+	}
 
-	return config
-}
+	Producer struct {
+		MaxMessageBytes int `yaml:"max_message_bytes" json:"max_message_bytes"`
+	}
+)
 
 func (p *Plugin) Kind() string {
 	return Kind