You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/09/19 04:06:20 UTC

[incubator-devlake] branch main updated: fix: fix the running task lost lock and visit map (#3105)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a2838d8 fix: fix the running task lost lock and visit map (#3105)
8a2838d8 is described below

commit 8a2838d84376ff709ae4a06e63cc4c5678ebec8b
Author: mappjzc <zh...@merico.dev>
AuthorDate: Mon Sep 19 12:06:15 2022 +0800

    fix: fix the running task lost lock and visit map (#3105)
    
    Add mu.Lock() and defer mu.Unlock() before visit.
    
    Nddtfjiang <zh...@merico.dev>
---
 services/pipeline.go | 11 +++++++----
 services/task.go     |  6 +++++-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/services/pipeline.go b/services/pipeline.go
index 42516888..a4bced5b 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -20,6 +20,11 @@ package services
 import (
 	"context"
 	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
@@ -29,10 +34,6 @@ import (
 	"go.temporal.io/sdk/client"
 	"go.temporal.io/sdk/converter"
 	"golang.org/x/sync/semaphore"
-	"os"
-	"path/filepath"
-	"strings"
-	"time"
 )
 
 var notificationService *NotificationService
@@ -198,6 +199,8 @@ func watchTemporalPipelines() {
 			if err != nil {
 				panic(err)
 			}
+			// progressDetails will be only used in this goroutine now
+			// So it needn't lock and unlock now
 			progressDetails := make(map[uint64]*models.TaskProgressDetail)
 			// check their status against temporal
 			for _, rp := range runningDbPipelines {
diff --git a/services/task.go b/services/task.go
index cec147a3..232db101 100644
--- a/services/task.go
+++ b/services/task.go
@@ -22,11 +22,12 @@ import (
 	"encoding/json"
 	goerror "errors"
 	"fmt"
-	"github.com/apache/incubator-devlake/errors"
 	"regexp"
 	"strconv"
 	"sync"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/plugins/core"
@@ -279,6 +280,9 @@ func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
 }
 
 func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
+	runningTasks.mu.Lock()
+	defer runningTasks.mu.Unlock()
+
 	data := runningTasks.tasks[taskId]
 	if data == nil {
 		return