You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/29 13:27:19 UTC

[plc4x] 01/02: refactor(spi): optimize RequestTransactionManager

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 49809c54def1c0542fe643c48b4f3c7b2af38249
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 29 15:26:11 2022 +0200

    refactor(spi): optimize RequestTransactionManager
    
    + fixed shared instance being set by every NewFixedSizeExecutor call
    + AwaitCompletion returns err
    + fixed typos
    + added AwaitCompletion to the RequestTransaction to wait for a transaction to finish
---
 plc4go/internal/bacnetip/Driver.go      |  2 +-
 plc4go/internal/cbus/Driver.go          |  2 +-
 plc4go/internal/eip/Driver.go           |  2 +-
 plc4go/internal/s7/Driver.go            |  2 +-
 plc4go/spi/RequestTransactionManager.go | 94 +++++++++++++++++++++------------
 5 files changed, 63 insertions(+), 39 deletions(-)

diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 33c9d5e24..72176c15b 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -47,7 +47,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(math.MaxInt),
+		tm:                      *spi.NewRequestTransactionManager(math.MaxInt),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 97cb28f77..3b6ff120d 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -44,7 +44,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("c-bus", "Clipsal Bus", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/eip/Driver.go b/plc4go/internal/eip/Driver.go
index 232e8c5f1..81f6a7444 100644
--- a/plc4go/internal/eip/Driver.go
+++ b/plc4go/internal/eip/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("eip", "EthernetIP", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 9deb705ba..fbde786ec 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("s7", "Siemens S7 (Basic)", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index 2c3ceca6e..adf912803 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -26,16 +26,16 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
+	"runtime"
 	"sync"
 	"time"
 )
 
-/** Executor that performs all operations */
-var executor Executor // shared instance
+var sharedExecutorInstance Executor // shared instance
 
 func init() {
-	executor = *NewFixedSizeExecutor(4)
-	executor.start()
+	sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
+	sharedExecutorInstance.start()
 }
 
 type Runnable func()
@@ -113,7 +113,7 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
 			executor:    nil,
 		}
 	}
