You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/16 16:38:24 UTC

[plc4x] branch develop updated: feat(plc4go/bacnet): fix some open issues regarding task processing

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new eb59f7c111 feat(plc4go/bacnet): fix some open issues regarding task processing
eb59f7c111 is described below

commit eb59f7c1113c2e068a547910b6cc93d604bac2fc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jan 16 17:36:43 2023 +0100

    feat(plc4go/bacnet): fix some open issues regarding task processing
---
 plc4go/internal/bacnetip/Core.go | 63 ++++++++++++++++++++++++++++++++++++++--
 plc4go/internal/bacnetip/Task.go | 14 ---------
 2 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/plc4go/internal/bacnetip/Core.go b/plc4go/internal/bacnetip/Core.go
index 145617be80..9692828a10 100644
--- a/plc4go/internal/bacnetip/Core.go
+++ b/plc4go/internal/bacnetip/Core.go
@@ -19,9 +19,68 @@
 
 package bacnetip
 
-import "github.com/rs/zerolog/log"
+import (
+	"github.com/rs/zerolog/log"
+	"math"
+	"time"
+)
+
+var running bool // loop condition of the goroutine started in init; init sets it to true
+var spin = 10 * time.Millisecond // upper bound for one loop delay; also used when getNextTask returns a zero delta
+var sleepTime = 0 * time.Nanosecond // optional extra sleep per loop iteration; the loop skips it while this is 0
+var deferredFunctions []func() error // functions queued by Deferred; the loop in init runs them and resets the list
+// init starts the background goroutine that processes tasks and deferred functions while running is true.
+func init() {
+	running = true
+	go func() {
+		for running {
+			// get the next task and process it if there is one
+			task, delta := _taskManager.getNextTask()
+			if task != nil {
+				_taskManager.processTask(task)
+			}
+
+			// a zero delta is treated as "no delay given" (presumably no pending task), default to spinning
+			if delta == 0 {
+				delta = spin
+			}
+
+			// optional extra sleep, deducted from delta; skipped while sleepTime is 0 (its initial value)
+			if sleepTime > 0 && delta > sleepTime {
+				time.Sleep(sleepTime)
+				delta -= sleepTime
+			}
+
+			// delta should be no more than the spin value
+			delta = time.Duration(math.Min(float64(delta), float64(spin)))
+
+			// if there are deferred functions, cap delta at 1ms
+			if len(deferredFunctions) > 0 {
+				delta = time.Duration(math.Min(float64(delta), float64(1*time.Millisecond)))
+			}
+
+			// there is no socket to wait on here, simply sleep for delta
+			time.Sleep(delta)
+
+			// take the currently queued deferred functions
+			fnlist := deferredFunctions
+			// reset the queue before running them, so anything queued from now on lands in a fresh list
+			deferredFunctions = nil
+			for _, fn := range fnlist {
+				if err := fn(); err != nil {
+					log.Debug().Err(err).Msg("error executing deferred function") // errors are only logged, the loop continues
+				}
+			}
+		}
+	}()
+}
 
 func Deferred(fn func() error) {
 	log.Debug().Msg("Deferred")
-	// TODO: implement me
+
+	// queue fn; the loop started in init runs the queued functions and then resets the list
+	deferredFunctions = append(deferredFunctions, fn) // NOTE(review): appended here, drained by the init goroutine, no locking visible in this patch - confirm
+
+	// trigger the task manager event
+	// TODO: there is no trigger, fn waits until the loop in init next drains the list
 }
diff --git a/plc4go/internal/bacnetip/Task.go b/plc4go/internal/bacnetip/Task.go
index 5e0a84ec11..e69e779d23 100644
--- a/plc4go/internal/bacnetip/Task.go
+++ b/plc4go/internal/bacnetip/Task.go
@@ -223,20 +223,6 @@ func RecurringFunctionTask(interval *time.Duration, fn func() error) *RecurringT
 
 var _taskManager = TaskManager{}
 
-func init() {
-	go func() {
-		for {
-			task, delta := _taskManager.getNextTask()
-			if task == nil {
-				time.Sleep(10 * time.Millisecond)
-				continue
-			}
-			_taskManager.processTask(task)
-			time.Sleep(delta)
-		}
-	}()
-}
-
 type TaskManager struct {
 	sync.Mutex
 	tasks []_TaskRequirements