You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/19 15:04:40 UTC

[incubator-eventmesh] branch eventmesh-server-go updated: add runtime producer

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
     new 0392be85 add runtime producer
     new 41f9139f Merge pull request #1339 from walleliu1016/eventmesh-server-go-runtime
0392be85 is described below

commit 0392be85d188c515bf2a359cbbc331476812968b
Author: walleliu1016 <li...@163.com>
AuthorDate: Mon Sep 19 23:03:00 2022 +0800

    add runtime producer
---
 eventmesh-server-go/plugin/types.go                |  37 +++++
 .../runtime/core/protocol/grpc/producer/context.go |  31 ++++
 .../runtime/core/protocol/grpc/producer/manager.go |  78 ++++++++++
 .../core/protocol/grpc/producer/producer.go        | 101 ++++++++++++
 .../runtime/core/protocol/grpc/push/request.go     | 171 +++++++++++++++++++++
 5 files changed, 418 insertions(+)

diff --git a/eventmesh-server-go/plugin/types.go b/eventmesh-server-go/plugin/types.go
new file mode 100644
index 00000000..14982ed8
--- /dev/null
+++ b/eventmesh-server-go/plugin/types.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package plugin
+
+// name for every plugin
+var Name = "name"
+
+// indicates all plugin type
+var (
+	Connector = "connector"
+	Registry  = "registry"
+	Protocol  = "protocol"
+)
+
+// indicates registry name list
+var (
+	NacosRegistry = "nacos"
+	ETCDRegistry  = "etcd"
+)
+
+// indicates connector name list
+var (
+	Standalone = "standalone"
+)
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go
new file mode 100644
index 00000000..74d1d386
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/context.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+	"context"
+	cloudv2 "github.com/cloudevents/sdk-go/v2"
+	"time"
+)
+
+// SendMessageContext context in produce message
+type SendMessageContext struct {
+	Ctx         context.Context
+	Event       *cloudv2.Event
+	BizSeqNO    string
+	producerAPI *EventMeshProducer
+	CreateTime  time.Time
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go
new file mode 100644
index 00000000..c4e28fa8
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/manager.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"sync"
+)
+
+// Manager manger for all producer
+type Manager struct {
+	// EventMeshProducers {groupName, *EventMeshProducer}
+	EventMeshProducers *sync.Map
+}
+
+func NewManager() (*Manager, error) {
+	return &Manager{
+		EventMeshProducers: new(sync.Map),
+	}, nil
+}
+
+func (m *Manager) GetProducer(groupName string) (*EventMeshProducer, error) {
+	p, ok := m.EventMeshProducers.Load(groupName)
+	if ok {
+		return p.(*EventMeshProducer), nil
+	}
+	pgc := &config.ProducerGroupConfig{GroupName: groupName}
+	pg, err := m.CreateProducer(pgc)
+	if err != nil {
+		return nil, err
+	}
+	return pg, nil
+}
+
+func (m *Manager) CreateProducer(producerGroupConfig *config.ProducerGroupConfig) (*EventMeshProducer, error) {
+	val, ok := m.EventMeshProducers.Load(producerGroupConfig.GroupName)
+	if ok {
+		return val.(*EventMeshProducer), nil
+	}
+	pg, err := NewEventMeshProducer(producerGroupConfig)
+	if err != nil {
+		return nil, err
+	}
+	m.EventMeshProducers.Store(producerGroupConfig.GroupName, pg)
+	return pg, nil
+}
+
+func (m *Manager) Start() error {
+	log.Infof("start producer manager")
+	return nil
+}
+
+func (m *Manager) Shutdown() error {
+	log.Infof("shutdown producer manager")
+
+	m.EventMeshProducers.Range(func(key, value any) bool {
+		pg := value.(*EventMeshProducer)
+		if err := pg.Shutdown(); err != nil {
+			log.Infof("shutdown eventmesh producer:%v, err:%v", key, err)
+		}
+		return true
+	})
+	return nil
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go
new file mode 100644
index 00000000..3b54bd40
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producer
+
+import (
+	"fmt"
+	config2 "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/wrapper"
+	"time"
+)
+
+type EventMeshProducer struct {
+	cfg          *config.ProducerGroupConfig
+	producer     *wrapper.Producer
+	ServiceState consts.ServiceState
+}
+
+func NewEventMeshProducer(cfg *config.ProducerGroupConfig) (*EventMeshProducer, error) {
+	pm, err := wrapper.NewProducer()
+	if err != nil {
+		return nil, err
+	}
+
+	cluster := config2.GlobalConfig().Server.GRPCOption.Cluster
+	idc := config2.GlobalConfig().Server.GRPCOption.IDC
+	mm := make(map[string]string)
+	mm["producerGroup"] = cfg.GroupName
+	mm["instanceName"] = util.BuildMeshClientID(cfg.GroupName, cluster)
+	mm["eventMeshIDC"] = idc
+	if err = pm.ProducerConnector.InitProducer(mm); err != nil {
+		return nil, err
+	}
+
+	p := &EventMeshProducer{
+		cfg:          cfg,
+		producer:     pm,
+		ServiceState: consts.INITED,
+	}
+	return p, nil
+}
+
+func (e *EventMeshProducer) Send(sctx SendMessageContext, callback *connector.SendCallback) error {
+	return e.producer.Send(sctx.Ctx, sctx.Event, callback)
+}
+
+func (e *EventMeshProducer) Request(sctx SendMessageContext, callback *connector.RequestReplyCallback, timeout time.Duration) error {
+	return e.producer.Request(sctx.Ctx, sctx.Event, callback, timeout)
+}
+
+func (e *EventMeshProducer) Reply(sctx SendMessageContext, callback *connector.SendCallback) error {
+	return e.producer.Reply(sctx.Ctx, sctx.Event, callback)
+}
+
+func (e *EventMeshProducer) Start() error {
+	if e.ServiceState == "" || e.ServiceState == consts.RUNNING {
+		return nil
+	}
+	if err := e.producer.Start(); err != nil {
+		return err
+	}
+	e.ServiceState = consts.RUNNING
+	log.Info("start eventmesh producer for groupName:%s", e.cfg.GroupName)
+	return nil
+}
+
+func (e *EventMeshProducer) Shutdown() error {
+	if e.ServiceState == "" || e.ServiceState == consts.INITED {
+		return nil
+	}
+	if err := e.producer.Shutdown(); err != nil {
+		return err
+	}
+	e.ServiceState = consts.STOPED
+	return nil
+}
+
+func (e *EventMeshProducer) Status() consts.ServiceState {
+	return e.ServiceState
+}
+
+func (e *EventMeshProducer) String() string {
+	return fmt.Sprintf("eventMeshProducer, status:%s,  groupName:%s", e.ServiceState, e.cfg.GroupName)
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go
new file mode 100644
index 00000000..24da6128
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/request.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package push
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/retry"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	cloudv2 "github.com/cloudevents/sdk-go/v2"
+	"github.com/liyue201/gostl/ds/set"
+	"github.com/liyue201/gostl/ds/vector"
+	"github.com/pkg/errors"
+	"go.uber.org/atomic"
+	"math/rand"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNoProtocolFound = errors.New("no protocol type found in event message")
+)
+
+type Request struct {
+	*retry.Context
+
+	MessageContext *MessageContext
+	CreateTime     time.Time
+	LastPushTime   time.Time
+	Complete       *atomic.Bool
+	SimpleMessage  *pb.SimpleMessage
+	Try            func() error
+}
+
+func NewRequest(mctx *MessageContext) (*Request, error) {
+	sm, err := eventToSimpleMessage(mctx.Event)
+	if err != nil {
+		return nil, err
+	}
+	return &Request{
+		MessageContext: mctx,
+		SimpleMessage:  sm,
+	}, nil
+}
+
+func eventToSimpleMessage(ev *cloudv2.Event) (*pb.SimpleMessage, error) {
+	val, ok := ev.Extensions()[consts.PROTOCOL_TYPE]
+	if !ok {
+		return nil, ErrNoProtocolFound
+	}
+	ptype := val.(string)
+	pplugin := plugin.Get(plugin.Protocol, ptype)
+	adapter := pplugin.(protocol.Adapter)
+	msg, err := adapter.FromCloudEvent(ev)
+	if err != nil {
+		return nil, err
+	}
+	return msg.(*pb.SimpleMessage), nil
+}
+
+func (r *Request) timeout() bool {
+	return true
+}
+
+type StreamRequest struct {
+	*Request
+	mode     pb.Subscription_SubscriptionItem_SubscriptionMode
+	startIdx int
+}
+
+func NewStreamRequest(mctx *MessageContext) (*StreamRequest, error) {
+	r, err := NewRequest(mctx)
+	if err != nil {
+		return nil, err
+	}
+	sr := &StreamRequest{
+		Request: r,
+	}
+	sr.Try = func() error {
+		return nil
+	}
+
+	return sr, nil
+}
+
+type WebhookRequest struct {
+	*Request
+	// IDCWebhookURLs webhook urls seperated by IDC
+	// key is IDC, value is vector.Vector
+	IDCWebhookURLs *sync.Map
+
+	// AllURLs all webhook urls, ignore idc
+	AllURLs *set.Set
+
+	startIdx int
+
+	subscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+}
+
+func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
+	r, err := NewRequest(mctx)
+	if err != nil {
+		return nil, err
+	}
+	rand.Seed(time.Now().UnixMilli())
+	hr := &WebhookRequest{
+		IDCWebhookURLs:   mctx.TopicConfig.IDCWebhookURLs,
+		AllURLs:          mctx.TopicConfig.AllURLs,
+		Request:          r,
+		startIdx:         rand.Intn(mctx.TopicConfig.AllURLs.Size()),
+		subscriptionMode: mctx.SubscriptionMode,
+	}
+	hr.Try = func() error {
+		return nil
+	}
+	return hr, nil
+}
+
+func (w *WebhookRequest) getURLs() []string {
+	var (
+		urls       []string
+		currentIDC = config.GlobalConfig().Server.GRPCOption.IDC
+	)
+
+	w.IDCWebhookURLs.Range(func(key, value any) bool {
+		idc := key.(string)
+		vc := value.(*vector.Vector)
+		if idc == currentIDC {
+			for iter := vc.Begin(); iter.IsValid(); iter.Next() {
+				urls = append(urls, iter.Value().(string))
+			}
+		}
+		return true
+	})
+
+	if len(urls) == 0 {
+		for iter := w.AllURLs.Begin(); iter.IsValid(); iter.Next() {
+			urls = append(urls, iter.Value().(string))
+		}
+	}
+	if len(urls) == 0 {
+		log.Warnf("no handler for submitter")
+		return []string{}
+	}
+
+	switch w.subscriptionMode {
+	case pb.Subscription_SubscriptionItem_CLUSTERING:
+		return []string{urls[(w.RetryTimes+w.startIdx)%len(urls)]}
+	case pb.Subscription_SubscriptionItem_BROADCASTING:
+		return urls
+	default:
+		log.Warnf("invalid Subscription Mode, no message returning back to subscriber")
+		return []string{}
+	}
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org