You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/29 13:27:19 UTC
[plc4x] 01/02: refactor(spi): optimize RequestTransactionManager
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 49809c54def1c0542fe643c48b4f3c7b2af38249
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 29 15:26:11 2022 +0200
refactor(spi): optimize RequestTransactionManager
+ fixed shared instance being set by every NewFixedSizeExecutor call
+ AwaitCompletion returns err
+ fixed typos
+ added AwaitCompletion to the RequestTransaction to wait for a transaction to finish
---
plc4go/internal/bacnetip/Driver.go | 2 +-
plc4go/internal/cbus/Driver.go | 2 +-
plc4go/internal/eip/Driver.go | 2 +-
plc4go/internal/s7/Driver.go | 2 +-
plc4go/spi/RequestTransactionManager.go | 94 +++++++++++++++++++++------------
5 files changed, 63 insertions(+), 39 deletions(-)
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 33c9d5e24..72176c15b 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -47,7 +47,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewFieldHandler()),
- tm: spi.NewRequestTransactionManager(math.MaxInt),
+ tm: *spi.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 97cb28f77..3b6ff120d 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -44,7 +44,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("c-bus", "Clipsal Bus", "tcp", NewFieldHandler()),
- tm: spi.NewRequestTransactionManager(1),
+ tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/eip/Driver.go b/plc4go/internal/eip/Driver.go
index 232e8c5f1..81f6a7444 100644
--- a/plc4go/internal/eip/Driver.go
+++ b/plc4go/internal/eip/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("eip", "EthernetIP", "tcp", NewFieldHandler()),
- tm: spi.NewRequestTransactionManager(1),
+ tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 9deb705ba..fbde786ec 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("s7", "Siemens S7 (Basic)", "tcp", NewFieldHandler()),
- tm: spi.NewRequestTransactionManager(1),
+ tm: *spi.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index 2c3ceca6e..adf912803 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -26,16 +26,16 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
+ "runtime"
"sync"
"time"
)
-/** Executor that performs all operations */
-var executor Executor // shared instance
+var sharedExecutorInstance Executor // shared instance
func init() {
- executor = *NewFixedSizeExecutor(4)
- executor.start()
+ sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
+ sharedExecutorInstance.start()
}
type Runnable func()
@@ -113,7 +113,7 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
executor: nil,
}
}
- executor = Executor{
+ executor := Executor{
queue: make(chan WorkItem, 100),
worker: workers,
}
@@ -186,10 +186,11 @@ func (f CompletionFuture) complete() {
f.completed = true
}
-func (f CompletionFuture) AwaitCompletion() {
+func (f CompletionFuture) AwaitCompletion() error {
for !f.completed || !f.errored {
time.Sleep(time.Millisecond * 10)
}
+ return f.err
}
type RequestTransaction struct {
@@ -207,30 +208,44 @@ func (t *RequestTransaction) String() string {
return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
}
+// RequestTransactionManager handles transactions
type RequestTransactionManager struct {
runningRequests []*RequestTransaction
// How many Transactions are allowed to run at the same time?
numberOfConcurrentRequests int
// Assigns each request a Unique Transaction Id, especially important for failure handling
- transactionId int32
- transationMutex sync.RWMutex
+ transactionId int32
+ transactionMutex sync.RWMutex
// Important, this is a FIFO Queue for Fairness!
- worklog list.List
- worklogMutex sync.RWMutex
+ workLog list.List
+ workLogMutex sync.RWMutex
+ executor *Executor
}
-func NewRequestTransactionManager(numberOfConcurrentRequests int) RequestTransactionManager {
- return RequestTransactionManager{
+// NewRequestTransactionManager creates a new RequestTransactionManager
+func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransactionManagerOptions ...RequestTransactionManagerOption) *RequestTransactionManager {
+ requestTransactionManager := &RequestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
transactionId: 0,
- worklog: *list.New(),
+ workLog: *list.New(),
+ executor: &sharedExecutorInstance,
+ }
+ for _, requestTransactionManagerOption := range requestTransactionManagerOptions {
+ requestTransactionManagerOption(requestTransactionManager)
}
+ return requestTransactionManager
}
-func (r *RequestTransactionManager) getNumberOfConcurrentRequests() int {
- return r.numberOfConcurrentRequests
+type RequestTransactionManagerOption func(requestTransactionManager *RequestTransactionManager)
+
+// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
+func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
+ return func(requestTransactionManager *RequestTransactionManager) {
+ requestTransactionManager.executor = executor
+ }
}
+// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
// If we reduced the number of concurrent requests and more requests are in-flight
@@ -251,34 +266,36 @@ func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
}
// Add this Request with this handle i the Worklog
// Put Transaction into Worklog
- r.worklogMutex.Lock()
- r.worklog.PushFront(handle)
- r.worklogMutex.Unlock()
+ r.workLogMutex.Lock()
+ r.workLog.PushFront(handle)
+ r.workLogMutex.Unlock()
// Try to Process the Worklog
r.processWorklog()
}
func (r *RequestTransactionManager) processWorklog() {
- r.worklogMutex.RLock()
- defer r.worklogMutex.RUnlock()
- log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests)
- for len(r.runningRequests) < r.numberOfConcurrentRequests && r.worklog.Len() > 0 {
- front := r.worklog.Front()
+ r.workLogMutex.RLock()
+ defer r.workLogMutex.RUnlock()
+ log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
+ for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
+ front := r.workLog.Front()
if front == nil {
return
}
next := front.Value.(*RequestTransaction)
log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
- completionFuture := executor.submit(next.transactionId, next.operation)
+ // TODO: use sharedInstance if none is present
+ completionFuture := sharedExecutorInstance.submit(next.transactionId, next.operation)
next.completionFuture = completionFuture
- r.worklog.Remove(front)
+ r.workLog.Remove(front)
}
}
+// StartTransaction starts a RequestTransaction
func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
- r.transationMutex.Lock()
- defer r.transationMutex.Unlock()
+ r.transactionMutex.Lock()
+ defer r.transactionMutex.Unlock()
currentTransactionId := r.transactionId
r.transactionId += 1
transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
@@ -322,36 +339,43 @@ func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction)
}
transaction.transactionLog.Debug().Msg("Removing the existing transaction transaction")
r.runningRequests = append(r.runningRequests[:index], r.runningRequests[index+1:]...)
- // Process the worklog, a slot should be free now
- transaction.transactionLog.Debug().Msg("Processing the worklog")
+ // Process the workLog, a slot should be free now
+ transaction.transactionLog.Debug().Msg("Processing the workLog")
r.processWorklog()
return nil
}
+// FailRequest signals that this transaction has failed
func (t *RequestTransaction) FailRequest(err error) error {
t.transactionLog.Trace().Msg("Fail the request")
return t.parent.failRequest(t, err)
}
+// EndRequest signals that this transaction is done
func (t *RequestTransaction) EndRequest() error {
t.transactionLog.Trace().Msg("Ending the request")
// Remove it from Running Requests
return t.parent.endRequest(t)
}
+// Submit submits a Runnable to the RequestTransactionManager
func (t *RequestTransaction) Submit(operation Runnable) {
if t.operation != nil {
panic("Operation already set")
}
t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
- t.operation = t.NewTransactionOperation(operation)
+ t.operation = func() {
+ t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+ operation()
+ t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+ }
t.parent.submitHandle(t)
}
-func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) Runnable {
- return func() {
- t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
- delegate()
- t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
+func (t *RequestTransaction) AwaitCompletion() error {
+ for t.completionFuture == nil {
+ time.Sleep(time.Millisecond * 10)
}
+ return t.completionFuture.AwaitCompletion()
}