You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/19 14:56:21 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: add consumer implement
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 194ba8e6 add consumer implement
new 5283d837 Merge pull request #1336 from walleliu1016/eventmesh-server-go-runtime
194ba8e6 is described below
commit 194ba8e6e57bd259c6200a66ec82cd57b3bdf880
Author: walleliu1016 <li...@163.com>
AuthorDate: Mon Sep 19 22:55:10 2022 +0800
add consumer implement
---
eventmesh-server-go/config/grpc.go | 27 +++
eventmesh-server-go/go.mod | 3 +-
eventmesh-server-go/go.sum | 2 +
.../{config/grpc.go => pkg/util/id.go} | 30 ++-
eventmesh-server-go/pkg/util/id_test.go | 94 +++++++++
.../{config/grpc.go => pkg/util/pid.go} | 23 +-
.../pkg/util/{until.go => pid_test.go} | 50 ++---
.../{config/grpc.go => pkg/version/version.go} | 17 +-
.../core/protocol/grpc/config/consumer_group.go | 80 +++++++
.../core/protocol/grpc/config/producer_group.go} | 13 +-
.../core/protocol/grpc/consumer/consumer.go | 233 +++++++++++++++++++++
.../core/protocol/grpc/consumer/group_client.go} | 32 ++-
.../core/protocol/grpc/consumer/group_option.go | 125 +++++++++++
.../runtime/core/protocol/grpc/consumer/manager.go | 202 ++++++++++++++++++
.../core/protocol/grpc/push/context.go} | 27 ++-
.../runtime/core/protocol/grpc/push/handler.go | 88 ++++++++
.../core/protocol/grpc/retry/context.go} | 24 ++-
.../runtime/core/wrapper/producer.go | 90 ++++++++
.../runtime/core/wrapper/producer_test.go | 79 +++++++
19 files changed, 1126 insertions(+), 113 deletions(-)
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/config/grpc.go
index 39abcef8..adbeb33f 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/config/grpc.go
@@ -15,6 +15,8 @@
package config
+import "time"
+
// GRPCOption configuratin for grpc server
type GRPCOption struct {
Port string `yaml:"port" toml:"port"`
@@ -26,4 +28,29 @@ type GRPCOption struct {
// will start on given port, and you can check
// on http://ip:port/pprof/debug
*PProfOption `yaml:"pprof" toml:"pprof"`
+
+ // SubscribePoolSize pool in handle subscribe msg
+ // default to 10
+ SubscribePoolSize int `yaml:"subscribe-pool-size" toml:"subscribe-pool-size"`
+ // RetryPoolSize pool in handle retry msg
+ // default to 10
+ RetryPoolSize int `yaml:"retry-pool-size" toml:"retry-pool-size"`
+ // PushMessagePoolSize pool to push message
+ // default to 10
+ PushMessagePoolSize int `yaml:"push-message-pool-size" toml:"push-message-pool-size"`
+
+ //MsgReqNumPerSecond
+ MsgReqNumPerSecond float64 `yaml:"msg-req-num-per-second" toml:"msg-req-num-per-second"`
+
+ // RegistryName name for registry plugin support nacos or etcd
+ RegistryName string `yaml:"registry-name" toml:"registry-name"`
+
+ // Cluster cluster for grpc server
+ Cluster string `yaml:"cluster" toml:"cluster"`
+
+ // IDC idc for grpc server
+ IDC string `yaml:"idc" toml:"idc"`
+
+ // SessionExpiredInMills interval to clean up session consumers that no longer work
+ SessionExpiredInMills time.Duration `yaml:"session-expired-in-mills"`
}
diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index 5a40104f..e3e34066 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -22,6 +22,7 @@ require (
github.com/gogf/gf v1.16.9
github.com/hashicorp/go-multierror v1.1.1
github.com/lestrrat-go/strftime v1.0.6
+ github.com/liyue201/gostl v1.0.1
github.com/nacos-group/nacos-sdk-go/v2 v2.0.3
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.5.0
@@ -87,7 +88,7 @@ require (
github.com/deckarep/golang-set/v2 v2.1.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.8.1
- github.com/golang/mock v1.6.0
+ github.com/golang/mock v1.6.0 // indirect
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.8.0
github.com/unrolled/secure v1.12.0
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 9faec8ee..7eda5f12 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -154,6 +154,8 @@ github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2t
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
+github.com/liyue201/gostl v1.0.1 h1:VQdvogZ90WpCb5WdG9UxS6r5ulnYEp8VfEMEZXVtpIs=
+github.com/liyue201/gostl v1.0.1/go.mod h1:rgK+T1a0sQ1+CsAonfuD1J8C4iuGfOU9VAt9mmR/m10=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/util/id.go
similarity index 57%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/util/id.go
index 39abcef8..5a3833bf 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/util/id.go
@@ -13,17 +13,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package util
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
+import (
+ "fmt"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/version"
+ "strings"
+)
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
+func BuildMeshTcpClientID(sys, purpose, cluster string) string {
+ return fmt.Sprintf("%v-%v-%v-%v-%v",
+ strings.TrimSpace(sys),
+ strings.TrimSpace(purpose),
+ strings.TrimSpace(cluster),
+ strings.TrimSpace(version.Current),
+ strings.TrimSpace(PID()))
+}
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+func BuildMeshClientID(group, cluster string) string {
+ return fmt.Sprintf("%v-(%v)-%v-%v",
+ strings.TrimSpace(group),
+ strings.TrimSpace(cluster),
+ strings.TrimSpace(version.Current),
+ strings.TrimSpace(PID()))
}
diff --git a/eventmesh-server-go/pkg/util/id_test.go b/eventmesh-server-go/pkg/util/id_test.go
new file mode 100644
index 00000000..61d15a6a
--- /dev/null
+++ b/eventmesh-server-go/pkg/util/id_test.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/version"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestBuildMeshClientID(t *testing.T) {
+ type args struct {
+ group string
+ cluster string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "build client id",
+ args: args{
+ group: "test-group",
+ cluster: "idc",
+ },
+ want: "test-group-(idc)-" + version.Current + "-" + PID(),
+ },
+ {
+ name: "build client id with upper",
+ args: args{
+ group: "Test-group",
+ cluster: "IDC",
+ },
+ want: "Test-group-(IDC)-" + version.Current + "-" + PID(),
+ },
+ {
+ name: "build client id wit space",
+ args: args{
+ group: " Test-group ",
+ cluster: "idc ",
+ },
+ want: "Test-group-(idc)-" + version.Current + "-" + PID(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := BuildMeshClientID(tt.args.group, tt.args.cluster); got != tt.want {
+ t.Errorf("BuildMeshClientID() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestBuildMeshTcpClientID(t *testing.T) {
+ type args struct {
+ sys string
+ purpose string
+ cluster string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "uniq mesh tcp client id",
+ args: args{
+ sys: "1234",
+ purpose: "test",
+ cluster: "c1",
+ },
+ want: "1234-test-c1-" + version.Current + "-" + PID(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equalf(t, tt.want, BuildMeshTcpClientID(tt.args.sys, tt.args.purpose, tt.args.cluster), "BuildMeshTcpClientID(%v, %v, %v)", tt.args.sys, tt.args.purpose, tt.args.cluster)
+ })
+ }
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/util/pid.go
similarity index 68%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/util/pid.go
index 39abcef8..97208d9f 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/util/pid.go
@@ -13,17 +13,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package util
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
+import (
+ "fmt"
+ "os"
+)
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
+var _pid string
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+func init() {
+ _pid = fmt.Sprintf("%v", os.Getpid())
+}
+
+// PID return the current process id
+func PID() string {
+ return _pid
}
diff --git a/eventmesh-server-go/pkg/util/until.go b/eventmesh-server-go/pkg/util/pid_test.go
similarity index 52%
rename from eventmesh-server-go/pkg/util/until.go
rename to eventmesh-server-go/pkg/util/pid_test.go
index 7669be21..42483d96 100644
--- a/eventmesh-server-go/pkg/util/until.go
+++ b/eventmesh-server-go/pkg/util/pid_test.go
@@ -16,44 +16,22 @@
package util
import (
- "context"
- "time"
+ "github.com/stretchr/testify/assert"
+ "testing"
)
-// Condition the condition to exit the loop
-type Condition func() bool
-
-// defaultInternal internal to check the condition
-var defaultInternal = time.Second
-
-// Until wait for the condition is true
-type Until struct {
- Cond Condition
- internal time.Duration
-}
-
-// NewUntil create a new until instance
-func NewUntil(cond Condition, internal time.Duration) *Until {
- if internal == time.Duration(0) {
- internal = defaultInternal
+func TestPID(t *testing.T) {
+ tests := []struct {
+ name string
+ }{
+ {
+ name: "pid",
+ },
}
- return &Until{
- Cond: cond,
- internal: internal,
- }
-}
-
-// Wait loop until the condition is true
-func (u *Until) Wait(ctx context.Context) {
- tick := time.NewTicker(u.internal)
- for {
- select {
- case <-ctx.Done():
- return
- case <-tick.C:
- if u.Cond() {
- return
- }
- }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := PID()
+ assert.NotEmpty(t, got)
+ })
}
}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/version/version.go
similarity index 67%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/version/version.go
index 39abcef8..2a7828fd 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/version/version.go
@@ -13,17 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package version
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
-
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
-
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
-}
+var (
+ Current = "v0.0.1"
+)
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go b/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go
new file mode 100644
index 00000000..10218232
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "github.com/liyue201/gostl/ds/set"
+ "sync"
+)
+
+// ConsumerGroupConfig holds a consumer group name and its per-topic configs.
+type ConsumerGroupConfig struct {
+	ConsumerGroup string
+	// ConsumerGroupTopicConfigs key is topic, value is ConsumerGroupTopicConfig
+	ConsumerGroupTopicConfigs *sync.Map
+}
+
+type GRPCType string
+
+const (
+ WEBHOOK GRPCType = "WEBHOOK"
+ STREAM GRPCType = "STREAM"
+)
+
+type StateAction string
+
+const (
+ NEW StateAction = "NEW"
+ CHANGE StateAction = "CHANGE"
+ DELETE StateAction = "DELETE"
+)
+
+type ConsumerGroupTopicConfig struct {
+ ConsumerGroup string
+ Topic string
+ SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+ GRPCType GRPCType
+ // IDCWebhookURLs webhook urls separated by IDC
+ // key is IDC, value is vector.Vector
+ IDCWebhookURLs *sync.Map
+
+ // AllURLs all webhook urls, ignore idc
+ AllURLs *set.Set
+}
+
+type ConsumerGroupMetadata struct {
+ ConsumerGroup string
+ ConsumerGroupTopicMetadata *sync.Map
+}
+
+type ConsumerGroupTopicMetadata struct {
+ ConsumerGroup string
+ Topic string
+ AllURLs *set.Set
+}
+
+type ConsumerGroupStateEvent struct {
+ ConsumerGroup string
+ ConsumerGroupConfig *ConsumerGroupConfig
+ ConsumerGroupStateAction StateAction
+}
+
+type ConsumerGroupTopicConfChangeEvent struct {
+ Action StateAction
+ ConsumerGroup string
+ ConsumerGroupTopicConfig *ConsumerGroupTopicConfig
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
similarity index 69%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
index 39abcef8..2b72c99e 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
@@ -15,15 +15,6 @@
package config
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
-
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
-
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+type ProducerGroupConfig struct {
+ GroupName string `json:"groupName"`
}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go
new file mode 100644
index 00000000..67e263c5
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/push"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/wrapper"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ cloudv2 "github.com/cloudevents/sdk-go/v2"
+ "github.com/pkg/errors"
+ "sync"
+ "time"
+)
+
+var (
+ ErrNoConnectorPlugin = errors.New("no connector plugin found")
+ ErrNewConsumerConnector = errors.New("create consumer connector err")
+ ErrNewProducerConnector = errors.New("create producer connector err")
+)
+
+type EventMeshConsumer struct {
+ ConsumerGroup string
+ persistentConsumer *wrapper.Consumer
+ broadcastConsumer *wrapper.Consumer
+ messageHandler *push.MessageHandler
+ ServiceState consts.ServiceState
+ // consumerGroupTopicConfig key is topic
+ // value is ConsumerGroupTopicOption
+ consumerGroupTopicConfig *sync.Map
+}
+
+func NewEventMeshConsumer(consumerGroup string) (*EventMeshConsumer, error) {
+ pushHandler, err := push.NewMessageHandler(consumerGroup)
+ if err != nil {
+ return nil, err
+ }
+ cons, err := wrapper.NewConsumer()
+ if err != nil {
+ return nil, ErrNewConsumerConnector
+ }
+ bcros, err := wrapper.NewConsumer()
+ if err != nil {
+ return nil, ErrNewConsumerConnector
+ }
+ return &EventMeshConsumer{
+ ConsumerGroup: consumerGroup,
+ messageHandler: pushHandler,
+ persistentConsumer: cons,
+ broadcastConsumer: bcros,
+ consumerGroupTopicConfig: new(sync.Map),
+ }, nil
+}
+
+// Init prepares the persistent (clustering) and broadcast consumers of this group and
+// registers an event listener on each. It does nothing when no topic is registered and
+// returns the error of the first consumer whose Init fails.
+func (e *EventMeshConsumer) Init() error {
+	// no topics, don't init the consumer
+	if e.ConsumerGroupSize() == 0 {
+		return nil
+	}
+
+	persistProps := make(map[string]string)
+	persistProps["isBroadcast"] = "false"
+	persistProps["consumerGroup"] = e.ConsumerGroup
+	persistProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	persistProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup, config.GlobalConfig().Server.GRPCOption.Cluster)
+	if err := e.persistentConsumer.Init(persistProps); err != nil {
+		return err
+	}
+	e.persistentConsumer.RegisterListener(e.createEventListener(pb.Subscription_SubscriptionItem_CLUSTERING))
+
+	// the broadcast consumer must be flagged as broadcast, unlike the persistent one above
+	broadcastProps := make(map[string]string)
+	broadcastProps["isBroadcast"] = "true"
+	broadcastProps["consumerGroup"] = e.ConsumerGroup
+	broadcastProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	broadcastProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup, config.GlobalConfig().Server.GRPCOption.Cluster)
+	if err := e.broadcastConsumer.Init(broadcastProps); err != nil {
+		return err
+	}
+	e.broadcastConsumer.RegisterListener(e.createEventListener(pb.Subscription_SubscriptionItem_BROADCASTING))
+	e.ServiceState = consts.INITED
+
+	log.Infof("init the eventmesh consumer success, group:%v", e.ConsumerGroup)
+	return nil
+}
+
+// Start subscribes every registered topic on the consumer that matches its subscription
+// mode, then starts the broadcast and the persistent consumer. No topics, no start.
+func (e *EventMeshConsumer) Start() error {
+	if e.ConsumerGroupSize() == 0 {
+		return nil
+	}
+
+	e.consumerGroupTopicConfig.Range(func(key, value any) bool {
+		topic, mode := key.(string), value.(*ConsumerGroupTopicOption).SubscriptionMode
+		if mode == pb.Subscription_SubscriptionItem_CLUSTERING {
+			e.persistentConsumer.Subscribe(topic)
+		} else if mode == pb.Subscription_SubscriptionItem_BROADCASTING {
+			e.broadcastConsumer.Subscribe(topic)
+		} else {
+			log.Warnf("un support sub mode:%v", mode)
+		}
+		return true
+	})
+
+	startErr := e.broadcastConsumer.Start()
+	if startErr != nil {
+		return startErr
+	}
+	startErr = e.persistentConsumer.Start()
+	if startErr != nil {
+		return startErr
+	}
+	return nil
+}
+
+// RegisterClient Register client's topic information
+// return true if this EventMeshConsumer required restart because of the topic changes
+func (e *EventMeshConsumer) RegisterClient(cli *GroupClient) bool {
+ var (
+ consumerTopicOption *ConsumerGroupTopicOption
+ restart = false
+ )
+ val, ok := e.consumerGroupTopicConfig.Load(cli.Topic)
+ if !ok {
+ consumerTopicOption = NewConsumerGroupTopicOption(cli.ConsumerGroup, cli.Topic, cli.SubscriptionMode, cli.GRPCType)
+ e.consumerGroupTopicConfig.Store(cli.Topic, consumerTopicOption)
+ restart = true
+ } else {
+ consumerTopicOption = val.(*ConsumerGroupTopicOption)
+ }
+ consumerTopicOption.RegisterClient(cli)
+ return restart
+}
+
+// DeRegisterClient deregisters the client from its topic option and deletes the topic config.
+// It returns true if the underlying EventMeshConsumer needs to restart later; false if the topic is unknown.
+// NOTE(review): the topic is deleted on every call, even if other clients may still use it - confirm.
+func (e *EventMeshConsumer) DeRegisterClient(cli *GroupClient) bool {
+ var (
+ consumerTopicOption *ConsumerGroupTopicOption
+ )
+ val, ok := e.consumerGroupTopicConfig.Load(cli.Topic)
+ if !ok {
+ return false
+ }
+ consumerTopicOption = val.(*ConsumerGroupTopicOption)
+ consumerTopicOption.DeregisterClient(cli)
+ e.consumerGroupTopicConfig.Delete(cli.Topic)
+ return true
+}
+
+// Shutdown stops the persistent consumer and then the broadcast consumer.
+// The first error is returned; the broadcast consumer is skipped if the persistent one fails.
+func (e *EventMeshConsumer) Shutdown() error {
+	err := e.persistentConsumer.Shutdown()
+	if err == nil {
+		err = e.broadcastConsumer.Shutdown()
+	}
+	return err
+}
+
+func (e *EventMeshConsumer) ConsumerGroupSize() int {
+ count := 0
+ e.consumerGroupTopicConfig.Range(func(key, value any) bool {
+ count++
+ return true
+ })
+ return count
+}
+
+// createEventListener builds the connector listener for the given subscription mode; it hands a
+// clone of every event to the message handler and reports the chosen action through commitFunc.
+func (e *EventMeshConsumer) createEventListener(mode pb.Subscription_SubscriptionItem_SubscriptionMode) *connector.EventListener {
+	return &connector.EventListener{
+		Consume: func(event *cloudv2.Event, commitFunc connector.CommitFunc) error {
+			var commitAction connector.EventMeshAction
+			// the closure reads commitAction at return time; deferring the call directly would bind its zero value
+			defer func() { commitFunc(commitAction) }()
+
+			eventclone := event.Clone()
+			eventclone.SetExtension(consts.REQ_MQ2EVENTMESH_TIMESTAMP, time.Now().UnixMilli())
+			topic := event.Subject()
+			bizSeqNo := eventclone.Extensions()[consts.PROPERTY_MESSAGE_SEARCH_KEYS]
+			uniqueID := eventclone.Extensions()[consts.RMB_UNIQ_ID]
+			log.Infof("mq to eventmesh, topic:%v, bizSeqNo:%v, uniqueID:%v", topic, bizSeqNo, uniqueID)
+			val, ok := e.consumerGroupTopicConfig.Load(topic)
+			if !ok {
+				log.Debugf("no active consumer for topic:%v", topic)
+				commitAction = connector.CommitMessage
+				return nil
+			}
+
+			topicConfig := val.(*ConsumerGroupTopicOption)
+			mctx := &push.MessageContext{
+				GrpcType:         topicConfig.GRPCType,
+				ConsumerGroup:    e.ConsumerGroup,
+				SubscriptionMode: mode,
+				Event:            &eventclone,
+			}
+			if err := e.messageHandler.Handler(mctx); err != nil {
+				log.Warnf("handle msg err:%v, topic:%v, group:%v", err, topic, topicConfig.ConsumerGroup)
+				// can not handle the message due to the capacity limit is reached
+				// wait for 5 seconds and send this message back to mq and consume again
+				time.Sleep(time.Second * 5)
+				//e.sendMessageBack()
+				commitAction = connector.CommitMessage
+				return nil
+			}
+			commitAction = connector.ManualAck
+			return nil
+		},
+	}
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
similarity index 54%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
index 39abcef8..3ed84c18 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
@@ -13,17 +13,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package consumer
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "time"
+)
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
-
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+// GroupClient consumer group client details
+type GroupClient struct {
+ ENV string
+ IDC string
+ ConsumerGroup string
+ Topic string
+ GRPCType config.GRPCType
+ URL string
+ SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+ SYS string
+ IP string
+ PID string
+ Hostname string
+ APIVersion string
+ LastUPtime time.Time
}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go
new file mode 100644
index 00000000..0cea6116
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "github.com/liyue201/gostl/ds/set"
+ "sync"
+)
+
+type RegisterClient func(*GroupClient)
+type DeregisterClient func(*GroupClient)
+
+// ConsumerGroupTopicOption refers to ConsumerGroupTopicConfig
+type ConsumerGroupTopicOption struct {
+ ConsumerGroup string
+ Topic string
+ SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+ GRPCType config.GRPCType
+ RegisterClient RegisterClient
+ DeregisterClient DeregisterClient
+}
+
+// NewConsumerGroupTopicOption creates a topic option whose client callbacks default to no-ops, never nil.
+func NewConsumerGroupTopicOption(cg, topic string, mode pb.Subscription_SubscriptionItem_SubscriptionMode, gtype config.GRPCType) *ConsumerGroupTopicOption {
+	return &ConsumerGroupTopicOption{
+		ConsumerGroup:    cg,
+		Topic:            topic,
+		SubscriptionMode: mode,
+		GRPCType:         gtype,
+		RegisterClient:   func(*GroupClient) {},
+		DeregisterClient: func(*GroupClient) {},
+	}
+}
+
+// WebhookGroupTopicOption topic option for subscribe with webhook
+type WebhookGroupTopicOption struct {
+ *ConsumerGroupTopicOption
+
+ // IDCWebhookURLs webhook urls separated by IDC
+ // key is IDC, value is *set.Set of urls
+ IDCWebhookURLs *sync.Map
+
+ // AllURLs all webhook urls, ignore idc
+ AllURLs *set.Set
+}
+
+func NewWebhookGroupTopicOption(cg string,
+ topic string,
+ mode pb.Subscription_SubscriptionItem_SubscriptionMode,
+ gtype config.GRPCType) *WebhookGroupTopicOption {
+ opt := &WebhookGroupTopicOption{
+ ConsumerGroupTopicOption: NewConsumerGroupTopicOption(cg, topic, mode, config.WEBHOOK),
+ IDCWebhookURLs: new(sync.Map),
+ AllURLs: set.New(set.WithGoroutineSafe()),
+ }
+ opt.ConsumerGroupTopicOption.RegisterClient = func(cli *GroupClient) {
+ if cli.GRPCType != config.WEBHOOK {
+ log.Warnf("invalid grpc type:%v, with provide WEBHOOK", cli.GRPCType)
+ return
+ }
+ iwu, ok := opt.IDCWebhookURLs.Load(cli.IDC)
+ if !ok {
+ newIDCURLs := set.New(set.WithGoroutineSafe())
+ newIDCURLs.Insert(cli.URL)
+ opt.IDCWebhookURLs.Store(cli.IDC, newIDCURLs)
+ } else {
+ val := iwu.(*set.Set)
+ val.Insert(cli.URL)
+ opt.IDCWebhookURLs.Store(cli.IDC, val)
+ }
+ opt.AllURLs.Insert(cli.URL)
+ }
+
+ opt.ConsumerGroupTopicOption.DeregisterClient = func(cli *GroupClient) {
+ val, ok := opt.IDCWebhookURLs.Load(cli.IDC)
+ if !ok {
+ return
+ }
+ idcURLs := val.(*set.Set)
+ idcURLs.Erase(cli.URL)
+ opt.AllURLs.Erase(cli.URL)
+ }
+ return opt
+}
+
+// StreamGroupTopicOption topic option for subscribe with stream
+type StreamGroupTopicOption struct {
+ *ConsumerGroupTopicOption
+}
+
+func NewWStreamGroupTopicOption(cg string,
+ topic string,
+ mode pb.Subscription_SubscriptionItem_SubscriptionMode,
+ gtype config.GRPCType) *StreamGroupTopicOption {
+ opt := &StreamGroupTopicOption{
+ ConsumerGroupTopicOption: NewConsumerGroupTopicOption(cg, topic, mode, config.STREAM),
+ }
+ opt.RegisterClient = func(cli *GroupClient) {
+ if cli.GRPCType != config.STREAM {
+ log.Warnf("invalid grpc type:%v, with provide STREAM", cli.GRPCType)
+ return
+ }
+
+ }
+ opt.DeregisterClient = func(cli *GroupClient) {
+
+ }
+ return opt
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go
new file mode 100644
index 00000000..133a7629
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+ config2 "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "github.com/liyue201/gostl/ds/set"
+ "github.com/pkg/errors"
+ "sync"
+ "time"
+)
+
+var (
+ // ErrNoConsumerClient indicates that no consumer group client exists.
+ ErrNoConsumerClient = errors.New("no consumer group client")
+)
+
+// Manager tracks the clients registered for each consumer group together
+// with the EventMeshConsumer instance cached for that group.
+type Manager struct {
+ // consumerGroupClients stores all consumer clients;
+ // key is the consumer group name, value is a *set.Set of *GroupClient
+ consumerGroupClients *sync.Map
+
+ // consumers holds the eventmesh consumer instances;
+ // key is the consumer group name, value is *EventMeshConsumer
+ consumers *sync.Map
+}
+
+// NewManager returns a consumer manager with empty client and consumer
+// registries. The returned error is always nil.
+func NewManager() (*Manager, error) {
+	mgr := &Manager{
+		consumerGroupClients: new(sync.Map),
+		consumers:            new(sync.Map),
+	}
+	return mgr, nil
+}
+
+// GetConsumer returns the EventMeshConsumer for consumerGroup, creating and
+// caching one on first use.
+//
+// The new instance is published with LoadOrStore so that concurrent callers
+// racing on the same group all receive the same cached consumer instead of
+// each holding a different instance.
+//
+// It returns the error from NewEventMeshConsumer when creation fails.
+func (c *Manager) GetConsumer(consumerGroup string) (*EventMeshConsumer, error) {
+	if val, ok := c.consumers.Load(consumerGroup); ok {
+		return val.(*EventMeshConsumer), nil
+	}
+	cu, err := NewEventMeshConsumer(consumerGroup)
+	if err != nil {
+		return nil, err
+	}
+	// Another goroutine may have stored a consumer since the Load above;
+	// keep whichever instance was stored first.
+	actual, _ := c.consumers.LoadOrStore(consumerGroup, cu)
+	return actual.(*EventMeshConsumer), nil
+}
+
+// RegisterClient adds cli to the client set of its consumer group, or
+// refreshes the heartbeat time of the matching client that is already there.
+//
+// An existing client only matches when it has the same gRPC type as cli and,
+// for WEBHOOK clients, the same URL. Without that check the first stored
+// client would be overwritten by any later registration and the new client
+// would never be added to the set.
+//
+// The returned error is always nil.
+func (c *Manager) RegisterClient(cli *GroupClient) error {
+	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
+	if !ok {
+		// First client of this group: create the set holding it.
+		cliset := set.New(set.WithGoroutineSafe())
+		cliset.Insert(cli)
+		c.consumerGroupClients.Store(cli.ConsumerGroup, cliset)
+		return nil
+	}
+	localClients := val.(*set.Set)
+	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+		lc := iter.Value().(*GroupClient)
+		if lc.GRPCType != cli.GRPCType {
+			continue
+		}
+		switch lc.GRPCType {
+		case config.WEBHOOK:
+			if lc.URL != cli.URL {
+				continue
+			}
+			lc.LastUPtime = cli.LastUPtime
+			return nil
+		case config.STREAM:
+			// TODO add event emitter
+			// NOTE(review): stream clients are matched on type only; if
+			// GroupClient carries an identity such as IP/PID it should be
+			// compared here as well - confirm against group_client.go.
+			lc.LastUPtime = cli.LastUPtime
+			return nil
+		}
+	}
+	// No matching client is registered yet.
+	localClients.Insert(cli)
+	return nil
+}
+
+func (c *Manager) DeRegisterClient(cli *GroupClient) error {
+ val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
+ if !ok {
+ log.Debugf("no consumer group client found, name:%v", cli.ConsumerGroup)
+ return nil
+ }
+ localClients := val.(*set.Set)
+ for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+ lc := iter.Value().(*GroupClient)
+ if lc.Topic == cli.Topic {
+ if lc.GRPCType == config.STREAM {
+ // TODO
+ // close the GRPC client stream before removing it
+ }
+ localClients.Erase(lc)
+ }
+ }
+ if localClients.Size() == 0 {
+ c.consumerGroupClients.Delete(cli.ConsumerGroup)
+ }
+ return nil
+}
+
+// restartConsumer restarts the cached consumer of consumerGroup: a running
+// consumer is shut down first, then Init and Start are invoked in that order.
+// The first failing step aborts the restart and its error is returned.
+//
+// A group without a cached consumer is a no-op. If the consumer is not in the
+// RUNNING state after Start, a warning is logged and the consumer is removed
+// from the cache; nil is still returned in that case.
+func (c *Manager) restartConsumer(consumerGroup string) error {
+	cached, found := c.consumers.Load(consumerGroup)
+	if !found {
+		return nil
+	}
+	emc := cached.(*EventMeshConsumer)
+
+	if emc.ServiceState == consts.RUNNING {
+		if err := emc.Shutdown(); err != nil {
+			return err
+		}
+	}
+	for _, step := range []func() error{emc.Init, emc.Start} {
+		if err := step(); err != nil {
+			return err
+		}
+	}
+
+	if emc.ServiceState == consts.RUNNING {
+		return nil
+	}
+	log.Warnf("restart eventmesh consumer failed, status:%v", emc.ServiceState)
+	c.consumers.Delete(consumerGroup)
+	return nil
+}
+
+// UpdateClientTime sets LastUPtime to the current time on every client
+// registered under cli's consumer group. An unknown group is logged at debug
+// level and otherwise ignored.
+func (c *Manager) UpdateClientTime(cli *GroupClient) {
+	group := cli.ConsumerGroup
+	val, ok := c.consumerGroupClients.Load(group)
+	if !ok {
+		log.Debugf("no consumer group client found, name:%v", group)
+		return
+	}
+	for it := val.(*set.Set).Begin(); it.IsValid(); it.Next() {
+		client := it.Value().(*GroupClient)
+		client.LastUPtime = time.Now()
+	}
+}
+
+// clientCheck starts a background goroutine that, once per session-expiry
+// interval, removes clients whose last heartbeat is older than that interval
+// and restarts the consumers of the affected groups.
+//
+// A non-positive interval disables the check, because time.NewTicker panics
+// on such values. The goroutine runs for the lifetime of the process; there
+// is no stop signal yet.
+func (c *Manager) clientCheck() {
+	sessionExpiredInMills := config2.GlobalConfig().Server.GRPCOption.SessionExpiredInMills
+	if sessionExpiredInMills <= 0 {
+		log.Warnf("client check disabled, invalid session expiry:%v", sessionExpiredInMills)
+		return
+	}
+	tk := time.NewTicker(sessionExpiredInMills)
+	go func() {
+		for range tk.C {
+			c.removeExpiredClients(sessionExpiredInMills)
+		}
+	}()
+}
+
+// removeExpiredClients performs one expiry sweep: every client whose
+// LastUPtime is older than expiry is deregistered from the manager and from
+// its EventMeshConsumer, then each affected consumer group is restarted once.
+func (c *Manager) removeExpiredClients(expiry time.Duration) {
+	// Groups that lost at least one client; a map keeps each group unique so
+	// it is restarted only once per sweep.
+	restart := make(map[string]struct{})
+
+	c.consumerGroupClients.Range(func(key, value any) bool {
+		localClients := value.(*set.Set)
+
+		// Collect first: DeRegisterClient erases from this same set, which
+		// must not happen while the iterator is still walking it.
+		var expired []*GroupClient
+		for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+			lc := iter.Value().(*GroupClient)
+			if time.Since(lc.LastUPtime) > expiry {
+				expired = append(expired, lc)
+			}
+		}
+
+		for _, lc := range expired {
+			log.Warnf("client:%v lastUpdate time:%v over three heartbeat cycles. Removing it",
+				lc.ConsumerGroup, lc.LastUPtime)
+			emconsumer, err := c.GetConsumer(lc.ConsumerGroup)
+			if err != nil {
+				log.Warnf("get eventmesh consumer:%v failed, err:%v", lc.ConsumerGroup, err)
+				continue
+			}
+			if err := c.DeRegisterClient(lc); err != nil {
+				log.Warnf("deregistry client:%v err:%v", lc.ConsumerGroup, err)
+				continue
+			}
+			if ok := emconsumer.DeRegisterClient(lc); !ok {
+				log.Warnf("failed deregistry client:%v in eventmesh consumer", lc.ConsumerGroup)
+				continue
+			}
+			restart[lc.ConsumerGroup] = struct{}{}
+		}
+		return true
+	})
+
+	// A failure for one group must not prevent the others from restarting.
+	for group := range restart {
+		if err := c.restartConsumer(group); err != nil {
+			log.Warnf("restart consumer:%v err:%v", group, err)
+		}
+	}
+}
+
+// Start logs that the consumer manager is starting and returns nil.
+// NOTE(review): it does not launch clientCheck; confirm whether the expiry
+// loop is meant to be started here.
+func (c *Manager) Start() error {
+ log.Infof("start consumer manager")
+ return nil
+}
+
+// Stop is currently a no-op and always returns nil.
+func (c *Manager) Stop() error {
+ return nil
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
similarity index 54%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
index 39abcef8..49ee5654 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
@@ -13,17 +13,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package push
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/consumer"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ cloudv2 "github.com/cloudevents/sdk-go/v2"
+)
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
-
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+// MessageContext bundles a cloud event with the consumer-group metadata that
+// is handed to MessageHandler.Handler.
+type MessageContext struct {
+ MsgRandomNo string
+ SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+ GrpcType config.GRPCType
+ ConsumerGroup string
+ Event *cloudv2.Event
+ TopicConfig *config.ConsumerGroupTopicConfig
+ // channel for server
+ Consumer *consumer.EventMeshConsumer
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go
new file mode 100644
index 00000000..e149a29e
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package push
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/pkg/errors"
+ "sync"
+ "time"
+)
+
+var (
+ // ConsumerGroupWaitingRequestThreshold is the limit compared against
+ // MessageHandler.Size; Handler rejects a message once Size exceeds it.
+ ConsumerGroupWaitingRequestThreshold = 1000
+
+ // ErrRequestReachMaxThreshold is returned by Handler when the threshold
+ // above has been exceeded.
+ ErrRequestReachMaxThreshold = errors.New("request reach the max threshold")
+)
+
+// MessageHandler accepts message contexts and keeps the requests that are
+// still waiting, grouped by consumer group.
+type MessageHandler struct {
+ //pool *ants.Pool
+ // waitingRequests holds the requests still waiting;
+ // key is the consumer group, value is []*Request
+ waitingRequests *sync.Map
+}
+
+// NewMessageHandler creates a handler whose waiting-request table contains an
+// empty entry for consumerGroup, and starts the background timeout checker.
+// The returned error is always nil.
+func NewMessageHandler(consumerGroup string) (*MessageHandler, error) {
+	hdl := &MessageHandler{
+		waitingRequests: new(sync.Map),
+	}
+	// TODO need goroutine safe in []*Request{}
+	hdl.waitingRequests.Store(consumerGroup, []*Request{})
+	go hdl.checkTimeout()
+	return hdl, nil
+}
+
+// checkTimeout walks all waiting requests once per second and evaluates
+// their timeout state. It never returns, so it must run in its own goroutine.
+func (m *MessageHandler) checkTimeout() {
+	ticker := time.NewTicker(time.Second)
+	for range ticker.C {
+		m.waitingRequests.Range(func(_, value interface{}) bool {
+			for _, req := range value.([]*Request) {
+				if !req.timeout() {
+					continue
+				}
+				// TODO: timed-out requests are detected but not handled yet.
+			}
+			return true
+		})
+	}
+}
+
+// Handler accepts mctx for pushing. It returns ErrRequestReachMaxThreshold
+// when Size exceeds ConsumerGroupWaitingRequestThreshold; otherwise it starts
+// the push attempt in a new goroutine and returns nil immediately.
+//
+// A failed push attempt is logged rather than silently dropped; it is not
+// reported to the caller because the attempt runs asynchronously.
+func (m *MessageHandler) Handler(mctx *MessageContext) error {
+	if m.Size() > ConsumerGroupWaitingRequestThreshold {
+		log.Warnf("too many request, reject and send back to MQ, group:%v, threshold:%v",
+			mctx.ConsumerGroup, ConsumerGroupWaitingRequestThreshold)
+		return ErrRequestReachMaxThreshold
+	}
+
+	go func() {
+		pushRequest := &Request{}
+		if err := pushRequest.Try(); err != nil {
+			log.Warnf("push request failed, group:%v, err:%v", mctx.ConsumerGroup, err)
+		}
+	}()
+	return nil
+}
+
+// Size returns the total number of waiting requests across all consumer
+// groups.
+//
+// It sums the length of every stored request slice. Counting map keys instead
+// would report the number of consumer groups, which never reaches the
+// request threshold checked in Handler.
+func (m *MessageHandler) Size() int {
+	count := 0
+	m.waitingRequests.Range(func(key, value any) bool {
+		if reqs, ok := value.([]*Request); ok {
+			count += len(reqs)
+		}
+		return true
+	})
+	return count
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
similarity index 68%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
index 39abcef8..2b90746b 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
@@ -13,17 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package config
+package retry
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
- Port string `yaml:"port" toml:"port"`
+import "time"
- // TLSOption process with the tls configuration
- *TLSOption `yaml:"tls" toml:"tls"`
+// Context describes one retry task.
+type Context struct {
+ // RetryTimes is a retry counter (presumably attempts made so far - confirm with callers).
+ RetryTimes int
+ // ExecuteTime is the instant set by SetDelay: the time of the call plus the delay.
+ ExecuteTime time.Time
+ // Do is a callback returning an error (presumably the operation to retry - confirm with callers).
+ Do func() error
+}
+
+// SetDelay sets ExecuteTime to the current time plus delay and returns the
+// receiver so calls can be chained.
+func (c *Context) SetDelay(delay time.Duration) *Context {
+ c.ExecuteTime = time.Now().Add(delay)
+ return c
+}
- // PProfOption if pprof is enabled, server
- // will start on given port, and you can check
- // on http://ip:port/pprof/debug
- *PProfOption `yaml:"pprof" toml:"pprof"`
+// GetDelay returns the time remaining until ExecuteTime; the result is
+// negative once ExecuteTime has passed.
+func (c *Context) GetDelay() time.Duration {
+	return time.Until(c.ExecuteTime)
 }
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer.go b/eventmesh-server-go/runtime/core/wrapper/producer.go
new file mode 100644
index 00000000..1dfbe51d
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/wrapper/producer.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wrapper
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+ eventv2 "github.com/cloudevents/sdk-go/v2"
+ "time"
+)
+
+// Producer wraps a connector.Producer and embeds Base, whose Started and
+// Inited flags are updated by Start and Shutdown.
+type Producer struct {
+ *Base
+ ProducerConnector connector.Producer
+}
+
+var (
+ // ErrNoConnectorPlugin is returned by NewProducer when the global config
+ // has no entry for the connector plugin type.
+ ErrNoConnectorPlugin = fmt.Errorf("no connector plugin provided")
+ // ErrNoConnectorName is returned by NewProducer when the connector plugin
+ // entry has no "name" key.
+ ErrNoConnectorName = fmt.Errorf("no connector plugin name provided")
+)
+
+// NewProducer creates a producer wrapper backed by the connector plugin named
+// in the global configuration.
+//
+// It returns ErrNoConnectorPlugin when no connector plugin is configured,
+// ErrNoConnectorName when the plugin entry has no "name" key, an error
+// wrapping ErrNoConnectorPlugin when the named plugin is not registered as a
+// connector.ProducerFactory, and the factory's own error when creating the
+// connector fails.
+func NewProducer() (*Producer, error) {
+	connectorPlugin, ok := config.GlobalConfig().Plugins[config.ConnectorPluginType]
+	if !ok {
+		return nil, ErrNoConnectorPlugin
+	}
+	connectorPluginName, ok := connectorPlugin["name"]
+	if !ok {
+		return nil, ErrNoConnectorName
+	}
+	// Log the configured name itself, not the whole config node.
+	log.Infof("init producer with connector name:%s", connectorPluginName.Value)
+	// Use the comma-ok form so a missing or mistyped plugin becomes an error
+	// instead of a panic.
+	factory, ok := plugin.Get(connector.ProducerPluginType, connectorPluginName.Value).(connector.ProducerFactory)
+	if !ok {
+		return nil, fmt.Errorf("connector plugin %v is not a registered producer factory: %w",
+			connectorPluginName.Value, ErrNoConnectorPlugin)
+	}
+	producerConnector, err := factory.Get()
+	if err != nil {
+		return nil, err
+	}
+	return &Producer{
+		Base:              DefaultBaseWrapper(),
+		ProducerConnector: producerConnector,
+	}, nil
+}
+
+// Send forwards event and callback to ProducerConnector.Publish and returns
+// its error.
+func (c *Producer) Send(ctx context.Context, event *eventv2.Event, callback *connector.SendCallback) error {
+ return c.ProducerConnector.Publish(ctx, event, callback)
+}
+
+// Request forwards event, callback and timeout to ProducerConnector.Request
+// and returns its error.
+func (c *Producer) Request(ctx context.Context, event *eventv2.Event, callback *connector.RequestReplyCallback, timeout time.Duration) error {
+ return c.ProducerConnector.Request(ctx, event, callback, timeout)
+}
+
+// Reply forwards event and callback to ProducerConnector.Reply and returns
+// its error.
+func (c *Producer) Reply(ctx context.Context, event *eventv2.Event, callback *connector.SendCallback) error {
+ return c.ProducerConnector.Reply(ctx, event, callback)
+}
+
+// Start starts the underlying connector and, on success, switches the Started
+// flag from false to true. A connector error is returned unchanged and leaves
+// the flag untouched.
+func (c *Producer) Start() error {
+ if err := c.ProducerConnector.Start(); err != nil {
+ return err
+ }
+
+ c.Base.Started.CAS(false, true)
+ return nil
+}
+
+// Shutdown stops the underlying connector and, on success, clears the Started
+// and Inited flags. A connector error is returned unchanged and leaves the
+// flags untouched.
+func (c *Producer) Shutdown() error {
+	if err := c.ProducerConnector.Shutdown(); err != nil {
+		return err
+	}
+
+	// Shutting down must flip the flags from true back to false; swapping
+	// false to true would mark a stopped producer as started and initialised.
+	c.Base.Started.CAS(true, false)
+	c.Base.Inited.CAS(true, false)
+	return nil
+}
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer_test.go b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
new file mode 100644
index 00000000..75146b57
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wrapper
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+ eventv2 "github.com/cloudevents/sdk-go/v2"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// TestProducer_Send sends an empty event through a Producer backed by the
+// "standalone" producer plugin.
+func TestProducer_Send(t *testing.T) {
+ type fields struct {
+ Base *Base
+ ProducerConnector connector.Producer
+ }
+ type args struct {
+ ctx context.Context
+ event *eventv2.Event
+ callback *connector.SendCallback
+ }
+ // NOTE(review): the type assertion is unchecked and panics if no
+ // "standalone" producer plugin has been registered - confirm registration.
+ factory := plugin.Get(connector.ProducerPluginType, "standalone").(connector.ProducerFactory)
+ produ, err := factory.Get()
+ assert.NoError(t, err)
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "test send",
+ fields: fields{
+ Base: DefaultBaseWrapper(),
+ ProducerConnector: produ,
+ },
+ args: args{
+ ctx: context.TODO(),
+ event: &eventv2.Event{},
+ callback: &connector.SendCallback{
+ OnSuccess: func(result *connector.SendResult) {
+ t.Logf("success")
+ },
+ OnError: func(result *connector.ErrorResult) {
+ t.Logf("error")
+ },
+ },
+ },
+ // NOTE(review): this function only reports whether err is non-nil
+ // and never fails the test; its result is discarded by the caller
+ // below, so the case passes whatever Send returns.
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return err != nil
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := &Producer{
+ Base: tt.fields.Base,
+ ProducerConnector: tt.fields.ProducerConnector,
+ }
+ tt.wantErr(t, c.Send(tt.args.ctx, tt.args.event, tt.args.callback), fmt.Sprintf("Send(%v, %v, %v)", tt.args.ctx, tt.args.event, tt.args.callback))
+ })
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org