You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by zt...@apache.org on 2021/10/25 11:24:15 UTC
[dubbo-go-pixiu] branch eventmesh updated: polish kafka init (#287)
This is an automated email from the ASF dual-hosted git repository.
ztelur pushed a commit to branch eventmesh
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/eventmesh by this push:
new 1e75e1e polish kafka init (#287)
1e75e1e is described below
commit 1e75e1ec87140fc838d661210d35a13b432627d5
Author: YuDong Tang <58...@qq.com>
AuthorDate: Mon Oct 25 19:24:08 2021 +0800
polish kafka init (#287)
---
go.sum | 1 +
pkg/client/mq/kafka_facade.go | 36 +++++++++++++++++++----
pkg/client/mq/mq.go | 20 +++++--------
pkg/filter/event/event.go | 68 ++++++++++++++++++-------------------------
4 files changed, 69 insertions(+), 56 deletions(-)
diff --git a/go.sum b/go.sum
index 8eb4eea..1f5ba1e 100644
--- a/go.sum
+++ b/go.sum
@@ -67,6 +67,7 @@ github.com/RoaringBitmap/roaring v0.6.1 h1:O36Tdaj1Fi/zyr25shTHwlQPGdq53+u4WkM08
github.com/RoaringBitmap/roaring v0.6.1/go.mod h1:WZ83fjBF/7uBHi6QoFyfGL4+xuV4Qn+xFkm4+vSzrhE=
github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
+github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
diff --git a/pkg/client/mq/kafka_facade.go b/pkg/client/mq/kafka_facade.go
index f2abc21..99b3661 100644
--- a/pkg/client/mq/kafka_facade.go
+++ b/pkg/client/mq/kafka_facade.go
@@ -37,13 +37,24 @@ import (
perrors "github.com/pkg/errors"
)
-func NewKafkaConsumerFacade(addrs []string, config *sarama.Config) (*KafkaConsumerFacade, error) {
- consumer, err := sarama.NewConsumer(addrs, config)
+func NewKafkaConsumerFacade(config event.KafkaConsumerConfig) (*KafkaConsumerFacade, error) {
+ c := sarama.NewConfig()
+ c.ClientID = config.ClientID
+ c.Metadata.Full = config.Metadata.Full
+ c.Metadata.Retry.Max = config.Metadata.Retry.Max
+ c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+ if config.ProtocolVersion != "" {
+ version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+ if err != nil {
+ return nil, err
+ }
+ c.Version = version
+ }
+ consumer, err := sarama.NewConsumer(config.Brokers, c)
if err != nil {
return nil, err
}
- // does not set up cookiejar may cause some problem
return &KafkaConsumerFacade{consumer: consumer, httpClient: &http.Client{Timeout: 5 * time.Second}, done: make(chan struct{})}, nil
}
@@ -186,8 +197,23 @@ func (f *KafkaConsumerFacade) Stop() {
f.wg.Wait()
}
-func NewKafkaProviderFacade(addrs []string, config *sarama.Config) (*KafkaProducerFacade, error) {
- producer, err := sarama.NewSyncProducer(addrs, config)
+func NewKafkaProviderFacade(config event.KafkaProducerConfig) (*KafkaProducerFacade, error) {
+ c := sarama.NewConfig()
+ c.Producer.Return.Successes = true
+ c.Producer.Return.Errors = true
+ c.Producer.RequiredAcks = sarama.WaitForLocal
+ c.Metadata.Full = config.Metadata.Full
+ c.Metadata.Retry.Max = config.Metadata.Retry.Max
+ c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+ c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
+ if config.ProtocolVersion != "" {
+ version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
+ if err != nil {
+ return nil, err
+ }
+ c.Version = version
+ }
+ producer, err := sarama.NewSyncProducer(config.Brokers, c)
if err != nil {
return nil, err
}
diff --git a/pkg/client/mq/mq.go b/pkg/client/mq/mq.go
index d05e51f..de57170 100644
--- a/pkg/client/mq/mq.go
+++ b/pkg/client/mq/mq.go
@@ -57,20 +57,16 @@ func NewSingletonMQClient(config event.Config) *Client {
func NewMQClient(config event.Config) (*Client, error) {
var c *Client
ctx := context.Background()
-
- sc := config.ToSaramaConfig()
- addrs := strings.Split(config.Endpoints, ",")
- cf, err := NewKafkaConsumerFacade(addrs, sc)
- if err != nil {
- return nil, err
- }
- pf, err := NewKafkaProviderFacade(addrs, sc)
- if err != nil {
- return nil, err
- }
-
switch config.MqType {
case constant.MQTypeKafka:
+ cf, err := NewKafkaConsumerFacade(config.KafkaConsumerConfig)
+ if err != nil {
+ return nil, err
+ }
+ pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
+ if err != nil {
+ return nil, err
+ }
c = &Client{
ctx: ctx,
consumerFacade: cf,
diff --git a/pkg/filter/event/event.go b/pkg/filter/event/event.go
index 56bfb0e..e367819 100644
--- a/pkg/filter/event/event.go
+++ b/pkg/filter/event/event.go
@@ -25,11 +25,6 @@ import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
"github.com/apache/dubbo-go-pixiu/pkg/context/http"
- "github.com/apache/dubbo-go-pixiu/pkg/logger"
-)
-
-import (
- "github.com/Shopify/sarama"
)
const (
@@ -52,48 +47,43 @@ type (
}
Config struct {
- ClientId string `yaml:"client_id" json:"client_id"`
- Endpoints string `yaml:"endpoints" json:"endpoints"`
- MqType MQType `yaml:"type" json:"type"`
- KafkaVersion string `yaml:"kafka_version" json:"kafka_version"`
- Retry int `yaml:"retry" json:"retry" default:"5"`
- Timeout time.Duration `yaml:"timeout" json:"timeout" default:"2s"`
- Offset string `yaml:"offset" json:"offset" default:"oldest"` // Offset newest or oldest
+ ClientId string `yaml:"client_id" json:"client_id"`
+ Endpoints string `yaml:"endpoints" json:"endpoints"`
+ MqType MQType `yaml:"type" json:"type"`
+ Retry int `yaml:"retry" json:"retry" default:"5"`
+ Timeout time.Duration `yaml:"timeout" json:"timeout" default:"2s"`
+ KafkaConsumerConfig KafkaConsumerConfig `yaml:"kafka_consumer_config" json:"kafka_consumer_config"`
+ KafkaProducerConfig KafkaProducerConfig `yaml:"kafka_producer_config" json:"kafka_producer_config"`
}
-)
-
-func (c *Config) ToSaramaConfig() *sarama.Config {
- config := sarama.NewConfig()
- version, err := sarama.ParseKafkaVersion(c.KafkaVersion)
- if err != nil {
- version = sarama.V2_0_0_0
- logger.Warnf("kafka version is invalid, use sarama.V2_0_0_0 instead, err: %s", err.Error())
+ KafkaConsumerConfig struct {
+ Brokers []string `yaml:"brokers" json:"brokers"`
+ ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
+ ClientID string `yaml:"client_id" json:"client_id"`
+ Metadata Metadata `yaml:"metadata" json:"metadata"`
}
- config.Version = version
-
- offset := sarama.OffsetNewest
- switch c.Offset {
- case "newest":
- offset = sarama.OffsetNewest
- case "oldest":
- offset = sarama.OffsetOldest
- default:
- logger.Warn("offset is invalid, use oldest instead")
+
+ KafkaProducerConfig struct {
+ Brokers []string `yaml:"brokers" json:"brokers"`
+ ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
+ Metadata Metadata `yaml:"metadata" json:"metadata"`
+ Producer Producer `yaml:"producer" json:"producer"`
}
- config.Consumer.Offsets.Initial = offset
- config.ClientID = "pixiu-kafka"
- if c.ClientId != "" {
- config.ClientID = c.ClientId
+ Metadata struct {
+ Full bool `yaml:"full" json:"full"`
+ Retry MetadataRetry `yaml:"retry" json:"retry"`
}
- logger.Infof("kafka client id is %s", config.ClientID)
- config.Producer.Retry.Max = c.Retry
- config.Producer.Timeout = c.Timeout
+ MetadataRetry struct {
+ Max int `yaml:"max" json:"max"`
+ Backoff time.Duration `yaml:"backoff" json:"backoff"`
+ }
- return config
-}
+ Producer struct {
+ MaxMessageBytes int `yaml:"max_message_bytes" json:"max_message_bytes"`
+ }
+)
func (p *Plugin) Kind() string {
return Kind