You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 08:00:12 UTC

[incubator-eventmesh] branch master updated: add in-line scheduler

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e0c2cec add in-line scheduler
     new 6323f18e Merge pull request #1484 from walterlife/modify-eventmesh-workflow-schedule
0e0c2cec is described below

commit 0e0c2cec6bf7d6fbe9ce57bcb7a6efe8ee1db9bc
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 23:53:05 2022 +0800

    add in-line scheduler
---
 .../schedule/{scheduler.go => inline_scheduler.go} | 89 +++++++++++---------
 .../internal/schedule/scheduler.go                 | 94 ++--------------------
 eventmesh-workflow-go/server.go                    |  9 ++-
 3 files changed, 62 insertions(+), 130 deletions(-)

diff --git a/eventmesh-workflow-go/internal/schedule/scheduler.go b/eventmesh-workflow-go/internal/schedule/inline_scheduler.go
similarity index 58%
copy from eventmesh-workflow-go/internal/schedule/scheduler.go
copy to eventmesh-workflow-go/internal/schedule/inline_scheduler.go
index df489abe..627ab578 100644
--- a/eventmesh-workflow-go/internal/schedule/scheduler.go
+++ b/eventmesh-workflow-go/internal/schedule/inline_scheduler.go
@@ -23,25 +23,31 @@ import (
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task"
+	"github.com/avast/retry-go/v4"
+	"gorm.io/gorm"
 	"time"
 )
 
+func init() {
+	schedulerBuilder["in-line"] = NewInlineScheduler
+}
+
 const (
 	schedulerLockKey     = "scheduler_lock"
 	schedulerLockTimeout = 2
 )
 
-type Scheduler struct {
+type inlineScheduler struct {
 	workflowDAL dal.WorkflowDAL
 }
 
-func NewScheduler() (*Scheduler, error) {
-	var s Scheduler
+func NewInlineScheduler() (Scheduler, error) {
+	var s inlineScheduler
 	s.workflowDAL = dal.NewWorkflowDAL()
 	return &s, nil
 }
 
-func (s *Scheduler) Run() {
+func (s *inlineScheduler) Run() {
 	go func() {
 		ticker := time.NewTicker(time.Millisecond * time.Duration(config.GlobalConfig().Flow.Schedule.Interval))
 		defer func() {
@@ -57,52 +63,40 @@ func (s *Scheduler) Run() {
 	}()
 }
 
-func (s *Scheduler) handle() {
-	var res *model.WorkflowTaskInstance
-	if err := lock(func() error {
-		var err error
-		res, err = s.workflowDAL.SelectTaskInstance(context.Background(),
-			model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
-		if err != nil {
+func (s *inlineScheduler) handle() {
+	if err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error {
+		var res *model.WorkflowTaskInstance
+		if err := s.lock(func() error {
+			var err error
+			res, err = s.fetchTask(tx)
+			if err != nil {
+				return err
+			}
+			return nil
+		}); err != nil {
 			return err
 		}
 		if res == nil {
 			return nil
 		}
-		if err = s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-			Status: constants.TaskInstanceProcessStatus}); err != nil {
-			return err
+		t := task.New(res)
+		if t == nil {
+			return nil
 		}
-		return nil
-	}); err != nil {
-		log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-		return
-	}
-	if res == nil {
-		return
-	}
-	t := task.Build(res)
-	if t == nil {
-		return
-	}
-	// TODO fail retry
-	if err := t.Run(); err != nil {
-		if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-			Status: constants.TaskInstanceFailStatus}); err != nil {
-			log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-			return
+		if err := retry.Do(func() error {
+			return t.Run()
+		}, retry.Attempts(constants.RetryAttempts)); err != nil {
+			return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+				Status: constants.TaskInstanceFailStatus})
 		}
-	}
-	// success
-	if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-		Status: constants.TaskInstanceSuccessStatus}); err != nil {
-		// TODO fail retry
+		return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+			Status: constants.TaskInstanceSuccessStatus})
+	}); err != nil {
 		log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-		return
 	}
 }
 
