You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/29 13:27:18 UTC

[plc4x] branch develop updated (1216bbcd6 -> c38f164ac)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 1216bbcd6 fix(plc4j/utils): Added a check for libpcap and the version to the ArpUtils giving error messages if anything is missing.
     new 49809c54d refactor(spi): optimize RequestTransactionManager
     new c38f164ac fix(plc4go/cbus): fixed empty responses on read

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/Driver.go      |  2 +-
 plc4go/internal/cbus/Driver.go          |  2 +-
 plc4go/internal/cbus/Reader.go          |  3 ++
 plc4go/internal/eip/Driver.go           |  2 +-
 plc4go/internal/s7/Driver.go            |  2 +-
 plc4go/spi/RequestTransactionManager.go | 94 +++++++++++++++++++++------------
 6 files changed, 66 insertions(+), 39 deletions(-)


[plc4x] 01/02: refactor(spi): optimize RequestTransactionManager

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 49809c54def1c0542fe643c48b4f3c7b2af38249
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 29 15:26:11 2022 +0200

    refactor(spi): optimize RequestTransactionManager
    
    + fixed shared instance being set by every NewFixedSizeExecutor call
    + AwaitCompletion returns err
    + fixed typos
    + added AwaitCompletion to the RequestTransaction to wait for a transaction to finish
---
 plc4go/internal/bacnetip/Driver.go      |  2 +-
 plc4go/internal/cbus/Driver.go          |  2 +-
 plc4go/internal/eip/Driver.go           |  2 +-
 plc4go/internal/s7/Driver.go            |  2 +-
 plc4go/spi/RequestTransactionManager.go | 94 +++++++++++++++++++++------------
 5 files changed, 63 insertions(+), 39 deletions(-)

diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 33c9d5e24..72176c15b 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -47,7 +47,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(math.MaxInt),
+		tm:                      *spi.NewRequestTransactionManager(math.MaxInt),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 97cb28f77..3b6ff120d 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -44,7 +44,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("c-bus", "Clipsal Bus", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/eip/Driver.go b/plc4go/internal/eip/Driver.go
index 232e8c5f1..81f6a7444 100644
--- a/plc4go/internal/eip/Driver.go
+++ b/plc4go/internal/eip/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("eip", "EthernetIP", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 9deb705ba..fbde786ec 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -39,7 +39,7 @@ type Driver struct {
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("s7", "Siemens S7 (Basic)", "tcp", NewFieldHandler()),
-		tm:                      spi.NewRequestTransactionManager(1),
+		tm:                      *spi.NewRequestTransactionManager(1),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
 	}
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index 2c3ceca6e..adf912803 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -26,16 +26,16 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
+	"runtime"
 	"sync"
 	"time"
 )
 
-/** Executor that performs all operations */
-var executor Executor // shared instance
+var sharedExecutorInstance Executor // shared instance
 
 func init() {
-	executor = *NewFixedSizeExecutor(4)
-	executor.start()
+	sharedExecutorInstance = *NewFixedSizeExecutor(runtime.NumCPU())
+	sharedExecutorInstance.start()
 }
 
 type Runnable func()
@@ -113,7 +113,7 @@ func NewFixedSizeExecutor(numberOfWorkers int) *Executor {
 			executor:    nil,
 		}
 	}