-	executor = Executor{
+	executor := Executor{
 		queue:  make(chan WorkItem, 100),
 		worker: workers,
 	}
@@ -186,10 +186,11 @@ func (f CompletionFuture) complete() {
 	f.completed = true
 }
 
-func (f CompletionFuture) AwaitCompletion() {
+func (f CompletionFuture) AwaitCompletion() error {
 	for !f.completed || !f.errored {
 		time.Sleep(time.Millisecond * 10)
 	}
+	return f.err
 }
 
 type RequestTransaction struct {
@@ -207,30 +208,44 @@ func (t *RequestTransaction) String() string {
 	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
 }
 
+// RequestTransactionManager handles transactions
 type RequestTransactionManager struct {
 	runningRequests []*RequestTransaction
 	// How many Transactions are allowed to run at the same time?
 	numberOfConcurrentRequests int
 	// Assigns each request a Unique Transaction Id, especially important for failure handling
-	transactionId   int32
-	transationMutex sync.RWMutex
+	transactionId    int32
+	transactionMutex sync.RWMutex
 	// Important, this is a FIFO Queue for Fairness!
-	worklog      list.List
-	worklogMutex sync.RWMutex
+	workLog      list.List
+	workLogMutex sync.RWMutex
+	executor     *Executor
 }
 
-func NewRequestTransactionManager(numberOfConcurrentRequests int) RequestTransactionManager {
-	return RequestTransactionManager{
+// NewRequestTransactionManager creates a new RequestTransactionManager
+func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransactionManagerOptions ...RequestTransactionManagerOption) *RequestTransactionManager {
+	requestTransactionManager := &RequestTransactionManager{
 		numberOfConcurrentRequests: numberOfConcurrentRequests,
 		transactionId:              0,
-		worklog:                    *list.New(),
+		workLog:                    *list.New(),
+		executor:                   &sharedExecutorInstance,
+	}
+	for _, requestTransactionManagerOption := range requestTransactionManagerOptions {
+		requestTransactionManagerOption(requestTransactionManager)
 	}
+	return requestTransactionManager
 }
 
-func (r *RequestTransactionManager) getNumberOfConcurrentRequests() int {
-	return r.numberOfConcurrentRequests
+type RequestTransactionManagerOption func(requestTransactionManager *RequestTransactionManager)
+
+// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
+func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
+	return func(requestTransactionManager *RequestTransactionManager) {
+		requestTransactionManager.executor = executor
+	}
 }
 
+// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
 func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
 	log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
 	// If we reduced the number of concurrent requests and more requests are in-flight
@@ -251,34 +266,36 @@ func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
 	}
 	// Add this Request with this handle i the Worklog
 	// Put Transaction into Worklog
-	r.worklogMutex.Lock()
-	r.worklog.PushFront(handle)
-	r.worklogMutex.Unlock()
+	r.workLogMutex.Lock()
+	r.workLog.PushFront(handle)
+	r.workLogMutex.Unlock()
 	// Try to Process the Worklog
 	r.processWorklog()
 }
 
 func (r *RequestTransactionManager) processWorklog() {
-	r.worklogMutex.RLock()
-	defer r.worklogMutex.RUnlock()
-	log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests)
-	for len(r.runningRequests) < r.numberOfConcurrentRequests && r.worklog.Len() > 0 {
-		front := r.worklog.Front()
+	r.workLogMutex.RLock()
+	defer r.workLogMutex.RUnlock()
+	log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
+	for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
+		front := r.workLog.Front()
 		if front == nil {
 			return
 		}
 		next := front.Value.(*RequestTransaction)
 		log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
 		r.runningRequests = append(r.runningRequests, next)
-		completionFuture := executor.submit(next.transactionId, next.operation)
+		// TODO: use sharedInstance if none is present
+		completionFuture := sharedExecutorInstance.submit(next.transactionId, next.operation)
 		next.completionFuture = completionFuture
-		r.worklog.Remove(front)
+		r.workLog.Remove(front)
 	}
 }
 
+// StartTransaction starts a RequestTransaction
 func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
-	r.transationMutex.Lock()
-	defer r.transationMutex.Unlock()
+	r.transactionMutex.Lock()
+	defer r.transactionMutex.Unlock()
 	currentTransactionId := r.transactionId
 	r.transactionId += 1
 	transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
@@ -322,36 +339,43 @@ func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction)
 	}
 	transaction.transactionLog.Debug().Msg("Removing the existing transaction transaction")
 	r.runningRequests = append(r.runningRequests[:index], r.runningRequests[index+1:]...)
-	// Process the worklog, a slot should be free now
-	transaction.transactionLog.Debug().Msg("Processing the worklog")
+	// Process the workLog, a slot should be free now
+	transaction.transactionLog.Debug().Msg("Processing the workLog")
 	r.processWorklog()
 	return nil
 }
 
+// FailRequest signals that this transaction has failed
 func (t *RequestTransaction) FailRequest(err error) error {
 	t.transactionLog.Trace().Msg("Fail the request")
 	return t.parent.failRequest(t, err)
 }
 
+// EndRequest signals that this transaction is done
 func (t *RequestTransaction) EndRequest() error {
 	t.transactionLog.Trace().Msg("Ending the request")
 	// Remove it from Running Requests
 	return t.parent.endRequest(t)
 }
 
+// Submit submits a Runnable to the RequestTransactionManager
 func (t *RequestTransaction) Submit(operation Runnable) {
 	if t.operation != nil {
 		panic("Operation already set")
 	}
 	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
-	t.operation = t.NewTransactionOperation(operation)
+	t.operation = func() {
+		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+		operation()
+		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+	}
 	t.parent.submitHandle(t)
 }
 
-func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) Runnable {
-	return func() {
-		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
-		delegate()
-		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
+func (t *RequestTransaction) AwaitCompletion() error {
+	for t.completionFuture == nil {
+		time.Sleep(time.Millisecond * 10)
 	}
+	return t.completionFuture.AwaitCompletion()
 }