You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/19 15:04:40 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: add runtime producer
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 0392be85 add runtime producer
new 41f9139f Merge pull request #1339 from walleliu1016/eventmesh-server-go-runtime
0392be85 is described below
commit 0392be85d188c515bf2a359cbbc331476812968b
Author: walleliu1016 <li...@163.com>
AuthorDate: Mon Sep 19 23:03:00 2022 +0800
add runtime producer
---
eventmesh-server-go/plugin/types.go | 37 +++++
.../runtime/core/protocol/grpc/producer/context.go | 31 ++++
.../runtime/core/protocol/grpc/producer/manager.go | 78 ++++++++++
.../core/protocol/grpc/producer/producer.go | 101 ++++++++++++
.../runtime/core/protocol/grpc/push/request.go | 171 +++++++++++++++++++++
5 files changed, 418 insertions(+)
diff --git a/eventmesh-server-go/plugin/types.go b/eventmesh-server-go/plugin/types.go
new file mode 100644
index 00000000..14982ed8
--- /dev/null
+++ b/eventmesh-server-go/plugin/types.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package plugin
+
+// name for every plugin
+var Name = "name"
+
+// indicates all plugin type
+var (
+ Connector = "connector"
+ Registry = "registry"
+ Protocol = "protocol"
+)
+
+// indicates registry name list
+var (
+ NacosRegistry = "nacos"
+ ETCDRegistry = "etcd"
+)
+
+// indicates connector name list
+var (
+ Standalone = "standalone"
+)
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go
new file mode 100644
index 00000000..74d1d386
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+ "context"
+ cloudv2 "github.com/cloudevents/sdk-go/v2"
+ "time"
+)
+
+// SendMessageContext context in produce message
+type SendMessageContext struct {
+ Ctx context.Context
+ Event *cloudv2.Event
+ BizSeqNO string
+ producerAPI *EventMeshProducer
+ CreateTime time.Time
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go
new file mode 100644
index 00000000..c4e28fa8
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "sync"
+)
+
+// Manager manger for all producer
+type Manager struct {
+ // EventMeshProducers {groupName, *EventMeshProducer}
+ EventMeshProducers *sync.Map
+}
+
+func NewManager() (*Manager, error) {
+ return &Manager{
+ EventMeshProducers: new(sync.Map),
+ }, nil
+}
+
+func (m *Manager) GetProducer(groupName string) (*EventMeshProducer, error) {
+ p, ok := m.EventMeshProducers.Load(groupName)
+ if ok {
+ return p.(*EventMeshProducer), nil
+ }
+ pgc := &config.ProducerGroupConfig{GroupName: groupName}
+ pg, err := m.CreateProducer(pgc)
+ if err != nil {
+ return nil, err
+ }
+ return pg, nil
+}
+
+func (m *Manager) CreateProducer(producerGroupConfig *config.ProducerGroupConfig) (*EventMeshProducer, error) {
+ val, ok := m.EventMeshProducers.Load(producerGroupConfig.GroupName)
+ if ok {
+ return val.(*EventMeshProducer), nil
+ }
+ pg, err := NewEventMeshProducer(producerGroupConfig)
+ if err != nil {
+ return nil, err
+ }
+ m.EventMeshProducers.Store(producerGroupConfig.GroupName, pg)
+ return pg, nil
+}
+
+func (m *Manager) Start() error {
+ log.Infof("start producer manager")
+ return nil
+}
+
+func (m *Manager) Shutdown() error {
+ log.Infof("shutdown producer manager")
+
+ m.EventMeshProducers.Range(func(key, value any) bool {
+ pg := value.(*EventMeshProducer)
+ if err := pg.Shutdown(); err != nil {
+ log.Infof("shutdown eventmesh producer:%v, err:%v", key, err)
+ }
+ return true
+ })
+ return nil
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go
new file mode 100644
index 00000000..3b54bd40
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+ "fmt"
+ config2 "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/wrapper"
+ "time"
+)
+
+type EventMeshProducer struct {
+ cfg *config.ProducerGroupConfig
+ producer *wrapper.Producer
+ ServiceState consts.ServiceState
+}
+
+func NewEventMeshProducer(cfg *config.ProducerGroupConfig) (*EventMeshProducer, error) {
+ pm, err := wrapper.NewProducer()
+ if err != nil {
+ return nil, err
+ }
+
+ cluster := config2.GlobalConfig().Server.GRPCOption.Cluster
+ idc := config2.GlobalConfig().Server.GRPCOption.IDC
+ mm := make(map[string]string)
+ mm["producerGroup"] = cfg.GroupName
+ mm["instanceName"] = util.BuildMeshClientID(cfg.GroupName, cluster)
+ mm["eventMeshIDC"] = idc
+ if err = pm.ProducerConnector.InitProducer(mm); err != nil {
+ return nil, err
+ }
+
+ p := &EventMeshProducer{
+ cfg: cfg,
+ producer: pm,
+ ServiceState: consts.INITED,
+ }
+ return p, nil
+}
+
+func (e *EventMeshProducer) Send(sctx SendMessageContext, callback *connector.SendCallback) error {
+ return e.producer.Send(sctx.Ctx, sctx.Event, callback)
+}
+
+func (e *EventMeshProducer) Request(sctx SendMessageContext, callback *connector.RequestReplyCallback, timeout time.Duration) error {
+ return e.producer.Request(sctx.Ctx, sctx.Event, callback, timeout)
+}
+
+func (e *EventMeshProducer) Reply(sctx SendMessageContext, callback *connector.SendCallback) error {
+ return e.producer.Reply(sctx.Ctx, sctx.Event, callback)
+}
+
+func (e *EventMeshProducer) Start() error {
+ if e.ServiceState == "" || e.ServiceState == consts.RUNNING {
+ return nil
+ }
+ if err := e.producer.Start(); err != nil {
+ return err
+ }
+ e.ServiceState = consts.RUNNING
+ log.Info("start eventmesh producer for groupName:%s", e.cfg.GroupName)
+ return nil
+}
+
+func (e *EventMeshProducer) Shutdown() error {
+ if e.ServiceState == "" || e.ServiceState == consts.INITED {
+ return nil
+ }
+ if err := e.producer.Shutdown(); err != nil {
+ return err
+ }
+ e.ServiceState = consts.STOPED
+ return nil
+}
+
+func (e *EventMeshProducer) Status() consts.ServiceState {
+ return e.ServiceState
+}
+
+func (e *EventMeshProducer) String() string {
+ return fmt.Sprintf("eventMeshProducer, status:%s, groupName:%s", e.ServiceState, e.cfg.GroupName)
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go
new file mode 100644
index 00000000..24da6128
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package push
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/retry"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ cloudv2 "github.com/cloudevents/sdk-go/v2"
+ "github.com/liyue201/gostl/ds/set"
+ "github.com/liyue201/gostl/ds/vector"
+ "github.com/pkg/errors"
+ "go.uber.org/atomic"
+ "math/rand"
+ "sync"
+ "time"
+)
+
+var (
+ ErrNoProtocolFound = errors.New("no protocol type found in event message")
+)
+
+type Request struct {
+ *retry.Context
+
+ MessageContext *MessageContext
+ CreateTime time.Time
+ LastPushTime time.Time
+ Complete *atomic.Bool
+ SimpleMessage *pb.SimpleMessage
+ Try func() error
+}
+
+func NewRequest(mctx *MessageContext) (*Request, error) {
+ sm, err := eventToSimpleMessage(mctx.Event)
+ if err != nil {
+ return nil, err
+ }
+ return &Request{
+ MessageContext: mctx,
+ SimpleMessage: sm,
+ }, nil
+}
+
+func eventToSimpleMessage(ev *cloudv2.Event) (*pb.SimpleMessage, error) {
+ val, ok := ev.Extensions()[consts.PROTOCOL_TYPE]
+ if !ok {
+ return nil, ErrNoProtocolFound
+ }
+ ptype := val.(string)
+ pplugin := plugin.Get(plugin.Protocol, ptype)
+ adapter := pplugin.(protocol.Adapter)
+ msg, err := adapter.FromCloudEvent(ev)
+ if err != nil {
+ return nil, err
+ }
+ return msg.(*pb.SimpleMessage), nil
+}
+
+func (r *Request) timeout() bool {
+ return true
+}
+
+type StreamRequest struct {
+ *Request
+ mode pb.Subscription_SubscriptionItem_SubscriptionMode
+ startIdx int
+}
+
+func NewStreamRequest(mctx *MessageContext) (*StreamRequest, error) {
+ r, err := NewRequest(mctx)
+ if err != nil {
+ return nil, err
+ }
+ sr := &StreamRequest{
+ Request: r,
+ }
+ sr.Try = func() error {
+ return nil
+ }
+
+ return sr, nil
+}
+
+type WebhookRequest struct {
+ *Request
+ // IDCWebhookURLs webhook urls seperated by IDC
+ // key is IDC, value is vector.Vector
+ IDCWebhookURLs *sync.Map
+
+ // AllURLs all webhook urls, ignore idc
+ AllURLs *set.Set
+
+ startIdx int
+
+ subscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+}
+
+func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
+ r, err := NewRequest(mctx)
+ if err != nil {
+ return nil, err
+ }
+ rand.Seed(time.Now().UnixMilli())
+ hr := &WebhookRequest{
+ IDCWebhookURLs: mctx.TopicConfig.IDCWebhookURLs,
+ AllURLs: mctx.TopicConfig.AllURLs,
+ Request: r,
+ startIdx: rand.Intn(mctx.TopicConfig.AllURLs.Size()),
+ subscriptionMode: mctx.SubscriptionMode,
+ }
+ hr.Try = func() error {
+ return nil
+ }
+ return hr, nil
+}
+
+func (w *WebhookRequest) getURLs() []string {
+ var (
+ urls []string
+ currentIDC = config.GlobalConfig().Server.GRPCOption.IDC
+ )
+
+ w.IDCWebhookURLs.Range(func(key, value any) bool {
+ idc := key.(string)
+ vc := value.(*vector.Vector)
+ if idc == currentIDC {
+ for iter := vc.Begin(); iter.IsValid(); iter.Next() {
+ urls = append(urls, iter.Value().(string))
+ }
+ }
+ return true
+ })
+
+ if len(urls) == 0 {
+ for iter := w.AllURLs.Begin(); iter.IsValid(); iter.Next() {
+ urls = append(urls, iter.Value().(string))
+ }
+ }
+ if len(urls) == 0 {
+ log.Warnf("no handler for submitter")
+ return []string{}
+ }
+
+ switch w.subscriptionMode {
+ case pb.Subscription_SubscriptionItem_CLUSTERING:
+ return []string{urls[(w.RetryTimes+w.startIdx)%len(urls)]}
+ case pb.Subscription_SubscriptionItem_BROADCASTING:
+ return urls
+ default:
+ log.Warnf("invalid Subscription Mode, no message returning back to subscriber")
+ return []string{}
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org