-func lock(h func() error) error {
+func (s *inlineScheduler) lock(h func() error) error {
 	l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout)
 	if err != nil {
 		return err
@@ -110,3 +104,20 @@ func lock(h func() error) error {
 	defer l.Release()
 	return h()
 }
+
+func (s *inlineScheduler) fetchTask(tx *gorm.DB) (*model.WorkflowTaskInstance, error) {
+	var err error
+	res, err := s.workflowDAL.SelectTaskInstance(context.Background(),
+		model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
+	if err != nil {
+		return nil, err
+	}
+	if res == nil {
+		return nil, nil
+	}
+	if err = s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+		Status: constants.TaskInstanceProcessStatus}); err != nil {
+		return nil, err
+	}
+	return res, nil
+}
diff --git a/eventmesh-workflow-go/internal/schedule/scheduler.go b/eventmesh-workflow-go/internal/schedule/scheduler.go
index df489abe..3f24dfa3 100644
--- a/eventmesh-workflow-go/internal/schedule/scheduler.go
+++ b/eventmesh-workflow-go/internal/schedule/scheduler.go
@@ -16,97 +16,15 @@
 package schedule
 
 import (
-	"context"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
-	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
-	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
-	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
-	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task"
-	"time"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
 )
 
-const (
-	schedulerLockKey     = "scheduler_lock"
-	schedulerLockTimeout = 2
-)
-
-type Scheduler struct {
-	workflowDAL dal.WorkflowDAL
-}
-
-func NewScheduler() (*Scheduler, error) {
-	var s Scheduler
-	s.workflowDAL = dal.NewWorkflowDAL()
-	return &s, nil
-}
-
-func (s *Scheduler) Run() {
-	go func() {
-		ticker := time.NewTicker(time.Millisecond * time.Duration(config.GlobalConfig().Flow.Schedule.Interval))
-		defer func() {
-			if err := recover(); err != nil {
-				log.Get(constants.LogSchedule).Errorf("schedule run error=%+v", err)
-			}
-			ticker.Stop()
-		}()
-
-		for range ticker.C {
-			s.handle()
-		}
-	}()
-}
+var schedulerBuilder map[string]func() (Scheduler, error)
 
-func (s *Scheduler) handle() {
-	var res *model.WorkflowTaskInstance
-	if err := lock(func() error {
-		var err error
-		res, err = s.workflowDAL.SelectTaskInstance(context.Background(),
-			model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
-		if err != nil {
-			return err
-		}
-		if res == nil {
-			return nil
-		}
-		if err = s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-			Status: constants.TaskInstanceProcessStatus}); err != nil {
-			return err
-		}
-		return nil
-	}); err != nil {
-		log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-		return
-	}
-	if res == nil {
-		return
-	}
-	t := task.Build(res)
-	if t == nil {
-		return
-	}
-	// TODO fail retry
-	if err := t.Run(); err != nil {
-		if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-			Status: constants.TaskInstanceFailStatus}); err != nil {
-			log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-			return
-		}
-	}
-	// success
-	if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
-		Status: constants.TaskInstanceSuccessStatus}); err != nil {
-		// TODO fail retry
-		log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
-		return
-	}
+type Scheduler interface {
+	Run()
 }
 
-func lock(h func() error) error {
-	l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout)
-	if err != nil {
-		return err
-	}
-	defer l.Release()
-	return h()
+func NewScheduler() (Scheduler, error) {
+	return schedulerBuilder[config.Get().Flow.Scheduler]()
 }
diff --git a/eventmesh-workflow-go/server.go b/eventmesh-workflow-go/server.go
index 8bedaf9c..fcb7806b 100644
--- a/eventmesh-workflow-go/server.go
+++ b/eventmesh-workflow-go/server.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/api"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/api/proto"
+	pconfig "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
@@ -38,7 +39,7 @@ import (
 
 type Server struct {
 	Server   *grpc.Server
-	schedule *schedule.Scheduler
+	schedule schedule.Scheduler
 	queue    queue.ObserveQueue
 }
 
@@ -91,8 +92,10 @@ func (s *Server) SetupConfig() error {
 		return err
 	}
 	config.SetGlobalConfig(cfg)
-
-	return config.Setup(cfg)
+	if err := config.Setup(cfg); err != nil {
+		return err
+	}
+	return pconfig.Setup(config.ServerConfigPath)
 }
 
 func (s *Server) listen() (net.Listener, error) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org