You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/10/24 10:19:38 UTC

[incubator-eventmesh] branch eventmesh-server-go updated: add hearbeat service

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
     new 9c49dc73 add hearbeat service
     new 9f7b7315 Merge pull request #1815 from walleliu1016/eventmesh-server-go
9c49dc73 is described below

commit 9c49dc73d5ebe5040f82342bbc7818e142f7997b
Author: walleliu <li...@163.com>
AuthorDate: Mon Oct 24 18:15:30 2022 +0800

    add hearbeat service
---
 .../core/protocol/grpc/heartbeat_service.go        | 67 ++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go b/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go
new file mode 100644
index 00000000..64d540b3
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpc
+
+import (
+	"context"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	"github.com/panjf2000/ants/v2"
+	"time"
+)
+
+type HeartbeatService struct {
+	pb.UnimplementedHeartbeatServiceServer
+	gctx *GRPCContext
+	pool *ants.Pool
+}
+
+func NewHeartbeatServiceServer(gctx *GRPCContext) (*HeartbeatService, error) {
+	sp := config.GlobalConfig().Server.GRPCOption.SubscribePoolSize
+	pl, err := ants.NewPool(sp)
+	if err != nil {
+		return nil, err
+	}
+	return &HeartbeatService{
+		gctx: gctx,
+		pool: pl,
+	}, nil
+}
+
+func (h *HeartbeatService) Heartbeat(ctx context.Context, hb *pb.Heartbeat) (*pb.Response, error) {
+	tmCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+	defer cancel()
+	var (
+		resp    *pb.Response
+		errChan = make(chan error)
+		err     error
+	)
+	h.pool.Submit(func() {
+		resp, err = ProcessHeartbeat(h.gctx, hb)
+		errChan <- err
+	})
+	select {
+	case <-tmCtx.Done():
+		log.Warnf("timeout in subscribe")
+	case <-errChan:
+		break
+	}
+	if err != nil {
+		log.Warnf("failed to handle hearbeat, err:%v", err)
+	}
+	return resp, err
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org