-	executor = Executor{
+	executor := Executor{
 		queue:  make(chan WorkItem, 100),
 		worker: workers,
 	}
@@ -186,10 +186,11 @@ func (f CompletionFuture) complete() {
 	f.completed = true
 }
 
-func (f CompletionFuture) AwaitCompletion() {
+func (f CompletionFuture) AwaitCompletion() error {
 	for !f.completed || !f.errored {
 		time.Sleep(time.Millisecond * 10)
 	}
+	return f.err
 }
 
 type RequestTransaction struct {
@@ -207,30 +208,44 @@ func (t *RequestTransaction) String() string {
 	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
 }
 
+// RequestTransactionManager handles transactions
 type RequestTransactionManager struct {
 	runningRequests []*RequestTransaction
 	// How many Transactions are allowed to run at the same time?
 	numberOfConcurrentRequests int
 	// Assigns each request a Unique Transaction Id, especially important for failure handling
-	transactionId   int32
-	transationMutex sync.RWMutex
+	transactionId    int32
+	transactionMutex sync.RWMutex
 	// Important, this is a FIFO Queue for Fairness!
-	worklog      list.List
-	worklogMutex sync.RWMutex
+	workLog      list.List
+	workLogMutex sync.RWMutex
+	executor     *Executor
 }
 
-func NewRequestTransactionManager(numberOfConcurrentRequests int) RequestTransactionManager {
-	return RequestTransactionManager{
+// NewRequestTransactionManager creates a new RequestTransactionManager
+func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransactionManagerOptions ...RequestTransactionManagerOption) *RequestTransactionManager {
+	requestTransactionManager := &RequestTransactionManager{
 		numberOfConcurrentRequests: numberOfConcurrentRequests,
 		transactionId:              0,
-		worklog:                    *list.New(),
+		workLog:                    *list.New(),
+		executor:                   &sharedExecutorInstance,
+	}
+	for _, requestTransactionManagerOption := range requestTransactionManagerOptions {
+		requestTransactionManagerOption(requestTransactionManager)
 	}
+	return requestTransactionManager
 }
 
-func (r *RequestTransactionManager) getNumberOfConcurrentRequests() int {
-	return r.numberOfConcurrentRequests
+type RequestTransactionManagerOption func(requestTransactionManager *RequestTransactionManager)
+
+// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
+func WithCustomExecutor(executor *Executor) RequestTransactionManagerOption {
+	return func(requestTransactionManager *RequestTransactionManager) {
+		requestTransactionManager.executor = executor
+	}
 }
 
+// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
 func (r *RequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
 	log.Info().Msgf("Setting new number of concurrent requests %d", numberOfConcurrentRequests)
 	// If we reduced the number of concurrent requests and more requests are in-flight
@@ -251,34 +266,36 @@ func (r *RequestTransactionManager) submitHandle(handle *RequestTransaction) {
 	}
 	// Add this Request with this handle in the Worklog
 	// Put Transaction into Worklog
-	r.worklogMutex.Lock()
-	r.worklog.PushFront(handle)
-	r.worklogMutex.Unlock()
+	r.workLogMutex.Lock()
+	r.workLog.PushFront(handle)
+	r.workLogMutex.Unlock()
 	// Try to Process the Worklog
 	r.processWorklog()
 }
 
 func (r *RequestTransactionManager) processWorklog() {
-	r.worklogMutex.RLock()
-	defer r.worklogMutex.RUnlock()
-	log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.worklog.Len(), r.numberOfConcurrentRequests)
-	for len(r.runningRequests) < r.numberOfConcurrentRequests && r.worklog.Len() > 0 {
-		front := r.worklog.Front()
+	r.workLogMutex.RLock()
+	defer r.workLogMutex.RUnlock()
+	log.Debug().Msgf("Processing work log with size of %d (%d concurrent requests allowed)", r.workLog.Len(), r.numberOfConcurrentRequests)
+	for len(r.runningRequests) < r.numberOfConcurrentRequests && r.workLog.Len() > 0 {
+		front := r.workLog.Front()
 		if front == nil {
 			return
 		}
 		next := front.Value.(*RequestTransaction)
 		log.Debug().Msgf("Handling next %v. (Adding to running requests (length: %d))", next, len(r.runningRequests))
 		r.runningRequests = append(r.runningRequests, next)
-		completionFuture := executor.submit(next.transactionId, next.operation)
+		// TODO: submit via r.executor and fall back to sharedExecutorInstance only if none is present
+		completionFuture := sharedExecutorInstance.submit(next.transactionId, next.operation)
 		next.completionFuture = completionFuture
-		r.worklog.Remove(front)
+		r.workLog.Remove(front)
 	}
 }
 
+// StartTransaction starts a RequestTransaction
 func (r *RequestTransactionManager) StartTransaction() *RequestTransaction {
-	r.transationMutex.Lock()
-	defer r.transationMutex.Unlock()
+	r.transactionMutex.Lock()
+	defer r.transactionMutex.Unlock()
 	currentTransactionId := r.transactionId
 	r.transactionId += 1
 	transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
@@ -322,36 +339,43 @@ func (r *RequestTransactionManager) endRequest(transaction *RequestTransaction)
 	}
 	transaction.transactionLog.Debug().Msg("Removing the existing transaction transaction")
 	r.runningRequests = append(r.runningRequests[:index], r.runningRequests[index+1:]...)
-	// Process the worklog, a slot should be free now
-	transaction.transactionLog.Debug().Msg("Processing the worklog")
+	// Process the workLog, a slot should be free now
+	transaction.transactionLog.Debug().Msg("Processing the workLog")
 	r.processWorklog()
 	return nil
 }
 
+// FailRequest signals that this transaction has failed
 func (t *RequestTransaction) FailRequest(err error) error {
 	t.transactionLog.Trace().Msg("Fail the request")
 	return t.parent.failRequest(t, err)
 }
 
+// EndRequest signals that this transaction is done
 func (t *RequestTransaction) EndRequest() error {
 	t.transactionLog.Trace().Msg("Ending the request")
 	// Remove it from Running Requests
 	return t.parent.endRequest(t)
 }
 
+// Submit submits a Runnable to the RequestTransactionManager
 func (t *RequestTransaction) Submit(operation Runnable) {
 	if t.operation != nil {
 		panic("Operation already set")
 	}
 	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
-	t.operation = t.NewTransactionOperation(operation)
+	t.operation = func() {
+		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+		operation()
+		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+	}
 	t.parent.submitHandle(t)
 }
 
-func (t *RequestTransaction) NewTransactionOperation(delegate Runnable) Runnable {
-	return func() {
-		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
-		delegate()
-		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+// AwaitCompletion waits for this RequestTransaction to finish. Returns an error if it finished unsuccessfully.
+func (t *RequestTransaction) AwaitCompletion() error {
+	for t.completionFuture == nil {
+		time.Sleep(time.Millisecond * 10)
 	}
+	return t.completionFuture.AwaitCompletion()
 }


[plc4x] 02/02: fix(plc4go/cbus): fixed empty responses on read

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c38f164aca14e0bb96647b55a83d8d857156dcad
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 29 15:27:07 2022 +0200

    fix(plc4go/cbus): fixed empty responses on read
---
 plc4go/internal/cbus/Reader.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index b2949dab2..3eaf2f9b8 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -388,6 +388,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 					_ = transaction.EndRequest()
 				}
 			})
+			if err := transaction.AwaitCompletion(); err != nil {
+				addResponseCode(fieldName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			}
 		}
 		readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
 		result <- &spiModel.DefaultPlcReadRequestResult{