You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 08:00:12 UTC
[incubator-eventmesh] branch master updated: add in-line scheduler
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 0e0c2cec add in-line scheduler
new 6323f18e Merge pull request #1484 from walterlife/modify-eventmesh-workflow-schedule
0e0c2cec is described below
commit 0e0c2cec6bf7d6fbe9ce57bcb7a6efe8ee1db9bc
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 23:53:05 2022 +0800
add in-line scheduler
---
.../schedule/{scheduler.go => inline_scheduler.go} | 89 +++++++++++---------
.../internal/schedule/scheduler.go | 94 ++--------------------
eventmesh-workflow-go/server.go | 9 ++-
3 files changed, 62 insertions(+), 130 deletions(-)
diff --git a/eventmesh-workflow-go/internal/schedule/scheduler.go b/eventmesh-workflow-go/internal/schedule/inline_scheduler.go
similarity index 58%
copy from eventmesh-workflow-go/internal/schedule/scheduler.go
copy to eventmesh-workflow-go/internal/schedule/inline_scheduler.go
index df489abe..627ab578 100644
--- a/eventmesh-workflow-go/internal/schedule/scheduler.go
+++ b/eventmesh-workflow-go/internal/schedule/inline_scheduler.go
@@ -23,25 +23,31 @@ import (
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task"
+ "github.com/avast/retry-go/v4"
+ "gorm.io/gorm"
"time"
)
+func init() {
+ schedulerBuilder["in-line"] = NewInlineScheduler
+}
+
const (
schedulerLockKey = "scheduler_lock"
schedulerLockTimeout = 2
)
-type Scheduler struct {
+type inlineScheduler struct {
workflowDAL dal.WorkflowDAL
}
-func NewScheduler() (*Scheduler, error) {
- var s Scheduler
+func NewInlineScheduler() (Scheduler, error) {
+ var s inlineScheduler
s.workflowDAL = dal.NewWorkflowDAL()
return &s, nil
}
-func (s *Scheduler) Run() {
+func (s *inlineScheduler) Run() {
go func() {
ticker := time.NewTicker(time.Millisecond * time.Duration(config.GlobalConfig().Flow.Schedule.Interval))
defer func() {
@@ -57,52 +63,40 @@ func (s *Scheduler) Run() {
}()
}
-func (s *Scheduler) handle() {
- var res *model.WorkflowTaskInstance
- if err := lock(func() error {
- var err error
- res, err = s.workflowDAL.SelectTaskInstance(context.Background(),
- model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
- if err != nil {
+func (s *inlineScheduler) handle() {
+ if err := dal.GetDalClient().Transaction(func(tx *gorm.DB) error {
+ var res *model.WorkflowTaskInstance
+ if err := s.lock(func() error {
+ var err error
+ res, err = s.fetchTask(tx)
+ if err != nil {
+ return err
+ }
+ return nil
+ }); err != nil {
return err
}
if res == nil {
return nil
}
- if err = s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceProcessStatus}); err != nil {
- return err
+ t := task.New(res)
+ if t == nil {
+ return nil
}
- return nil
- }); err != nil {
- log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
- }
- if res == nil {
- return
- }
- t := task.Build(res)
- if t == nil {
- return
- }
- // TODO fail retry
- if err := t.Run(); err != nil {
- if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceFailStatus}); err != nil {
- log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
+ if err := retry.Do(func() error {
+ return t.Run()
+ }, retry.Attempts(constants.RetryAttempts)); err != nil {
+ return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+ Status: constants.TaskInstanceFailStatus})
}
- }
- // success
- if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceSuccessStatus}); err != nil {
- // TODO fail retry
+ return s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+ Status: constants.TaskInstanceSuccessStatus})
+ }); err != nil {
log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
}
}
-func lock(h func() error) error {
+func (s *inlineScheduler) lock(h func() error) error {
l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout)
if err != nil {
return err
@@ -110,3 +104,20 @@ func lock(h func() error) error {
defer l.Release()
return h()
}
+
+func (s *inlineScheduler) fetchTask(tx *gorm.DB) (*model.WorkflowTaskInstance, error) {
+ var err error
+ res, err := s.workflowDAL.SelectTaskInstance(context.Background(),
+ model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
+ if err != nil {
+ return nil, err
+ }
+ if res == nil {
+ return nil, nil
+ }
+ if err = s.workflowDAL.UpdateTaskInstance(tx, &model.WorkflowTaskInstance{ID: res.ID,
+ Status: constants.TaskInstanceProcessStatus}); err != nil {
+ return nil, err
+ }
+ return res, nil
+}
diff --git a/eventmesh-workflow-go/internal/schedule/scheduler.go b/eventmesh-workflow-go/internal/schedule/scheduler.go
index df489abe..3f24dfa3 100644
--- a/eventmesh-workflow-go/internal/schedule/scheduler.go
+++ b/eventmesh-workflow-go/internal/schedule/scheduler.go
@@ -16,97 +16,15 @@
package schedule
import (
- "context"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
- "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
- "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
- "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
- "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/task"
- "time"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
)
-const (
- schedulerLockKey = "scheduler_lock"
- schedulerLockTimeout = 2
-)
-
-type Scheduler struct {
- workflowDAL dal.WorkflowDAL
-}
-
-func NewScheduler() (*Scheduler, error) {
- var s Scheduler
- s.workflowDAL = dal.NewWorkflowDAL()
- return &s, nil
-}
-
-func (s *Scheduler) Run() {
- go func() {
- ticker := time.NewTicker(time.Millisecond * time.Duration(config.GlobalConfig().Flow.Schedule.Interval))
- defer func() {
- if err := recover(); err != nil {
- log.Get(constants.LogSchedule).Errorf("schedule run error=%+v", err)
- }
- ticker.Stop()
- }()
-
- for range ticker.C {
- s.handle()
- }
- }()
-}
+var schedulerBuilder = make(map[string]func() (Scheduler, error)) // registry of scheduler constructors keyed by config name; must be non-nil because init() functions in this package write to it
-func (s *Scheduler) handle() {
- var res *model.WorkflowTaskInstance
- if err := lock(func() error {
- var err error
- res, err = s.workflowDAL.SelectTaskInstance(context.Background(),
- model.WorkflowTaskInstance{Status: constants.TaskInstanceWaitStatus})
- if err != nil {
- return err
- }
- if res == nil {
- return nil
- }
- if err = s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceProcessStatus}); err != nil {
- return err
- }
- return nil
- }); err != nil {
- log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
- }
- if res == nil {
- return
- }
- t := task.Build(res)
- if t == nil {
- return
- }
- // TODO fail retry
- if err := t.Run(); err != nil {
- if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceFailStatus}); err != nil {
- log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
- }
- }
- // success
- if err := s.workflowDAL.UpdateTaskInstance(context.Background(), &model.WorkflowTaskInstance{ID: res.ID,
- Status: constants.TaskInstanceSuccessStatus}); err != nil {
- // TODO fail retry
- log.Get(constants.LogSchedule).Errorf("handle UpdateTaskInstance error=%v", err)
- return
- }
+type Scheduler interface {
+ Run()
}
-func lock(h func() error) error {
- l, err := dal.GetLockClient().ObtainTimeout(schedulerLockKey, schedulerLockTimeout)
- if err != nil {
- return err
- }
- defer l.Release()
- return h()
+func NewScheduler() (Scheduler, error) {
+ return schedulerBuilder[config.Get().Flow.Scheduler]()
}
diff --git a/eventmesh-workflow-go/server.go b/eventmesh-workflow-go/server.go
index 8bedaf9c..fcb7806b 100644
--- a/eventmesh-workflow-go/server.go
+++ b/eventmesh-workflow-go/server.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/api"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/api/proto"
+ pconfig "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
@@ -38,7 +39,7 @@ import (
type Server struct {
Server *grpc.Server
- schedule *schedule.Scheduler
+ schedule schedule.Scheduler
queue queue.ObserveQueue
}
@@ -91,8 +92,10 @@ func (s *Server) SetupConfig() error {
return err
}
config.SetGlobalConfig(cfg)
-
- return config.Setup(cfg)
+ if err := config.Setup(cfg); err != nil {
+ return err
+ }
+ return pconfig.Setup(config.ServerConfigPath)
}
func (s *Server) listen() (net.Listener, error) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org