You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/10/24 10:19:38 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: add hearbeat service
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 9c49dc73 add hearbeat service
new 9f7b7315 Merge pull request #1815 from walleliu1016/eventmesh-server-go
9c49dc73 is described below
commit 9c49dc73d5ebe5040f82342bbc7818e142f7997b
Author: walleliu <li...@163.com>
AuthorDate: Mon Oct 24 18:15:30 2022 +0800
add hearbeat service
---
.../core/protocol/grpc/heartbeat_service.go | 67 ++++++++++++++++++++++
1 file changed, 67 insertions(+)
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go b/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go
new file mode 100644
index 00000000..64d540b3
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/heartbeat_service.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpc
+
+import (
+ "context"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "github.com/panjf2000/ants/v2"
+ "time"
+)
+
+// HeartbeatService handles the Heartbeat RPC. It embeds
+// pb.UnimplementedHeartbeatServiceServer and runs heartbeat processing
+// on an ants worker pool.
+type HeartbeatService struct {
+	pb.UnimplementedHeartbeatServiceServer
+	// gctx is the gRPC context handed to ProcessHeartbeat for every request.
+	gctx *GRPCContext
+	// pool is the worker pool to which Heartbeat submits processing tasks.
+	pool *ants.Pool
+}
+
+func NewHeartbeatServiceServer(gctx *GRPCContext) (*HeartbeatService, error) {
+ sp := config.GlobalConfig().Server.GRPCOption.SubscribePoolSize
+ pl, err := ants.NewPool(sp)
+ if err != nil {
+ return nil, err
+ }
+ return &HeartbeatService{
+ gctx: gctx,
+ pool: pl,
+ }, nil
+}
+
+// Heartbeat handles a client heartbeat by running ProcessHeartbeat on the
+// service's worker pool and waiting up to five seconds for it to finish.
+//
+// It returns the response and error produced by ProcessHeartbeat. If the
+// task cannot be submitted to the pool, the submit error is returned. If
+// the five-second deadline passes or ctx is cancelled before processing
+// finishes, a nil response and the context's error are returned.
+func (h *HeartbeatService) Heartbeat(ctx context.Context, hb *pb.Heartbeat) (*pb.Response, error) {
+	tmCtx, cancel := context.WithTimeout(ctx, time.Second*5)
+	defer cancel()
+
+	// outcome carries both results over the channel so the handler never
+	// reads variables the worker goroutine may still be writing.
+	type outcome struct {
+		resp *pb.Response
+		err  error
+	}
+	// Buffered with capacity 1 so the worker can always deliver its result
+	// and return to the pool, even if this handler has already timed out.
+	done := make(chan outcome, 1)
+
+	if err := h.pool.Submit(func() {
+		resp, err := ProcessHeartbeat(h.gctx, hb)
+		done <- outcome{resp: resp, err: err}
+	}); err != nil {
+		// The task was never scheduled; fail fast instead of waiting for the timeout.
+		log.Warnf("failed to handle hearbeat, err:%v", err)
+		return nil, err
+	}
+
+	select {
+	case <-tmCtx.Done():
+		log.Warnf("timeout in heartbeat")
+		return nil, tmCtx.Err()
+	case res := <-done:
+		if res.err != nil {
+			log.Warnf("failed to handle hearbeat, err:%v", res.err)
+		}
+		return res.resp, res.err
+	}
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org