You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 12:54:57 UTC
[plc4x] 03/03: feat(plc4go/spi): Introduce new WithCustomLogger option
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit b330c7ce33647b379089d23a91d5788bc7b98a27
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 14:54:46 2023 +0200
feat(plc4go/spi): Introduce new WithCustomLogger option
With this option it is possible to supply a custom logger, which is then used by downstream components.
The benefit is that, in both production and tests, logs can be attributed more easily and enriched properly further downstream.
---
plc4go/internal/bacnetip/Connection.go | 5 +-
plc4go/internal/bacnetip/Driver.go | 6 +-
plc4go/internal/bacnetip/Reader.go | 7 +-
plc4go/internal/cbus/Browser.go | 73 +-
plc4go/internal/cbus/Browser_test.go | 351 +++++-----
plc4go/internal/cbus/CBusMessageMapper.go | 8 +-
plc4go/internal/cbus/CBusMessageMapper_test.go | 589 +++++++++++-----
plc4go/internal/cbus/Configuration.go | 10 +-
plc4go/internal/cbus/Configuration_test.go | 5 +-
plc4go/internal/cbus/Connection.go | 101 +--
plc4go/internal/cbus/Connection_test.go | 742 +++++++++++++--------
plc4go/internal/cbus/Discoverer.go | 43 +-
plc4go/internal/cbus/Discoverer_test.go | 53 +-
plc4go/internal/cbus/Driver.go | 39 +-
plc4go/internal/cbus/Driver_test.go | 8 +-
plc4go/internal/cbus/MessageCodec.go | 66 +-
plc4go/internal/cbus/Reader.go | 40 +-
plc4go/internal/cbus/Reader_test.go | 35 +-
plc4go/internal/cbus/Subscriber.go | 35 +-
plc4go/internal/cbus/Writer.go | 22 +-
plc4go/internal/cbus/Writer_test.go | 6 +-
.../internal/cbus/mock_RequestTransaction_test.go | 12 +-
plc4go/internal/cbus/mock_requirements.go | 6 +-
.../{mock_requirements.go => noGlobalLog_test.go} | 10 +-
plc4go/internal/eip/Connection.go | 5 +-
plc4go/internal/eip/EipDriver.go | 6 +-
plc4go/internal/eip/Reader.go | 7 +-
plc4go/internal/eip/Writer.go | 9 +-
plc4go/internal/knxnetip/Discoverer.go | 14 +-
plc4go/internal/s7/Connection.go | 5 +-
plc4go/internal/s7/Driver.go | 6 +-
plc4go/internal/s7/Reader.go | 7 +-
plc4go/internal/s7/Writer.go | 7 +-
plc4go/pkg/api/driver.go | 2 +-
plc4go/pkg/api/mock_PlcDriver_test.go | 16 +-
plc4go/pkg/api/mock_Transport_test.go | 51 +-
.../protocols/cbus/readwrite/model/StaticHelper.go | 7 +-
plc4go/spi/default/DefaultCodec.go | 55 +-
plc4go/spi/default/DefaultConnection.go | 31 +-
plc4go/spi/default/DefaultDriver.go | 2 +-
.../default/mock_DefaultDriverRequirements_test.go | 16 +-
plc4go/spi/default/mock_DefaultDriver_test.go | 16 +-
plc4go/spi/mock_requirements.go | 4 +-
plc4go/spi/options/Option.go | 35 +
plc4go/spi/pool/mock_CompletionFuture_test.go | 132 ++++
plc4go/spi/pool/mock_Executor_test.go | 248 +++++++
plc4go/spi/pool/mock_Runnable_test.go | 84 +++
plc4go/spi/testutils/TestUtils.go | 34 +
.../spi/transactions/RequestTransactionManager.go | 85 ++-
.../transactions/RequestTransactionManager_test.go | 45 +-
.../mock_RequestTransactionManager_test.go | 89 ++-
plc4go/spi/transactions/mock_requirements.go | 4 +-
plc4go/spi/transports/Transport.go | 7 +-
plc4go/spi/transports/mock_Transport_test.go | 50 +-
plc4go/spi/transports/pcap/Transport.go | 36 +-
plc4go/spi/transports/serial/Transport.go | 35 +-
plc4go/spi/transports/tcp/Transport.go | 14 +-
plc4go/spi/transports/test/Transport.go | 48 +-
plc4go/spi/transports/udp/Transport.go | 23 +-
59 files changed, 2395 insertions(+), 1112 deletions(-)
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 1bc799b950..3c1bff9124 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -22,6 +22,7 @@ package bacnetip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"sync"
@@ -40,13 +41,13 @@ type Connection struct {
invokeIdGenerator InvokeIdGenerator
messageCodec spi.MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
connectionId string
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
invokeIdGenerator: InvokeIdGenerator{currentInvokeId: 0},
messageCodec: messageCodec,
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 1315e400c1..6b72a0ccba 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -22,6 +22,7 @@ package bacnetip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"math"
"net"
"net/url"
@@ -31,7 +32,6 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -43,7 +43,7 @@ import (
type Driver struct {
_default.DefaultDriver
applicationManager ApplicationManager
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
@@ -53,7 +53,7 @@ func NewDriver() plc4go.PlcDriver {
applicationManager: ApplicationManager{
applications: map[string]*ApplicationLayerMessageCodec{},
},
- tm: spi.NewRequestTransactionManager(math.MaxInt),
+ tm: transactions.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index 3fdc4961ed..e8e1f5b1cc 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -22,6 +22,7 @@ package bacnetip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -37,13 +38,13 @@ import (
type Reader struct {
invokeIdGenerator *InvokeIdGenerator
messageCodec spi.MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
}
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm spi.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
return &Reader{
invokeIdGenerator: invokeIdGenerator,
messageCodec: messageCodec,
@@ -126,7 +127,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool {
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 56c4ed7af8..e0a83358c2 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -22,29 +22,34 @@ package cbus
import (
"context"
"fmt"
- plc4go "github.com/apache/plc4x/plc4go/pkg/api"
"time"
+ plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
)
type Browser struct {
_default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
+
+ log zerolog.Logger
}
-func NewBrowser(connection plc4go.PlcConnection) *Browser {
+func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption) *Browser {
browser := Browser{
connection: connection,
sequenceCounter: 0,
+
+ log: options.ExtractCustomLogger(_options...),
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser)
return &browser
@@ -56,36 +61,36 @@ func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiMod
case *unitInfoQuery:
units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
if err != nil {
- log.Error().Err(err).Msg("Error extracting units")
+ m.log.Error().Err(err).Msg("Error extracting units")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
}
attributes, allAttributes := m.extractAttributes(query)
if allUnits {
- log.Info().Msg("Querying all (available) units")
+ m.log.Info().Msg("Querying all (available) units")
}
unitLoop:
for _, unit := range units {
if err := ctx.Err(); err != nil {
- log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
+ m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
unitAddress := unit.GetAddress()
if !allUnits && allAttributes {
- log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
+ m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
}
- event := log.Info()
+ event := m.log.Info()
if allUnits {
- event = log.Debug()
+ event = m.log.Debug()
}
event.Msgf("Query unit %d", unitAddress)
for _, attribute := range attributes {
if err := ctx.Err(); err != nil {
- log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
+ m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
if !allUnits && !allAttributes {
- log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
+ m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
} else {
event.Msgf("unit %d: Query %s", unitAddress, attribute)
}
@@ -195,45 +200,45 @@ func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any
result := make(map[byte]any)
plcConsumerRegistration := subscriptionHandle.Register(func(event apiModel.PlcSubscriptionEvent) {
if responseCode := event.GetResponseCode("installationMMIMonitor"); responseCode != apiModel.PlcResponseCode_OK {
- log.Warn().Msgf("Ignoring %v", event)
+ m.log.Warn().Msgf("Ignoring %v", event)
return
}
rootValue := event.GetValue("installationMMIMonitor")
if !rootValue.IsStruct() {
- log.Warn().Msgf("Ignoring %v should be a struct", rootValue)
+ m.log.Warn().Msgf("Ignoring %v should be a struct", rootValue)
return
}
rootStruct := rootValue.GetStruct()
if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
- log.Warn().Msgf("Ignoring %v should contain a application tag of type string with value NETWORK_CONTROL", rootStruct)
+ m.log.Warn().Msgf("Ignoring %v should contain a application tag of type string with value NETWORK_CONTROL", rootStruct)
return
}
var blockStart int
if blockStartValue := rootStruct["blockStart"]; blockStartValue == nil || !blockStartValue.IsByte() {
- log.Warn().Msgf("Ignoring %v should contain a blockStart tag of type byte", rootStruct)
+ m.log.Warn().Msgf("Ignoring %v should contain a blockStart tag of type byte", rootStruct)
return
} else {
blockStart = int(blockStartValue.GetByte())
}
if plcListValue := rootStruct["values"]; plcListValue == nil || !plcListValue.IsList() {
- log.Warn().Msgf("Ignoring %v should contain a values tag of type list", rootStruct)
+ m.log.Warn().Msgf("Ignoring %v should contain a values tag of type list", rootStruct)
return
} else {
for unitByteAddress, plcValue := range plcListValue.GetList() {
unitByteAddress = blockStart + unitByteAddress
if !plcValue.IsString() {
- log.Warn().Msgf("Ignoring %v at %d should be a string", plcValue, unitByteAddress)
+ m.log.Warn().Msgf("Ignoring %v at %d should be a string", plcValue, unitByteAddress)
return
}
switch plcValue.GetString() {
case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
- log.Debug().Msgf("unit %d does exists", unitByteAddress)
+ m.log.Debug().Msgf("unit %d does exists", unitByteAddress)
result[byte(unitByteAddress)] = true
case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
- log.Debug().Msgf("unit %d does not exists", unitByteAddress)
+ m.log.Debug().Msgf("unit %d does not exists", unitByteAddress)
case readWriteModel.GAVState_ERROR.PLC4XEnumName():
- log.Warn().Msgf("unit %d is in error state", unitByteAddress)
+ m.log.Warn().Msgf("unit %d is in error state", unitByteAddress)
}
}
}
@@ -268,52 +273,52 @@ func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
defer readCtxCancel()
readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
if err := readRequestResult.GetErr(); err != nil {
- log.Warn().Err(err).Msg("Error reading the mmi")
+ m.log.Warn().Err(err).Msg("Error reading the mmi")
return
}
if responseCode := readRequestResult.GetResponse().GetResponseCode("installationMMI"); responseCode == apiModel.PlcResponseCode_OK {
rootValue := readRequestResult.GetResponse().GetValue("installationMMI")
if !rootValue.IsStruct() {
- log.Warn().Err(err).Msgf("%v should be a struct", rootValue)
+ m.log.Warn().Err(err).Msgf("%v should be a struct", rootValue)
return
}
rootStruct := rootValue.GetStruct()
if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
- log.Warn().Err(err).Msgf("%v should contain a application tag of type string with value NETWORK_CONTROL", rootStruct)
+ m.log.Warn().Err(err).Msgf("%v should contain a application tag of type string with value NETWORK_CONTROL", rootStruct)
return
}
var blockStart int
if blockStartValue := rootStruct["blockStart"]; blockStartValue == nil || !blockStartValue.IsByte() || blockStartValue.GetByte() != 0 {
- log.Warn().Err(err).Msgf("%v should contain a blockStart tag of type byte with value 0", rootStruct)
+ m.log.Warn().Err(err).Msgf("%v should contain a blockStart tag of type byte with value 0", rootStruct)
return
} else {
blockStart = int(blockStartValue.GetByte())
}
if plcListValue := rootStruct["values"]; plcListValue == nil || !plcListValue.IsList() {
- log.Warn().Err(err).Msgf("%v should contain a values tag of type list", rootStruct)
+ m.log.Warn().Err(err).Msgf("%v should contain a values tag of type list", rootStruct)
return
} else {
for unitByteAddress, plcValue := range plcListValue.GetList() {
unitByteAddress = blockStart + unitByteAddress
if !plcValue.IsString() {
- log.Warn().Err(err).Msgf("%v at %d should be a string", plcValue, unitByteAddress)
+ m.log.Warn().Err(err).Msgf("%v at %d should be a string", plcValue, unitByteAddress)
return
}
switch plcValue.GetString() {
case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
- log.Debug().Msgf("unit %d does exists", unitByteAddress)
+ m.log.Debug().Msgf("unit %d does exists", unitByteAddress)
result[byte(unitByteAddress)] = true
case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
- log.Debug().Msgf("unit %d does not exists", unitByteAddress)
+ m.log.Debug().Msgf("unit %d does not exists", unitByteAddress)
case readWriteModel.GAVState_ERROR.PLC4XEnumName():
- log.Warn().Msgf("unit %d is in error state", unitByteAddress)
+ m.log.Warn().Msgf("unit %d is in error state", unitByteAddress)
}
}
}
@@ -327,7 +332,7 @@ func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any
}
} else {
- log.Warn().Msgf("We got %s as response code for installation mmi so we rely on getting it via subscription", responseCode)
+ m.log.Warn().Msgf("We got %s as response code for installation mmi so we rely on getting it via subscription", responseCode)
}
}()
@@ -336,13 +341,13 @@ func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any
for !blockOffset0Received || !blockOffset88Received || !blockOffset176Received {
select {
case <-blockOffset0ReceivedChan:
- log.Trace().Msg("Offset 0 received")
+ m.log.Trace().Msg("Offset 0 received")
blockOffset0Received = true
case <-blockOffset88ReceivedChan:
- log.Trace().Msg("Offset 88 received")
+ m.log.Trace().Msg("Offset 88 received")
blockOffset88Received = true
case <-blockOffset176ReceivedChan:
- log.Trace().Msg("Offset 176 received")
+ m.log.Trace().Msg("Offset 176 received")
blockOffset176Received = true
case <-syncCtx.Done():
return nil, errors.Wrap(err, "error waiting for other offsets")
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index f6cc485c38..f14cba80ac 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,20 +22,25 @@ package cbus
import (
"context"
"fmt"
+ "net/url"
+ "sync/atomic"
+ "testing"
+
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
- "net/url"
- "sync/atomic"
- "testing"
)
func TestNewBrowser(t *testing.T) {
@@ -47,6 +52,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -58,6 +64,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
want apiModel.PlcResponseCode
want1 []apiModel.PlcBrowseItem
}{
@@ -67,88 +74,6 @@ func TestBrowser_BrowseQuery(t *testing.T) {
},
{
name: "non responding browse",
- fields: fields{
- DefaultBrowser: nil,
- connection: func() plc4go.PlcConnection {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- APPLICATION_FILTER_1
- APPLICATION_FILTER_2
- INTERFACE_OPTIONS_3
- INTERFACE_OPTIONS_1_PUN
- INTERFACE_OPTIONS_1
- MANUFACTURER
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(APPLICATION_FILTER_1)
- case APPLICATION_FILTER_1:
- t.Log("Dispatching app1 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
- transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
- currentState.Store(APPLICATION_FILTER_2)
- case APPLICATION_FILTER_2:
- t.Log("Dispatching app2 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
- transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
- currentState.Store(INTERFACE_OPTIONS_3)
- case INTERFACE_OPTIONS_3:
- t.Log("Dispatching interface 3 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A342000A\r"))
- transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
- currentState.Store(INTERFACE_OPTIONS_1_PUN)
- case INTERFACE_OPTIONS_1_PUN:
- t.Log("Dispatching interface 1 PUN echo and confirm???")
- transportInstance.FillReadBuffer([]byte("@A3410079\r"))
- transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
- currentState.Store(INTERFACE_OPTIONS_1)
- case INTERFACE_OPTIONS_1:
- t.Log("Dispatching interface 1 echo and confirm???")
- transportInstance.FillReadBuffer([]byte("@A3300079\r"))
- transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(MANUFACTURER)
- case MANUFACTURER:
- t.Log("Dispatching manufacturer")
- transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }
- })
- err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- connectionConnectResult := <-NewDriver().GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
- if err := connectionConnectResult.GetErr(); err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return connectionConnectResult.GetConnection()
- }(),
- sequenceCounter: 0,
- },
args: args{
ctx: context.Background(),
interceptor: func(result apiModel.PlcBrowseItem) bool {
@@ -158,6 +83,91 @@ func TestBrowser_BrowseQuery(t *testing.T) {
queryName: "testQuery",
query: NewUnitInfoQuery(readWriteModel.NewUnitAddress(2), nil, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ INTERFACE_OPTIONS_3
+ INTERFACE_OPTIONS_1_PUN
+ INTERFACE_OPTIONS_1
+ MANUFACTURER
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_3)
+ case INTERFACE_OPTIONS_3:
+ t.Log("Dispatching interface 3 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+ transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1_PUN)
+ case INTERFACE_OPTIONS_1_PUN:
+ t.Log("Dispatching interface 1 PUN echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+ transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1)
+ case INTERFACE_OPTIONS_1:
+ t.Log("Dispatching interface 1 echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3300079\r"))
+ transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
+ currentState.Store(MANUFACTURER)
+ case MANUFACTURER:
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+ }
+ })
+ err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ connectionConnectResult := <-NewDriver(loggerOption).GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+ if err := connectionConnectResult.GetErr(); err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.connection = connectionConnectResult.GetConnection()
+ },
want: apiModel.PlcResponseCode_OK,
want1: []apiModel.PlcBrowseItem{
&spiModel.DefaultPlcBrowseItem{
@@ -173,10 +183,14 @@ func TestBrowser_BrowseQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
m := Browser{
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
+ log: tt.fields.log,
}
got, got1 := m.BrowseQuery(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
assert.Equalf(t, tt.want, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
@@ -190,6 +204,7 @@ func TestBrowser_extractUnits(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -247,6 +262,7 @@ func TestBrowser_extractUnits(t *testing.T) {
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
+ log: tt.fields.log,
}
got, got1, err := m.extractUnits(tt.args.ctx, tt.args.query, tt.args.getInstalledUnitAddressBytes)
if !tt.wantErr(t, err, fmt.Sprintf("extractUnits(%v, \n%v, func())", tt.args.ctx, tt.args.query)) {
@@ -263,6 +279,7 @@ func TestBrowser_extractAttributes(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
+ log zerolog.Logger
}
type args struct {
query *unitInfoQuery
@@ -300,6 +317,7 @@ func TestBrowser_extractAttributes(t *testing.T) {
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
+ log: tt.fields.log,
}
got, got1 := m.extractAttributes(tt.args.query)
assert.Equalf(t, tt.want, got, "extractAttributes(\n%v)", tt.args.query)
@@ -313,6 +331,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
DefaultBrowser _default.DefaultBrowser
connection plc4go.PlcConnection
sequenceCounter uint8
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -321,91 +340,95 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
want map[byte]any
wantErr assert.ErrorAssertionFunc
}{
{
name: "get units",
- fields: fields{
- DefaultBrowser: nil,
- connection: func() plc4go.PlcConnection {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- APPLICATION_FILTER_1
- APPLICATION_FILTER_2
- INTERFACE_OPTIONS_3
- INTERFACE_OPTIONS_1_PUN
- INTERFACE_OPTIONS_1
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(APPLICATION_FILTER_1)
- case APPLICATION_FILTER_1:
- t.Log("Dispatching app1 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
- transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
- currentState.Store(APPLICATION_FILTER_2)
- case APPLICATION_FILTER_2:
- t.Log("Dispatching app2 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
- transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
- currentState.Store(INTERFACE_OPTIONS_3)
- case INTERFACE_OPTIONS_3:
- t.Log("Dispatching interface 3 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A342000A\r"))
- transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
- currentState.Store(INTERFACE_OPTIONS_1_PUN)
- case INTERFACE_OPTIONS_1_PUN:
- t.Log("Dispatching interface 1 PUN echo and confirm???")
- transportInstance.FillReadBuffer([]byte("@A3410079\r"))
- transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
- currentState.Store(INTERFACE_OPTIONS_1)
- case INTERFACE_OPTIONS_1:
- t.Log("Dispatching interface 1 echo and confirm???")
- transportInstance.FillReadBuffer([]byte("@A3300079\r"))
- transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Dispatching 3 MMI segments")
- transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
- transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
- }
- })
- err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- connectionConnectResult := <-NewDriver().GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
- if err := connectionConnectResult.GetErr(); err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return connectionConnectResult.GetConnection()
- }(),
- sequenceCounter: 0,
- },
args: args{
ctx: context.Background(),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ // Set the model logger to the logger above
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ INTERFACE_OPTIONS_3
+ INTERFACE_OPTIONS_1_PUN
+ INTERFACE_OPTIONS_1
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_3)
+ case INTERFACE_OPTIONS_3:
+ t.Log("Dispatching interface 3 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+ transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1_PUN)
+ case INTERFACE_OPTIONS_1_PUN:
+ t.Log("Dispatching interface 1 PUN echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+ transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1)
+ case INTERFACE_OPTIONS_1:
+ t.Log("Dispatching interface 1 echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3300079\r"))
+ transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+ }
+ })
+ err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ connectionConnectResult := <-NewDriver(loggerOption).GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+ if err := connectionConnectResult.GetErr(); err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.connection = connectionConnectResult.GetConnection()
+ },
want: map[byte]any{
1: true,
2: true,
@@ -421,10 +444,14 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
m := Browser{
DefaultBrowser: tt.fields.DefaultBrowser,
connection: tt.fields.connection,
sequenceCounter: tt.fields.sequenceCounter,
+ log: tt.fields.log,
}
got, err := m.getInstalledUnitAddressBytes(tt.args.ctx)
if !tt.wantErr(t, err, fmt.Sprintf("getInstalledUnitAddressBytes(%v)", tt.args.ctx)) {
diff --git a/plc4go/internal/cbus/CBusMessageMapper.go b/plc4go/internal/cbus/CBusMessageMapper.go
index a51e604dbf..8e6e0fe4db 100644
--- a/plc4go/internal/cbus/CBusMessageMapper.go
+++ b/plc4go/internal/cbus/CBusMessageMapper.go
@@ -22,9 +22,9 @@ package cbus
import (
"context"
"fmt"
- "github.com/apache/plc4x/plc4go/spi"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
"strconv"
"strings"
@@ -277,7 +277,7 @@ func producePointToMultiPointCommandNormal(bridgeAddresses []readWriteModel.Brid
return readWriteModel.NewCBusCommandPointToMultiPoint(command, header, cbusOptions), nil
}
-func MapEncodedReply(transaction spi.RequestTransaction, encodedReply readWriteModel.EncodedReply, tagName string, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue apiValues.PlcValue)) error {
+func MapEncodedReply(localLog zerolog.Logger, transaction transactions.RequestTransaction, encodedReply readWriteModel.EncodedReply, tagName string, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), addPlcValue func(name string, plcValue apiValues.PlcValue)) error {
switch reply := encodedReply.(type) {
case readWriteModel.EncodedReplyCALReplyExactly:
calData := reply.GetCalReply().GetCalData()
@@ -479,7 +479,7 @@ func MapEncodedReply(transaction spi.RequestTransaction, encodedReply readWriteM
default:
wbpcb := spiValues.NewWriteBufferPlcValueBased()
if err := calData.SerializeWithWriteBuffer(context.Background(), wbpcb); err != nil {
- log.Warn().Err(err).Msgf("Unmapped cal data type %T. Returning raw to string", calData)
+ localLog.Warn().Err(err).Msgf("Unmapped cal data type %T. Returning raw to string", calData)
addPlcValue(tagName, spiValues.NewPlcSTRING(fmt.Sprintf("%s", calData)))
} else {
addPlcValue(tagName, wbpcb.GetPlcValue())
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 4da9984e32..1067150d48 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -24,7 +24,10 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/pool"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/stretchr/testify/assert"
@@ -1436,7 +1439,7 @@ func Test_producePointToMultiPointCommandNormal(t *testing.T) {
func TestMapEncodedReply(t *testing.T) {
type args struct {
- transaction spi.RequestTransaction
+ transaction transactions.RequestTransaction
encodedReply readWriteModel.EncodedReply
tagName string
addResponseCode func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode)
@@ -1445,19 +1448,12 @@ func TestMapEncodedReply(t *testing.T) {
tests := []struct {
name string
args args
+ setup func(t *testing.T, args *args)
wantErr bool
}{
{
name: "empty input",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: nil,
tagName: "",
addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
@@ -1471,18 +1467,27 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ t.Logf("Submitting No-Op to transaction %v", transaction)
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataStatus",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
statusBytes := []readWriteModel.StatusByte{
readWriteModel.NewStatusByte(readWriteModel.GAVState_ON, readWriteModel.GAVState_ERROR, readWriteModel.GAVState_DOES_NOT_EXIST, readWriteModel.GAVState_OFF),
@@ -1533,18 +1538,31 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ t.Logf("Submitting No-Op to transaction %v", transaction)
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ t.Log("No op-ing")
+ })
+ t.Logf("Submitted to transaction %v", transaction)
+ args.transaction = transaction
+ },
},
{
name: "CALDataStatusExtendedExactly (binary)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
statusBytes := []readWriteModel.StatusByte{
readWriteModel.NewStatusByte(readWriteModel.GAVState_ON, readWriteModel.GAVState_ERROR, readWriteModel.GAVState_DOES_NOT_EXIST, readWriteModel.GAVState_OFF),
@@ -1595,18 +1613,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataStatusExtendedExactly (level)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
levelInformation := []readWriteModel.LevelInformation{
readWriteModel.NewLevelInformationNormal(readWriteModel.LevelInformationNibblePair_Value_A, readWriteModel.LevelInformationNibblePair_Value_F, 13),
@@ -1649,18 +1677,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (sense levels)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandCurrentSenseLevels([]byte{1, 2, 3, 4}, 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_CurrentSenseLevels, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -1693,18 +1731,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (delays)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandDelays([]byte{1, 2, 3, 4}, 5, 5)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_Delays, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -1740,18 +1788,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (dsi status)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandDSIStatus(
readWriteModel.ChannelStatus_OK,
@@ -1807,18 +1865,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (extended diagnostic summary)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandExtendedDiagnosticSummary(
readWriteModel.ApplicationIdContainer_LIGHTING_3B,
@@ -1892,18 +1960,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (summary)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandSummary("pineapple", 1, "13", 3)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -1940,18 +2018,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (firmware version)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandFirmwareVersion("13", 1)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -1984,18 +2072,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (GAV physical addresses)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandGAVPhysicalAddresses([]byte{1, 2, 3, 4}, 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2028,18 +2126,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (GAV values current)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandGAVValuesCurrent([]byte{1, 2, 3, 4}, 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2072,18 +2180,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (GAV values stored)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandGAVValuesStored([]byte{1, 2, 3, 4}, 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2116,18 +2234,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (logical assignment)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandLogicalAssignment([]readWriteModel.LogicAssignment{
readWriteModel.NewLogicAssignment(true, true, true, true, true, true),
@@ -2180,18 +2308,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (manufacturer)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandManufacturer("Apache", 13)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2224,18 +2362,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (maximum levels)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandMaximumLevels([]byte{1, 2, 3, 4}, 1)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2268,18 +2416,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (minimum levels)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandMinimumLevels([]byte{1, 2, 3, 4}, 1)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2312,18 +2470,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (network terminal levels)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandNetworkTerminalLevels([]byte{1, 2, 3, 4}, 1)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2356,18 +2524,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (network voltage)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandNetworkVoltage("13.3", "3", 3)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2400,18 +2578,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (output unit summary)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
gavStoreEnabledByte1 := byte(2)
gavStoreEnabledByte2 := byte(3)
@@ -2460,18 +2648,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (terminal levels)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandTerminalLevels([]byte{1, 2, 3, 4}, 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2504,18 +2702,28 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
{
name: "CALDataIdentifyReplyExactly (type)",
args: args{
- transaction: func() spi.RequestTransaction {
- transactionManager := spi.NewRequestTransactionManager(1)
- transaction := transactionManager.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
- // NO-OP
- })
- return transaction
- }(),
encodedReply: func() readWriteModel.EncodedReplyCALReply {
command := readWriteModel.NewIdentifyReplyCommandType("chonkers", 4)
calDataIdentify := readWriteModel.NewCALDataIdentifyReply(readWriteModel.Attribute_DSIStatus, command, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, nil)
@@ -2548,11 +2756,34 @@ func TestMapEncodedReply(t *testing.T) {
}
},
},
+ setup: func(t *testing.T, args *args) {
+ transactionManager := transactions.NewRequestTransactionManager(
+ 1,
+ options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
+ transactions.WithCustomExecutor(
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ ),
+ )
+ t.Cleanup(func() {
+ t.Log("Closing transaction manager now")
+ assert.NoError(t, transactionManager.Close())
+ })
+ transaction := transactionManager.StartTransaction()
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
+ // NO-OP
+ })
+ args.transaction = transaction
+ },
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if err := MapEncodedReply(tt.args.transaction, tt.args.encodedReply, tt.args.tagName, tt.args.addResponseCode(t), tt.args.addPlcValue(t)); (err != nil) != tt.wantErr {
+ testingLogger := testutils.ProduceTestingLogger(t)
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+ if tt.setup != nil {
+ tt.setup(t, &tt.args)
+ }
+ if err := MapEncodedReply(testingLogger, tt.args.transaction, tt.args.encodedReply, tt.args.tagName, tt.args.addResponseCode(t), tt.args.addPlcValue(t)); (err != nil) != tt.wantErr {
t.Errorf("MapEncodedReply() error = %v, wantErr %v", err, tt.wantErr)
}
})
diff --git a/plc4go/internal/cbus/Configuration.go b/plc4go/internal/cbus/Configuration.go
index 4ec41eaa78..5655c87473 100644
--- a/plc4go/internal/cbus/Configuration.go
+++ b/plc4go/internal/cbus/Configuration.go
@@ -20,11 +20,11 @@
package cbus
import (
+ "github.com/rs/zerolog"
"reflect"
"strconv"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Configuration struct {
@@ -43,13 +43,13 @@ type Configuration struct {
MonitoredApplication2 byte
}
-func ParseFromOptions(options map[string][]string) (Configuration, error) {
+func ParseFromOptions(log zerolog.Logger, options map[string][]string) (Configuration, error) {
configuration := createDefaultConfiguration()
reflectConfiguration := reflect.ValueOf(&configuration).Elem()
for i := 0; i < reflectConfiguration.NumField(); i++ {
field := reflectConfiguration.Type().Field(i)
key := field.Name
- if optionValue := getFromOptions(options, key); optionValue != "" {
+ if optionValue := getFromOptions(log, options, key); optionValue != "" {
switch field.Type.Kind() {
case reflect.Uint8:
parseUint, err := strconv.ParseUint(optionValue, 0, 8)
@@ -86,13 +86,13 @@ func createDefaultConfiguration() Configuration {
}
}
-func getFromOptions(options map[string][]string, key string) string {
+func getFromOptions(localLog zerolog.Logger, options map[string][]string, key string) string {
if optionValues, ok := options[key]; ok {
if len(optionValues) <= 0 {
return ""
}
if len(optionValues) > 1 {
- log.Warn().Msgf("Options %s must be unique", key)
+ localLog.Warn().Msgf("Options %s must be unique", key)
}
return optionValues[0]
}
diff --git a/plc4go/internal/cbus/Configuration_test.go b/plc4go/internal/cbus/Configuration_test.go
index b31b0fc176..733d56ce27 100644
--- a/plc4go/internal/cbus/Configuration_test.go
+++ b/plc4go/internal/cbus/Configuration_test.go
@@ -21,6 +21,7 @@ package cbus
import (
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -131,7 +132,7 @@ func TestParseFromOptions(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := ParseFromOptions(tt.args.options)
+ got, err := ParseFromOptions(testutils.ProduceTestingLogger(t), tt.args.options)
if !tt.wantErr(t, err, fmt.Sprintf("ParseFromOptions(%v)", tt.args.options)) {
return
}
@@ -196,7 +197,7 @@ func Test_getFromOptions(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- assert.Equalf(t, tt.want, getFromOptions(tt.args.options, tt.args.key), "getFromOptions(%v, %v)", tt.args.options, tt.args.key)
+ assert.Equalf(t, tt.want, getFromOptions(testutils.ProduceTestingLogger(t), tt.args.options, tt.args.key), "getFromOptions(%v, %v)", tt.args.options, tt.args.key)
})
}
}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index a4935e6c88..858f32ec79 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -22,6 +22,9 @@ package cbus
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"sync"
"time"
@@ -33,7 +36,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type AlphaGenerator struct {
@@ -58,16 +60,18 @@ type Connection struct {
alphaGenerator AlphaGenerator
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+
+ log zerolog.Logger
}
-func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
connection := &Connection{
alphaGenerator: AlphaGenerator{currentAlpha: 'g'},
messageCodec: messageCodec,
@@ -75,14 +79,17 @@ func NewConnection(messageCodec *MessageCodec, configuration Configuration, driv
driverContext: driverContext,
tm: tm,
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = spi.NewTracer(connection.connectionId)
}
}
- connection.DefaultConnection = _default.NewDefaultConnection(connection,
- _default.WithPlcTagHandler(tagHandler),
- _default.WithPlcValueHandler(NewValueHandler()),
+ connection.DefaultConnection = _default.NewDefaultConnection(
+ connection,
+ append(_options,
+ _default.WithPlcTagHandler(tagHandler),
+ _default.WithPlcValueHandler(NewValueHandler()),
+ )...,
)
return connection
}
@@ -108,7 +115,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
defer func() {
@@ -124,7 +131,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// For testing purposes we can skip the waiting for a complete connection
if !c.driverContext.awaitSetupComplete {
go c.setupConnection(ctx, ch)
- log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ c.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
@@ -147,7 +154,7 @@ func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm))
+ return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
}
func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
@@ -155,7 +162,7 @@ func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
- return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c))
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
}
func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
@@ -164,13 +171,13 @@ func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRe
}
func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
- return spiModel.NewDefaultPlcBrowseRequestBuilder(c.GetPlcTagHandler(), NewBrowser(c))
+ return spiModel.NewDefaultPlcBrowseRequestBuilder(c.GetPlcTagHandler(), NewBrowser(c, options.WithCustomLogger(c.log)))
}
func (c *Connection) addSubscriber(subscriber *Subscriber) {
for _, sub := range c.subscribers {
if sub == subscriber {
- log.Debug().Msgf("Subscriber %v already added", subscriber)
+ c.log.Debug().Msgf("Subscriber %v already added", subscriber)
return
}
}
@@ -186,27 +193,27 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
requestContext := &c.messageCodec.requestContext
if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) {
- log.Warn().Msg("First reset failed")
+ c.log.Warn().Msg("First reset failed")
// We try a second reset in case we get a power up
if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) {
- log.Trace().Msg("Reset failed")
+ c.log.Trace().Msg("Reset failed")
return
}
}
if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) {
- log.Trace().Msg("Set application filter failed")
+ c.log.Trace().Msg("Set application filter failed")
return
}
if !c.setInterfaceOptions3(ctx, ch, requestContext, cbusOptions) {
- log.Trace().Msg("Set interface options 3 failed")
+ c.log.Trace().Msg("Set interface options 3 failed")
return
}
if !c.setInterface1PowerUpSettings(ctx, ch, requestContext, cbusOptions) {
- log.Trace().Msg("Set interface options 1 power up settings failed")
+ c.log.Trace().Msg("Set interface options 1 power up settings failed")
return
}
if !c.setInterfaceOptions1(ctx, ch, requestContext, cbusOptions) {
- log.Trace().Msg("Set interface options 1 failed")
+ c.log.Trace().Msg("Set interface options 1 failed")
return
}
c.fireConnected(ch)
@@ -214,50 +221,50 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
func (c *Connection) startSubscriptionHandler() {
- log.Debug().Msg("Starting SAL handler")
+ c.log.Debug().Msg("Starting SAL handler")
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ c.log.Error().Msgf("panic-ed %v", err)
}
}()
- log.Debug().Msg("SAL handler stated")
+ c.log.Debug().Msg("SAL handler stated")
for c.IsConnected() {
for monitoredSal := range c.messageCodec.monitoredSALs {
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
- log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
+ c.log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
continue
}
}
}
}
- log.Info().Msg("Ending SAL handler")
+ c.log.Info().Msg("Ending SAL handler")
}()
- log.Debug().Msg("Starting MMI handler")
+ c.log.Debug().Msg("Starting MMI handler")
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ c.log.Error().Msgf("panic-ed %v", err)
}
}()
- log.Debug().Msg("default MMI started")
+ c.log.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
- log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
+ c.log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
continue
}
}
}
}
- log.Info().Msg("Ending MMI handler")
+ c.log.Info().Msg("Ending MMI handler")
}()
}
func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
- log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
+ c.log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
requestTypeReset := readWriteModel.RequestType_RESET
requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeReset, requestTypeReset, &requestTypeReset, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
@@ -304,7 +311,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
if sendOutErrorNotification {
c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
} else {
- log.Warn().Err(err).Msg("connect failed")
+ c.log.Warn().Err(err).Msg("connect failed")
}
return false
}
@@ -314,28 +321,28 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
defer utils.CleanupTimer(timeout)
select {
case <-receivedResetEchoChan:
- log.Debug().Msgf("We received the echo")
+ c.log.Debug().Msgf("We received the echo")
case err := <-receivedResetEchoErrorChan:
if sendOutErrorNotification {
c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
} else {
- log.Trace().Err(err).Msg("connect failed")
+ c.log.Trace().Err(err).Msg("connect failed")
}
return false
case timeout := <-timeout.C:
if sendOutErrorNotification {
c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
} else {
- log.Trace().Msgf("Timeout after %v", timeout.Sub(startTime))
+ c.log.Trace().Msgf("Timeout after %v", timeout.Sub(startTime))
}
return false
}
- log.Debug().Msg("Reset done")
+ c.log.Debug().Msg("Reset done")
return true
}
func (c *Connection) setApplicationFilter(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
- log.Debug().Msg("Set application filter to all")
+ c.log.Debug().Msg("Set application filter to all")
applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(c.configuration.MonitoredApplication1), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
return false
@@ -344,43 +351,43 @@ func (c *Connection) setApplicationFilter(ctx context.Context, ch chan plc4go.Pl
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
return false
}
- log.Debug().Msg("Application filter set")
+ c.log.Debug().Msg("Application filter set")
return true
}
func (c *Connection) setInterfaceOptions3(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
- log.Debug().Msg("Set interface options 3")
+ c.log.Debug().Msg("Set interface options 3")
interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(c.configuration.Exstat, c.configuration.Pun, c.configuration.LocalSal, c.configuration.Pcn), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
return false
}
// TODO: add localsal to the options
*cbusOptions = readWriteModel.NewCBusOptions(false, false, false, c.configuration.Exstat, false, false, c.configuration.Pun, c.configuration.Pcn, false)
- log.Debug().Msg("Interface options 3 set")
+ c.log.Debug().Msg("Interface options 3 set")
return true
}
func (c *Connection) setInterface1PowerUpSettings(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) (ok bool) {
- log.Debug().Msg("Set interface options 1 power up settings")
+ c.log.Debug().Msg("Set interface options 1 power up settings")
interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect)), 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
return false
}
// TODO: what is with monall
*cbusOptions = readWriteModel.NewCBusOptions(c.configuration.Connect, c.configuration.Smart, c.configuration.Idmon, c.configuration.Exstat, c.configuration.Monitor, false, c.configuration.Pun, c.configuration.Pcn, c.configuration.Srchk)
- log.Debug().Msg("Interface options 1 power up settings set")
+ c.log.Debug().Msg("Interface options 1 power up settings set")
return true
}
func (c *Connection) setInterfaceOptions1(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
- log.Debug().Msg("Set interface options 1")
+ c.log.Debug().Msg("Set interface options 1")
interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect), nil, 1)
if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
return false
}
// TODO: what is with monall
*cbusOptions = readWriteModel.NewCBusOptions(c.configuration.Connect, c.configuration.Smart, c.configuration.Idmon, c.configuration.Exstat, c.configuration.Monitor, false, c.configuration.Pun, c.configuration.Pcn, c.configuration.Srchk)
- log.Debug().Msg("Interface options 1 set")
+ c.log.Debug().Msg("Interface options 1 set")
return true
}
@@ -452,7 +459,7 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
defer utils.CleanupTimer(timeout)
select {
case <-directCommandAckChan:
- log.Debug().Msgf("We received the ack")
+ c.log.Debug().Msgf("We received the ack")
case err := <-directCommandAckErrorChan:
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
@@ -467,10 +474,10 @@ func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnecti
if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
- log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ c.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
if err := c.messageCodec.Disconnect(); err != nil {
- log.Debug().Err(err).Msg("Error disconnecting message codec on connection error")
+ c.log.Debug().Err(err).Msg("Error disconnecting message codec on connection error")
}
}
@@ -478,7 +485,7 @@ func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult)
if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
} else {
- log.Info().Msg("Successfully connected")
+ c.log.Info().Msg("Successfully connected")
}
c.SetConnected(true)
}
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index daff836d99..341a2a275c 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -26,8 +26,12 @@ import (
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"net/url"
"sync/atomic"
@@ -84,11 +88,12 @@ func TestConnection_BrowseRequestBuilder(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -116,6 +121,7 @@ func TestConnection_BrowseRequestBuilder(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.True(t, tt.wantAssert(t, c.BrowseRequestBuilder()), "BrowseRequestBuilder()")
})
@@ -127,11 +133,12 @@ func TestConnection_ConnectWithContext(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -140,23 +147,12 @@ func TestConnection_ConnectWithContext(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
wantAsserter func(*testing.T, <-chan plc4go.PlcConnectionConnectResult) bool
}{
{
name: "just connect and fail",
fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- subscribers: nil,
- tm: nil,
configuration: Configuration{
Srchk: false,
Exstat: false,
@@ -179,6 +175,26 @@ func TestConnection_ConnectWithContext(t *testing.T) {
tracer: nil,
},
args: args{ctx: context.Background()},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ },
wantAsserter: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
assert.NotNil(t, results)
result := <-results
@@ -191,6 +207,9 @@ func TestConnection_ConnectWithContext(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -200,6 +219,7 @@ func TestConnection_ConnectWithContext(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.True(t, tt.wantAsserter(t, c.ConnectWithContext(tt.args.ctx)), "ConnectWithContext(%v)", tt.args.ctx)
})
@@ -211,11 +231,12 @@ func TestConnection_GetConnection(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -240,6 +261,7 @@ func TestConnection_GetConnection(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Truef(t, tt.wantAsserter(t, c.GetConnection()), "GetConnection()")
})
@@ -251,11 +273,12 @@ func TestConnection_GetConnectionId(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -277,6 +300,7 @@ func TestConnection_GetConnectionId(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.GetConnectionId(), "GetConnectionId()")
})
@@ -288,11 +312,12 @@ func TestConnection_GetMessageCodec(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -318,6 +343,7 @@ func TestConnection_GetMessageCodec(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.GetMessageCodec(), "GetMessageCodec()")
})
@@ -329,11 +355,12 @@ func TestConnection_GetMetadata(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -362,6 +389,7 @@ func TestConnection_GetMetadata(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.GetMetadata(), "GetMetadata()")
})
@@ -373,11 +401,12 @@ func TestConnection_GetTracer(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -399,6 +428,7 @@ func TestConnection_GetTracer(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.GetTracer(), "GetTracer()")
})
@@ -410,11 +440,12 @@ func TestConnection_IsTraceEnabled(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -436,6 +467,7 @@ func TestConnection_IsTraceEnabled(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.IsTraceEnabled(), "IsTraceEnabled()")
})
@@ -447,11 +479,12 @@ func TestConnection_ReadRequestBuilder(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -479,6 +512,7 @@ func TestConnection_ReadRequestBuilder(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Truef(t, tt.wantAssert(t, c.ReadRequestBuilder()), "ReadRequestBuilder()")
})
@@ -490,11 +524,12 @@ func TestConnection_String(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -517,6 +552,7 @@ func TestConnection_String(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.String(), "String()")
})
@@ -528,11 +564,12 @@ func TestConnection_SubscriptionRequestBuilder(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -560,6 +597,7 @@ func TestConnection_SubscriptionRequestBuilder(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Truef(t, tt.wantAssert(t, c.SubscriptionRequestBuilder()), "SubscriptionRequestBuilder()")
})
@@ -571,11 +609,12 @@ func TestConnection_UnsubscriptionRequestBuilder(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -597,6 +636,7 @@ func TestConnection_UnsubscriptionRequestBuilder(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.UnsubscriptionRequestBuilder(), "UnsubscriptionRequestBuilder()")
})
@@ -608,11 +648,12 @@ func TestConnection_WriteRequestBuilder(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
@@ -640,6 +681,7 @@ func TestConnection_WriteRequestBuilder(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Truef(t, tt.wantAssert(t, c.WriteRequestBuilder()), "WriteRequestBuilder()")
})
@@ -651,11 +693,12 @@ func TestConnection_addSubscriber(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
subscriber *Subscriber
@@ -694,6 +737,7 @@ func TestConnection_addSubscriber(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
c.addSubscriber(tt.args.subscriber)
assert.Truef(t, tt.subElevator(t, c.subscribers), "addSubscriber(%v)", tt.args.subscriber)
@@ -706,11 +750,12 @@ func TestConnection_fireConnected(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ch chan<- plc4go.PlcConnectionConnectResult
@@ -756,6 +801,7 @@ func TestConnection_fireConnected(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
c.fireConnected(tt.args.ch)
assert.True(t, tt.chanValidator(t, tt.args.ch))
@@ -768,11 +814,12 @@ func TestConnection_fireConnectionError(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
err error
@@ -837,6 +884,7 @@ func TestConnection_fireConnectionError(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
c.fireConnectionError(tt.args.err, tt.args.ch)
assert.True(t, tt.chanValidator(t, tt.args.ch))
@@ -849,11 +897,12 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -867,22 +916,11 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
want bool
}{
{
name: "send something",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -897,11 +935,35 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
return &cBusOptions
}(),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }())
+ },
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -911,6 +973,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.sendCalDataWrite(tt.args.ctx, tt.args.ch, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext, tt.args.cbusOptions), "sendCalDataWrite(%v, %v, %v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.paramNo, tt.args.parameterValue, tt.args.requestContext, tt.args.cbusOptions)
})
@@ -922,11 +985,12 @@ func TestConnection_sendReset(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -982,6 +1046,7 @@ func TestConnection_sendReset(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.wantOk, c.sendReset(tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification), "sendReset(%v, %v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.cbusOptions, tt.args.requestContext, tt.args.sendOutErrorNotification)
})
@@ -993,11 +1058,12 @@ func TestConnection_setApplicationFilter(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -1009,22 +1075,11 @@ func TestConnection_setApplicationFilter(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
wantOk bool
}{
{
name: "set application filter (failing)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
- transport := test.NewTransport()
- ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
- return ti
- }()),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1037,11 +1092,36 @@ func TestConnection_setApplicationFilter(t *testing.T) {
return &requestContext
}(),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Setup connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+ fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport(loggerOption)
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ },
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1051,6 +1131,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.wantOk, c.setApplicationFilter(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setApplicationFilter(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
@@ -1062,11 +1143,12 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -1078,6 +1160,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantOk bool
}{
{
@@ -1106,11 +1189,17 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
return &requestContext
}(),
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+ },
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1120,6 +1209,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.wantOk, c.setInterface1PowerUpSettings(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterface1PowerUpSettings(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
@@ -1131,11 +1221,12 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -1147,6 +1238,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
want bool
}{
{
@@ -1175,11 +1267,17 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
return &requestContext
}(),
},
+ setup: func(t *testing.T, fields *fields) {
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+ },
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1189,6 +1287,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.want, c.setInterfaceOptions1(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions1(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
@@ -1200,11 +1299,12 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -1216,6 +1316,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
wantOk bool
}{
{
@@ -1244,11 +1345,17 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
return &requestContext
}(),
},
+ setup: func(t *testing.T, fields *fields) {
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+ },
wantOk: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1258,6 +1365,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
assert.Equalf(t, tt.wantOk, c.setInterfaceOptions3(tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions), "setInterfaceOptions3(%v, %v, %v, %v)", tt.args.ctx, tt.args.ch, tt.args.requestContext, tt.args.cbusOptions)
})
@@ -1269,11 +1377,12 @@ func TestConnection_setupConnection(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
type args struct {
ctx context.Context
@@ -1283,12 +1392,29 @@ func TestConnection_setupConnection(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
}{
{
name: "setup connection (failing)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: NewMessageCodec(func() transports.TransportInstance {
+ args: args{
+ ctx: context.Background(),
+ ch: make(chan plc4go.PlcConnectionConnectResult, 1),
+ },
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
transport := test.NewTransport()
ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
if err != nil {
@@ -1296,246 +1422,277 @@ func TestConnection_setupConnection(t *testing.T) {
t.FailNow()
}
return ti
- }()),
- },
- args: args{
- ctx: context.Background(),
- ch: make(chan plc4go.PlcConnectionConnectResult, 1),
+ }(), loggerOption)
},
},
{
name: "setup connection (failing after reset)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ fields.messageCodec = codec
+ },
},
{
name: "setup connection (failing after app filters)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- APPLICATION_FILTER_1
- APPLICATION_FILTER_2
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(APPLICATION_FILTER_1)
- case APPLICATION_FILTER_1:
- t.Log("Dispatching app1 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
- transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
- currentState.Store(APPLICATION_FILTER_2)
- case APPLICATION_FILTER_2:
- t.Log("Dispatching app2 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
- transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ fields.messageCodec = codec
+ },
},
{
name: "setup connection (failing after interface options 3",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- APPLICATION_FILTER_1
- APPLICATION_FILTER_2
- INTERFACE_OPTIONS_3
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(APPLICATION_FILTER_1)
- case APPLICATION_FILTER_1:
- t.Log("Dispatching app1 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
- transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
- currentState.Store(APPLICATION_FILTER_2)
- case APPLICATION_FILTER_2:
- t.Log("Dispatching app2 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
- transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
- currentState.Store(INTERFACE_OPTIONS_3)
- case INTERFACE_OPTIONS_3:
- t.Log("Dispatching interface 3 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A342000A\r"))
- transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ INTERFACE_OPTIONS_3
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_3)
+ case INTERFACE_OPTIONS_3:
+ t.Log("Dispatching interface 3 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+ transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+
+ fields.messageCodec = codec
+ },
},
{
name: "setup connection (failing after interface options 1 pun)",
- fields: fields{
- DefaultConnection: _default.NewDefaultConnection(nil),
- messageCodec: func() *MessageCodec {
- transport := test.NewTransport()
- transportUrl := url.URL{Scheme: "test"}
- transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- type MockState uint8
- const (
- RESET MockState = iota
- APPLICATION_FILTER_1
- APPLICATION_FILTER_2
- INTERFACE_OPTIONS_3
- INTERFACE_OPTIONS_1_PUN
- DONE
- )
- currentState := atomic.Value{}
- currentState.Store(RESET)
- transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
- switch currentState.Load().(MockState) {
- case RESET:
- t.Log("Dispatching reset echo")
- transportInstance.FillReadBuffer([]byte("~~~\r"))
- currentState.Store(APPLICATION_FILTER_1)
- case APPLICATION_FILTER_1:
- t.Log("Dispatching app1 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
- transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
- currentState.Store(APPLICATION_FILTER_2)
- case APPLICATION_FILTER_2:
- t.Log("Dispatching app2 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
- transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
- currentState.Store(INTERFACE_OPTIONS_3)
- case INTERFACE_OPTIONS_3:
- t.Log("Dispatching interface 3 echo and confirm")
- transportInstance.FillReadBuffer([]byte("@A342000A\r"))
- transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
- currentState.Store(INTERFACE_OPTIONS_1_PUN)
- case INTERFACE_OPTIONS_1_PUN:
- t.Log("Dispatching interface 1 PUN echo and confirm???")
- transportInstance.FillReadBuffer([]byte("@A3410079\r"))
- transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
- currentState.Store(DONE)
- case DONE:
- t.Log("Done")
- }
- })
- codec := NewMessageCodec(transportInstance)
- err = codec.Connect()
- if err != nil {
- t.Error(err)
- t.FailNow()
- return nil
- }
- return codec
- }(),
- },
args: args{
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ transport := test.NewTransport(loggerOption)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ INTERFACE_OPTIONS_3
+ INTERFACE_OPTIONS_1_PUN
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_3)
+ case INTERFACE_OPTIONS_3:
+ t.Log("Dispatching interface 3 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+ transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1_PUN)
+ case INTERFACE_OPTIONS_1_PUN:
+ t.Log("Dispatching interface 1 PUN echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+ transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Done")
+ }
+ })
+ codec := NewMessageCodec(transportInstance, loggerOption)
+ err = codec.Connect()
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.messageCodec = codec
+ },
},
{
name: "setup connection",
@@ -1611,10 +1768,37 @@ func TestConnection_setupConnection(t *testing.T) {
ctx: context.Background(),
ch: make(chan plc4go.PlcConnectionConnectResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ // Setup logger
+ logger := testutils.ProduceTestingLogger(t)
+ fields.log = logger
+
+ testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
+
+ // Custom option for that
+ loggerOption := options.WithCustomLogger(logger)
+
+ // Build the default connection
+ fields.DefaultConnection = _default.NewDefaultConnection(nil, loggerOption)
+
+ // Build the message codec
+ fields.messageCodec = NewMessageCodec(func() transports.TransportInstance {
+ transport := test.NewTransport()
+ ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil)
+ if err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ return ti
+ }(), loggerOption)
+ },
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
c := &Connection{
DefaultConnection: tt.fields.DefaultConnection,
messageCodec: tt.fields.messageCodec,
@@ -1624,6 +1808,7 @@ func TestConnection_setupConnection(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
c.setupConnection(tt.args.ctx, tt.args.ch)
})
@@ -1635,15 +1820,17 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
DefaultConnection _default.DefaultConnection
messageCodec *MessageCodec
subscribers []*Subscriber
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
driverContext DriverContext
connectionId string
tracer *spi.Tracer
+ log zerolog.Logger
}
tests := []struct {
name string
fields fields
+ setup func(t *testing.T, fields *fields)
}{
{
name: "just start",
@@ -1685,7 +1872,9 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
}()
return messageCodec
}(),
- subscribers: []*Subscriber{NewSubscriber(nil)},
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
},
},
}
@@ -1700,6 +1889,7 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
driverContext: tt.fields.driverContext,
connectionId: tt.fields.connectionId,
tracer: tt.fields.tracer,
+ log: tt.fields.log,
}
c.startSubscriptionHandler()
time.Sleep(50 * time.Millisecond)
@@ -1713,7 +1903,7 @@ func TestNewConnection(t *testing.T) {
configuration Configuration
driverContext DriverContext
tagHandler spi.PlcTagHandler
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
options map[string][]string
}
tests := []struct {
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 1cd681b71e..139ffda597 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -22,6 +22,7 @@ package cbus
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
"github.com/pkg/errors"
"github.com/rs/zerolog"
@@ -43,16 +44,20 @@ import (
type Discoverer struct {
transportInstanceCreationWorkItemId atomic.Int32
- transportInstanceCreationQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
deviceScanningWorkItemId atomic.Int32
- deviceScanningQueue utils.Executor
+ deviceScanningQueue pool.Executor
+
+ log zerolog.Logger
}
-func NewDiscoverer() *Discoverer {
+func NewDiscoverer(_options ...options.WithOption) *Discoverer {
return &Discoverer{
// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
- transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
- deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
+ transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
+ deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
+
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -61,14 +66,14 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
d.deviceScanningQueue.Start()
deviceNames := d.extractDeviceNames(discoveryOptions...)
- interfaces, err := addressProviderRetriever(deviceNames)
+ interfaces, err := addressProviderRetriever(d.log, deviceNames)
if err != nil {
return errors.Wrap(err, "error getting addresses")
}
if log.Debug().Enabled() {
for _, provider := range interfaces {
- log.Debug().Msgf("Discover on %s", provider)
- log.Trace().Msgf("Discover on %#v", provider.containedInterface())
+ d.log.Debug().Msgf("Discover on %s", provider)
+ d.log.Trace().Msgf("Discover on %#v", provider.containedInterface())
}
}
@@ -147,25 +152,25 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
go func() {
wg.Wait()
- log.Trace().Msg("Closing transport instance channel")
+ d.log.Trace().Msg("Closing transport instance channel")
close(transportInstances)
}()
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ d.log.Error().Msgf("panic-ed %v", err)
}
}()
for transportInstance := range transportInstances {
- log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
+ d.log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
}
}()
return nil
}
-func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance, cBusPort uint16, addressLogger zerolog.Logger) utils.Runnable {
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, ip net.IP, tcpTransport *tcp.Transport, transportInstances chan transports.TransportInstance, cBusPort uint16, addressLogger zerolog.Logger) pool.Runnable {
wg.Add(1)
return func() {
defer wg.Done()
@@ -199,12 +204,12 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
}
}
-func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
return func() {
transportInstanceLogger := log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
transportInstanceLogger.Debug().Msgf("Scanning %v", tcpTransportInstance)
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(tcpTransportInstance)
+ codec := NewMessageCodec(tcpTransportInstance, options.WithCustomLogger(d.log))
// Explicitly start the worker
if err := codec.Connect(); err != nil {
transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
@@ -337,12 +342,12 @@ func (w *wrappedInterface) String() string {
}
// allInterfaceRetriever can be exchanged in tests
-var allInterfaceRetriever = func() ([]addressProvider, error) {
+var allInterfaceRetriever = func(localLog zerolog.Logger) ([]addressProvider, error) {
interfaces, err := net.Interfaces()
if err != nil {
return nil, errors.Wrap(err, "could not retrieve all interfaces")
}
- log.Debug().Msgf("Mapping %d interfaces", len(interfaces))
+ localLog.Debug().Msgf("Mapping %d interfaces", len(interfaces))
addressProviders := make([]addressProvider, len(interfaces))
for i, networkInterface := range interfaces {
var copyInterface = networkInterface
@@ -352,8 +357,8 @@ var allInterfaceRetriever = func() ([]addressProvider, error) {
}
// addressProviderRetriever can be exchanged in tests
-var addressProviderRetriever = func(deviceNames []string) ([]addressProvider, error) {
- allInterfaces, err := allInterfaceRetriever()
+var addressProviderRetriever = func(localLog zerolog.Logger, deviceNames []string) ([]addressProvider, error) {
+ allInterfaces, err := allInterfaceRetriever(localLog)
if err != nil {
return nil, errors.Wrap(err, "error getting all interfaces")
}
@@ -362,7 +367,7 @@ var addressProviderRetriever = func(deviceNames []string) ([]addressProvider, er
// However if a discovery option is present to select a device by name, only
// add those devices matching any of the given names.
if len(deviceNames) <= 0 {
- log.Info().Msgf("no devices selected, use all devices (%d)", len(allInterfaces))
+ localLog.Info().Msgf("no devices selected, use all devices (%d)", len(allInterfaces))
return allInterfaces, nil
}
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index d121dbb8fd..2e11686608 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -24,6 +24,8 @@ import (
"fmt"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/pool"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/tcp"
"github.com/apache/plc4x/plc4go/spi/utils"
@@ -55,8 +57,8 @@ func TestNewDiscoverer(t *testing.T) {
func TestDiscoverer_Discover(t *testing.T) {
type fields struct {
- transportInstanceCreationQueue utils.Executor
- deviceScanningQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
+ deviceScanningQueue pool.Executor
}
type args struct {
ctx context.Context
@@ -68,15 +70,11 @@ func TestDiscoverer_Discover(t *testing.T) {
fields fields
args args
wantErr assert.ErrorAssertionFunc
- setup func() (params []any)
+ setup func(t *testing.T, fields *fields) (params []any)
teardown func(params []any)
}{
{
name: "discover unknown device",
- fields: fields{
- transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
- deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
- },
args: args{
ctx: context.Background(),
callback: func(_ apiModel.PlcDiscoveryItem) {
@@ -85,14 +83,15 @@ func TestDiscoverer_Discover(t *testing.T) {
options.WithDiscoveryOptionDeviceName("blub"),
},
},
+ setup: func(t *testing.T, fields *fields) (params []any) {
+ fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ return nil
+ },
wantErr: assert.NoError,
},
{
name: "test with loopback",
- fields: fields{
- transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
- deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
- },
args: args{
ctx: context.Background(),
callback: func(_ apiModel.PlcDiscoveryItem) {
@@ -102,9 +101,11 @@ func TestDiscoverer_Discover(t *testing.T) {
},
},
wantErr: assert.NoError,
- setup: func() (params []any) {
+ setup: func(t *testing.T, fields *fields) (params []any) {
+ fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
oldaddressProviderRetriever := addressProviderRetriever
- addressProviderRetriever = func(_ []string) ([]addressProvider, error) {
+ addressProviderRetriever = func(log zerolog.Logger, _ []string) ([]addressProvider, error) {
loopbackInterface, err := nettest.LoopbackInterface()
if err != nil {
return nil, err
@@ -114,19 +115,20 @@ func TestDiscoverer_Discover(t *testing.T) {
return []any{oldaddressProviderRetriever}
},
teardown: func(params []any) {
- addressProviderRetriever = params[0].(func(deviceNames []string) ([]addressProvider, error))
+ addressProviderRetriever = params[0].(func(log zerolog.Logger, deviceNames []string) ([]addressProvider, error))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ var params []any
+ if tt.setup != nil {
+ params = tt.setup(t, &tt.fields)
+ }
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
- }
- var params []any
- if tt.setup != nil {
- params = tt.setup()
+ log: testutils.ProduceTestingLogger(t),
}
tt.wantErr(t, d.Discover(tt.args.ctx, tt.args.callback, tt.args.discoveryOptions...), fmt.Sprintf("Discover(%v, func(), %v)", tt.args.ctx, tt.args.discoveryOptions))
if tt.teardown != nil {
@@ -138,8 +140,8 @@ func TestDiscoverer_Discover(t *testing.T) {
func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
type fields struct {
- transportInstanceCreationQueue utils.Executor
- deviceScanningQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
+ deviceScanningQueue pool.Executor
}
type args struct {
tcpTransportInstance *tcp.TransportInstance
@@ -201,6 +203,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
+ log: testutils.ProduceTestingLogger(t),
}
dispatcher := d.createDeviceScanDispatcher(tt.args.tcpTransportInstance, func(event apiModel.PlcDiscoveryItem) {
tt.args.callback(t, event)
@@ -213,8 +216,8 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
type fields struct {
- transportInstanceCreationQueue utils.Executor
- deviceScanningQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
+ deviceScanningQueue pool.Executor
}
type args struct {
ctx context.Context
@@ -282,6 +285,7 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
+ log: testutils.ProduceTestingLogger(t),
}
dispatcher := d.createTransportInstanceDispatcher(tt.args.ctx, tt.args.wg, tt.args.ip, tt.args.tcpTransport, tt.args.transportInstances, tt.args.cBusPort, tt.args.addressLogger)
assert.NotNilf(t, dispatcher, "createTransportInstanceDispatcher(%v, %v, %v, %v, %v)", tt.args.ctx, tt.args.wg, tt.args.ip, tt.args.tcpTransport, tt.args.transportInstances)
@@ -301,8 +305,8 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
func TestDiscoverer_extractDeviceNames(t *testing.T) {
type fields struct {
- transportInstanceCreationQueue utils.Executor
- deviceScanningQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
+ deviceScanningQueue pool.Executor
}
type args struct {
discoveryOptions []options.WithDiscoveryOption
@@ -342,6 +346,7 @@ func TestDiscoverer_extractDeviceNames(t *testing.T) {
d := &Discoverer{
transportInstanceCreationQueue: tt.fields.transportInstanceCreationQueue,
deviceScanningQueue: tt.fields.deviceScanningQueue,
+ log: testutils.ProduceTestingLogger(t),
}
assert.Equalf(t, tt.want, d.extractDeviceNames(tt.args.discoveryOptions...), "extractDeviceNames(%v)", tt.args.discoveryOptions)
})
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index f8043723c7..27e38a0739 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -21,70 +21,73 @@ package cbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"net/url"
"strconv"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Driver struct {
_default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
+
+ log zerolog.Logger
}
-func NewDriver() plc4go.PlcDriver {
+func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
driver := &Driver{
- tm: spi.NewRequestTransactionManager(1),
+ tm: transactions.NewRequestTransactionManager(1, _options...),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
+ log: options.ExtractCustomLogger(_options...),
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "c-bus", "Clipsal Bus", "tcp", NewTagHandler())
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
return m.reportError(errors.Errorf("couldn't find transport for given transport url %v", transportUrl))
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
- options["defaultTcpPort"] = []string{strconv.FormatUint(uint64(readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT), 10)}
+ driverOptions["defaultTcpPort"] = []string{strconv.FormatUint(uint64(readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT), 10)}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- log.Error().Err(err).Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Err(err).Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
return m.reportError(errors.Wrapf(err, "couldn't initialize transport configuration for given transport url %s", transportUrl.String()))
}
- configuration, err := ParseFromOptions(options)
+ configuration, err := ParseFromOptions(m.log, driverOptions)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid options")
return m.reportError(errors.Wrap(err, "Invalid options"))
}
- codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ m.log.Debug().Msgf("working with codec %#v", codec)
driverContext := NewDriverContext(configuration)
driverContext.awaitSetupComplete = m.awaitSetupComplete
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, options)
- log.Debug().Msg("created connection, connecting now")
+ connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
+ m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
@@ -107,5 +110,5 @@ func (m *Driver) SupportsDiscovery() bool {
}
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
- return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
+ return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index 0c2ad3b676..7b75e79cc2 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -24,9 +24,9 @@ import (
"fmt"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
- "github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/apache/plc4x/plc4go/spi/utils"
@@ -40,7 +40,7 @@ import (
func TestDriver_DiscoverWithContext(t *testing.T) {
type fields struct {
DefaultDriver _default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
@@ -83,7 +83,7 @@ func TestDriver_DiscoverWithContext(t *testing.T) {
func TestDriver_GetConnectionWithContext(t *testing.T) {
type fields struct {
DefaultDriver _default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
@@ -273,7 +273,7 @@ func TestNewDriver(t *testing.T) {
func TestDriver_reportError(t *testing.T) {
type fields struct {
DefaultDriver _default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 3f10f43a0b..2160eee221 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,6 +21,8 @@ package cbus
import (
"bufio"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"hash/crc32"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
@@ -28,7 +30,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type MessageCodec struct {
@@ -43,16 +44,19 @@ type MessageCodec struct {
hashEncountered uint
currentlyReportedServerErrors uint
+
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
+func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
codec := &MessageCodec{
requestContext: readWriteModel.NewRequestContext(false),
cbusOptions: readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false),
monitoredMMIs: make(chan readWriteModel.CALReply, 100),
monitoredSALs: make(chan readWriteModel.MonitoredSAL, 100),
+ log: options.ExtractCustomLogger(_options...),
}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(extractMMIAndSAL))
+ codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, append(_options, _default.WithCustomMessageHandler(extractMMIAndSAL))...)
return codec
}
@@ -61,7 +65,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
cbusMessage, ok := message.(readWriteModel.CBusMessage)
if !ok {
@@ -70,7 +74,7 @@ func (m *MessageCodec) Send(message spi.Message) error {
// Set the right request context
m.requestContext = CreateRequestContext(cbusMessage)
- log.Debug().Msgf("Created request context\n%s", m.requestContext)
+ m.log.Debug().Msgf("Created request context\n%s", m.requestContext)
// Serialize the request
theBytes, err := cbusMessage.Serialize()
@@ -87,13 +91,13 @@ func (m *MessageCodec) Send(message spi.Message) error {
}
func (m *MessageCodec) Receive() (spi.Message, error) {
- log.Trace().Msg("Receive")
+ m.log.Trace().Msg("Receive")
ti := m.GetTransportInstance()
confirmation := false
// Fill the buffer
{
if err := ti.FillBuffer(func(pos uint, currentByte byte, reader *bufio.Reader) bool {
- log.Trace().Uint8("byte", currentByte).Msg("current byte")
+ m.log.Trace().Uint8("byte", currentByte).Msg("current byte")
switch currentByte {
case
readWriteModel.ResponseTermination_CR,
@@ -116,26 +120,26 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
return true
}
}); err != nil {
- log.Debug().Err(err).Msg("Error filling buffer")
+ m.log.Debug().Err(err).Msg("Error filling buffer")
}
}
- log.Trace().Msg("Buffer filled")
+ m.log.Trace().Msg("Buffer filled")
// Check how many readable bytes we have
var readableBytes uint32
{
numBytesAvailableInBuffer, err := ti.GetNumBytesAvailableInBuffer()
if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
if numBytesAvailableInBuffer == 0 {
- log.Trace().Msg("Nothing to read")
+ m.log.Trace().Msg("Nothing to read")
return nil, nil
}
readableBytes = numBytesAvailableInBuffer
}
- log.Trace().Msgf("%d bytes available in buffer", readableBytes)
+ m.log.Trace().Msgf("%d bytes available in buffer", readableBytes)
// Check for an isolated error
if bytes, err := ti.PeekReadableBytes(1); err == nil && (bytes[0] == byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE)) {
@@ -175,20 +179,20 @@ lookingForTheEnd:
break lookingForTheEnd
}
}
- log.Trace().Msgf("indexOfCR %d,indexOfLF %d,indexOfConfirmation %d", indexOfCR, indexOfLF, indexOfConfirmation)
+ m.log.Trace().Msgf("indexOfCR %d,indexOfLF %d,indexOfConfirmation %d", indexOfCR, indexOfLF, indexOfConfirmation)
if indexOfCR < 0 && indexOfLF >= 0 {
// This means that the package is garbage as a lf is always prefixed with a cr
- log.Debug().Err(err).Msg("Error reading")
+ m.log.Debug().Err(err).Msg("Error reading")
garbage, err := ti.Read(readableBytes)
- log.Warn().Bytes("garbage", garbage).Msg("Garbage bytes")
+ m.log.Warn().Bytes("garbage", garbage).Msg("Garbage bytes")
return nil, err
}
if indexOfCR+1 == indexOfLF {
- log.Trace().Msg("pci response for sure")
+ m.log.Trace().Msg("pci response for sure")
// This means a <cr> is directly followed by a <lf> which means that we know for sure this is a response
pciResponse = true
} else if indexOfCR >= 0 && int(readableBytes) >= indexOfCR+2 && peekedBytes[+indexOfCR+1] != '\n' {
- log.Trace().Msg("pci request for sure")
+ m.log.Trace().Msg("pci request for sure")
// We got a request to pci for sure because the cr is followed by something else than \n
requestToPci = true
}
@@ -202,24 +206,24 @@ lookingForTheEnd:
if newPackageHash == m.lastPackageHash {
m.hashEncountered++
}
- log.Trace().Msgf("new hash %x, last hash %x, seen %d times", newPackageHash, m.lastPackageHash, m.hashEncountered)
+ m.log.Trace().Msgf("new hash %x, last hash %x, seen %d times", newPackageHash, m.lastPackageHash, m.hashEncountered)
m.lastPackageHash = newPackageHash
if m.hashEncountered < numberOfCyclesToWait {
- log.Trace().Msg("Waiting for more data")
+ m.log.Trace().Msg("Waiting for more data")
return nil, nil
} else {
- log.Trace().Msgf("stopping after ~%dms", estimatedElapsedTime)
+ m.log.Trace().Msgf("stopping after ~%dms", estimatedElapsedTime)
// after numberOfCyclesToWait*10 ms we give up finding a lf
m.lastPackageHash, m.hashEncountered = 0, 0
if indexOfCR >= 0 {
- log.Trace().Msg("setting requestToPci")
+ m.log.Trace().Msg("setting requestToPci")
requestToPci = true
}
}
}
if !pciResponse && !requestToPci && !confirmation {
// Apparently we have not found any message yet
- log.Trace().Msg("no message found yet")
+ m.log.Trace().Msg("no message found yet")
return nil, nil
}
@@ -255,20 +259,20 @@ lookingForTheEnd:
}
// Now we report the errors one by one so for every request we get a proper rejection
if foundErrors > m.currentlyReportedServerErrors {
- log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
+ m.log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
m.currentlyReportedServerErrors++
return readWriteModel.CBusMessageParse([]byte{'!'}, true, m.requestContext, m.cbusOptions)
}
if foundErrors > 0 {
- log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
+ m.log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
m.currentlyReportedServerErrors = 0
}
- log.Trace().Msgf("currentlyReportedServerErrors %d should be 0", m.currentlyReportedServerErrors)
+ m.log.Trace().Msgf("currentlyReportedServerErrors %d should be 0", m.currentlyReportedServerErrors)
}
var rawInput []byte
{
- log.Trace().Msgf("Read packet length %d", packetLength)
+ m.log.Trace().Msgf("Read packet length %d", packetLength)
read, err := ti.Read(uint32(packetLength))
if err != nil {
return nil, errors.Wrap(err, "Invalid state... If we have peeked that before we should be able to read that now")
@@ -284,17 +288,17 @@ lookingForTheEnd:
}
}
}
- log.Debug().Msgf("Parsing %q", sanitizedInput)
+ m.log.Debug().Msgf("Parsing %q", sanitizedInput)
cBusMessage, err := readWriteModel.CBusMessageParse(sanitizedInput, pciResponse, m.requestContext, m.cbusOptions)
if err != nil {
- log.Debug().Err(err).Msg("First Parse Failed")
+ m.log.Debug().Err(err).Msg("First Parse Failed")
{ // Try SAL
requestContext := readWriteModel.NewRequestContext(false)
cBusMessage, secondErr := readWriteModel.CBusMessageParse(sanitizedInput, pciResponse, requestContext, m.cbusOptions)
if secondErr == nil {
return cBusMessage, nil
} else {
- log.Debug().Err(secondErr).Msg("SAL parse failed too")
+ m.log.Debug().Err(secondErr).Msg("SAL parse failed too")
}
}
{ // Try MMI
@@ -304,11 +308,11 @@ lookingForTheEnd:
if secondErr == nil {
return cBusMessage, nil
} else {
- log.Debug().Err(secondErr).Msg("CAL parse failed too")
+ m.log.Debug().Err(secondErr).Msg("CAL parse failed too")
}
}
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
return nil, nil
}
return cBusMessage, nil
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 645595bbdf..0b3cedc4ff 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -21,6 +21,9 @@ package cbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"sync"
"time"
@@ -30,25 +33,28 @@ import (
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Reader struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
+
+ log zerolog.Logger
}
-func NewReader(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
return &Reader{
alphaGenerator: tpduGenerator,
messageCodec: messageCodec,
tm: tm,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
- log.Trace().Msg("Reading")
+ m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go m.readSync(ctx, readRequest, result)
return result
@@ -118,17 +124,17 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
})
if err := transaction.AwaitCompletion(ctx); err != nil {
- log.Warn().Err(err).Msg("Error while awaiting completion")
+ m.log.Warn().Err(err).Msg("Error while awaiting completion")
}
}
-func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transactions.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Send the over the wire
- log.Trace().Msg("Send ")
+ m.log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(cbusMessage spi.Message) bool {
messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
if !ok {
@@ -149,15 +155,15 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
return actualAlpha == expectedAlpha
}, func(receivedMessage spi.Message) error {
- defer func(transaction spi.RequestTransaction) {
+ defer func(transaction transactions.RequestTransaction) {
// This is just to make sure we don't forget to close the transaction here
_ = transaction.EndRequest()
}(transaction)
// Convert the response into an
- log.Trace().Msg("convert response to ")
+ m.log.Trace().Msg("convert response to ")
messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- log.Trace().Msg("We got a server failure")
+ m.log.Trace().Msg("We got a server failure")
addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
return transaction.EndRequest()
}
@@ -176,7 +182,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
default:
return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
}
- log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+ m.log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
addResponseCode(tagName, responseCode)
return transaction.EndRequest()
}
@@ -185,15 +191,15 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
// TODO: it could be double confirmed but this is not implemented yet
embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
if !ok {
- log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+ m.log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
return transaction.EndRequest()
}
- log.Trace().Msg("Handling confirmed data")
+ m.log.Trace().Msg("Handling confirmed data")
// TODO: check if we can use a plcValueSerializer
encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
- if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+ if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
return errors.Wrap(err, "error encoding reply")
}
return transaction.EndRequest()
@@ -201,10 +207,10 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.Req
addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
return transaction.FailRequest(err)
}, time.Second*1); err != nil {
- log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
+ m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
- log.Debug().Err(err).Msg("Error failing request")
+ m.log.Debug().Err(err).Msg("Error failing request")
}
}
}
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 54050ce9f3..1ead699a20 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -24,8 +24,10 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/testutils"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
@@ -42,7 +44,7 @@ func TestNewReader(t *testing.T) {
type args struct {
tpduGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
tests := []struct {
name string
@@ -60,7 +62,10 @@ func TestNewReader(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- assert.Equalf(t, tt.want, NewReader(tt.args.tpduGenerator, tt.args.messageCodec, tt.args.tm), "NewReader(%v, %v, %v)", tt.args.tpduGenerator, tt.args.messageCodec, tt.args.tm)
+ logger := testutils.ProduceTestingLogger(t)
+ reader := NewReader(tt.args.tpduGenerator, tt.args.messageCodec, tt.args.tm, options.WithCustomLogger(logger))
+ tt.want.log = logger
+ assert.Equalf(t, tt.want, reader, "NewReader(%v, %v, %v)", tt.args.tpduGenerator, tt.args.messageCodec, tt.args.tm)
})
}
}
@@ -69,7 +74,7 @@ func TestReader_Read(t *testing.T) {
type fields struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
type args struct {
ctx context.Context
@@ -118,7 +123,7 @@ func TestReader_readSync(t *testing.T) {
type fields struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
type args struct {
ctx context.Context
@@ -129,6 +134,7 @@ func TestReader_readSync(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields)
resultEvaluator func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool
}{
{
@@ -173,7 +179,6 @@ func TestReader_readSync(t *testing.T) {
}
return codec
}(),
- tm: spi.NewRequestTransactionManager(10),
},
args: args{
ctx: context.Background(),
@@ -189,6 +194,9 @@ func TestReader_readSync(t *testing.T) {
),
result: make(chan apiModel.PlcReadRequestResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ },
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
@@ -265,7 +273,6 @@ func TestReader_readSync(t *testing.T) {
}
return codec
}(),
- tm: spi.NewRequestTransactionManager(10),
},
args: args{
ctx: func() context.Context {
@@ -285,6 +292,9 @@ func TestReader_readSync(t *testing.T) {
),
result: make(chan apiModel.PlcReadRequestResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ },
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
@@ -324,7 +334,6 @@ func TestReader_readSync(t *testing.T) {
}
return codec
}(),
- tm: spi.NewRequestTransactionManager(10),
},
args: args{
ctx: func() context.Context {
@@ -344,6 +353,9 @@ func TestReader_readSync(t *testing.T) {
),
result: make(chan apiModel.PlcReadRequestResult, 1),
},
+ setup: func(t *testing.T, fields *fields) {
+ fields.tm = transactions.NewRequestTransactionManager(10, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ },
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
defer timer.Stop()
@@ -359,6 +371,9 @@ func TestReader_readSync(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
m := &Reader{
alphaGenerator: tt.fields.alphaGenerator,
messageCodec: tt.fields.messageCodec,
@@ -374,11 +389,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
type fields struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
type args struct {
ctx context.Context
- transaction spi.RequestTransaction
+ transaction transactions.RequestTransaction
messageToSend readWriteModel.CBusMessage
addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
tagName string
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index c9857a20d1..bc13fcd3bc 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -22,7 +22,9 @@ package cbus
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"strings"
"time"
@@ -31,18 +33,21 @@ import (
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
- "github.com/rs/zerolog/log"
)
type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+
+ log zerolog.Logger
}
-func NewSubscriber(connection *Connection) *Subscriber {
+func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
return &Subscriber{
connection: connection,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -119,7 +124,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
tag, ok := subscriptionHandle.tag.(*mmiMonitorTag)
if !ok {
- log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+ m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
return false
}
@@ -135,7 +140,7 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
if !strings.HasSuffix(unitAddressString, unitSuffix) {
- log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+ m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
return false
}
}
@@ -217,12 +222,12 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
plcValues[tagName] = spiValues.NewPlcList(plcListValues)
}
default:
- log.Error().Msgf("Unmapped type %T", calData)
+ m.log.Error().Msgf("Unmapped type %T", calData)
return false
}
if application := tag.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
- log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+ m.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
return false
}
}
@@ -251,7 +256,7 @@ func (m *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
tag, ok := subscriptionHandle.tag.(*salMonitorTag)
if !ok {
- log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+ m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
return false
}
tags := map[string]apiModel.PlcTag{}
@@ -295,7 +300,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
if !strings.HasSuffix(unitAddressString, unitSuffix) {
- log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+ m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
return false
}
}
@@ -303,7 +308,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
if application := tag.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
- log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+ m.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
return false
}
}
@@ -325,7 +330,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataErrorReportingExactly:
commandTypeGetter = salData.GetErrorReportingData().GetCommandType()
case readWriteModel.SALDataFreeUsageExactly:
- log.Info().Msg("Unknown command type")
+ m.log.Info().Msg("Unknown command type")
case readWriteModel.SALDataHeatingExactly:
commandTypeGetter = salData.GetHeatingData().GetCommandType()
case readWriteModel.SALDataHvacActuatorExactly:
@@ -343,9 +348,9 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataPoolsSpasPondsFountainsControlExactly:
commandTypeGetter = salData.GetPoolsSpaPondsFountainsData().GetCommandType()
case readWriteModel.SALDataReservedExactly:
- log.Info().Msg("Unknown command type")
+ m.log.Info().Msg("Unknown command type")
case readWriteModel.SALDataRoomControlSystemExactly:
- log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+ m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
case readWriteModel.SALDataSecurityExactly:
commandTypeGetter = salData.GetSecurityData().GetCommandType()
case readWriteModel.SALDataTelephonyStatusAndControlExactly:
@@ -353,13 +358,13 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataTemperatureBroadcastExactly:
commandTypeGetter = salData.GetTemperatureBroadcastData().GetCommandType()
case readWriteModel.SALDataTestingExactly:
- log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+ m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
case readWriteModel.SALDataTriggerControlExactly:
commandTypeGetter = salData.GetTriggerControlData().GetCommandType()
case readWriteModel.SALDataVentilationExactly:
commandTypeGetter = salData.GetVentilationData().GetCommandType()
default:
- log.Error().Msgf("Unmapped type %T", salData)
+ m.log.Error().Msgf("Unmapped type %T", salData)
}
commandType := "Unknown"
if commandTypeGetter != nil {
@@ -372,7 +377,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
rbvb := spiValues.NewWriteBufferPlcValueBased()
err := salData.SerializeWithWriteBuffer(context.Background(), rbvb)
if err != nil {
- log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
+ m.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
plcValues[tagName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
} else {
plcValues[tagName] = rbvb.GetPlcValue()
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 2883c0787b..35b13f71dc 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -21,6 +21,9 @@ package cbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"sync"
"time"
@@ -29,25 +32,28 @@ import (
"github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Writer struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
+
+ log zerolog.Logger
}
-func NewWriter(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm spi.RequestTransactionManager) *Writer {
+func NewWriter(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Writer {
return &Writer{
alphaGenerator: tpduGenerator,
messageCodec: messageCodec,
tm: tm,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
- log.Trace().Msg("Writing")
+ m.log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult, 1)
go func() {
defer func() {
@@ -103,9 +109,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
tagNameCopy := tagName
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
- log.Trace().Msg("Send ")
+ m.log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
if !ok {
@@ -131,12 +137,12 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
addResponseCode(tagName, apiModel.PlcResponseCode_OK)
return transaction.EndRequest()
}, func(err error) error {
- log.Debug().Msgf("Error waiting for tag %s", tagNameCopy)
+ m.log.Debug().Msgf("Error waiting for tag %s", tagNameCopy)
addResponseCode(tagNameCopy, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
// TODO: ok or not ok?
return transaction.EndRequest()
}, time.Second*1); err != nil {
- log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
+ m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
_ = transaction.EndRequest()
}
diff --git a/plc4go/internal/cbus/Writer_test.go b/plc4go/internal/cbus/Writer_test.go
index b1ba9db6cf..3af6f00013 100644
--- a/plc4go/internal/cbus/Writer_test.go
+++ b/plc4go/internal/cbus/Writer_test.go
@@ -22,6 +22,7 @@ package cbus
import (
"context"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/stretchr/testify/assert"
"strings"
@@ -29,14 +30,13 @@ import (
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
- "github.com/apache/plc4x/plc4go/spi"
)
func TestNewWriter(t *testing.T) {
type args struct {
tpduGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
tests := []struct {
name string
@@ -62,7 +62,7 @@ func TestWriter_Write(t *testing.T) {
type fields struct {
alphaGenerator *AlphaGenerator
messageCodec *MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
type args struct {
ctx context.Context
diff --git a/plc4go/internal/cbus/mock_RequestTransaction_test.go b/plc4go/internal/cbus/mock_RequestTransaction_test.go
index 119b237012..ae6861665b 100644
--- a/plc4go/internal/cbus/mock_RequestTransaction_test.go
+++ b/plc4go/internal/cbus/mock_RequestTransaction_test.go
@@ -24,7 +24,7 @@ package cbus
import (
context "context"
- spi "github.com/apache/plc4x/plc4go/spi"
+ transactions "github.com/apache/plc4x/plc4go/spi/transactions"
mock "github.com/stretchr/testify/mock"
)
@@ -208,7 +208,7 @@ func (_c *MockRequestTransaction_String_Call) RunAndReturn(run func() string) *M
}
// Submit provides a mock function with given fields: operation
-func (_m *MockRequestTransaction) Submit(operation spi.RequestTransactionRunnable) {
+func (_m *MockRequestTransaction) Submit(operation transactions.RequestTransactionRunnable) {
_m.Called(operation)
}
@@ -218,14 +218,14 @@ type MockRequestTransaction_Submit_Call struct {
}
// Submit is a helper method to define mock.On call
-// - operation spi.RequestTransactionRunnable
+// - operation transactions.RequestTransactionRunnable
func (_e *MockRequestTransaction_Expecter) Submit(operation interface{}) *MockRequestTransaction_Submit_Call {
return &MockRequestTransaction_Submit_Call{Call: _e.mock.On("Submit", operation)}
}
-func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation spi.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) Run(run func(operation transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(spi.RequestTransactionRunnable))
+ run(args[0].(transactions.RequestTransactionRunnable))
})
return _c
}
@@ -235,7 +235,7 @@ func (_c *MockRequestTransaction_Submit_Call) Return() *MockRequestTransaction_S
return _c
}
-func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(spi.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
+func (_c *MockRequestTransaction_Submit_Call) RunAndReturn(run func(transactions.RequestTransactionRunnable)) *MockRequestTransaction_Submit_Call {
_c.Call.Return(run)
return _c
}
diff --git a/plc4go/internal/cbus/mock_requirements.go b/plc4go/internal/cbus/mock_requirements.go
index 69bb4c4e7c..a8d06b79af 100644
--- a/plc4go/internal/cbus/mock_requirements.go
+++ b/plc4go/internal/cbus/mock_requirements.go
@@ -19,11 +19,13 @@
package cbus
-import "github.com/apache/plc4x/plc4go/spi"
+import (
+ "github.com/apache/plc4x/plc4go/spi/transactions"
+)
// Note this file is a Helper for mockery to generate use mocks from other package
// Deprecated: don't use it in productive code
type RequestTransaction interface {
- spi.RequestTransaction
+ transactions.RequestTransaction
}
diff --git a/plc4go/internal/cbus/mock_requirements.go b/plc4go/internal/cbus/noGlobalLog_test.go
similarity index 77%
copy from plc4go/internal/cbus/mock_requirements.go
copy to plc4go/internal/cbus/noGlobalLog_test.go
index 69bb4c4e7c..7a3475947d 100644
--- a/plc4go/internal/cbus/mock_requirements.go
+++ b/plc4go/internal/cbus/noGlobalLog_test.go
@@ -19,11 +19,7 @@
package cbus
-import "github.com/apache/plc4x/plc4go/spi"
-
-// Note this file is a Helper for mockery to generate use mocks from other package
-
-// Deprecated: don't use it in productive code
-type RequestTransaction interface {
- spi.RequestTransaction
+// This ensures that we don't global log
+func init() {
+ //testutils.ExplodingGlobalLogger(true)
}
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 54fbb595b6..2524e333db 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -22,6 +22,7 @@ package eip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -45,7 +46,7 @@ type Connection struct {
messageCodec spi.MessageCodec
configuration Configuration
driverContext DriverContext
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
sessionHandle uint32
senderContext []uint8
connectionId uint32
@@ -58,7 +59,7 @@ type Connection struct {
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
messageCodec: messageCodec,
configuration: configuration,
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 9920bdd10c..e36e27904c 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -21,10 +21,10 @@ package eip
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
- "github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
@@ -33,14 +33,14 @@ import (
type Driver struct {
_default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
func NewDriver() plc4go.PlcDriver {
driver := &Driver{
- tm: spi.NewRequestTransactionManager(1),
+ tm: transactions.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 67dee5b772..5105835298 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -23,6 +23,7 @@ import (
"context"
"encoding/binary"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"regexp"
"strconv"
"time"
@@ -41,12 +42,12 @@ import (
type Reader struct {
messageCodec spi.MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
sessionHandle *uint32
}
-func NewReader(messageCodec spi.MessageCodec, tm spi.RequestTransactionManager, configuration Configuration, sessionHandle *uint32) *Reader {
+func NewReader(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32) *Reader {
return &Reader{
messageCodec: messageCodec,
tm: tm,
@@ -88,7 +89,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
}
request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
if err := m.messageCodec.SendRequest(ctx, request,
func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 81f05206c5..195f753e2a 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -22,6 +22,7 @@ package eip
import (
"context"
"encoding/binary"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"strings"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -37,13 +38,13 @@ import (
type Writer struct {
messageCodec spi.MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
configuration Configuration
sessionHandle *uint32
senderContext *[]uint8
}
-func NewWriter(messageCodec spi.MessageCodec, tm spi.RequestTransactionManager, configuration Configuration, sessionHandle *uint32, senderContext *[]uint8) Writer {
+func NewWriter(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32, senderContext *[]uint8) Writer {
return Writer{
messageCodec: messageCodec,
tm: tm,
@@ -118,7 +119,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
@@ -206,7 +207,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index dd1b8ad024..45857edbe0 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -23,7 +23,7 @@ import (
"bytes"
"context"
"fmt"
- "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog/log"
"net"
"net/url"
@@ -41,16 +41,16 @@ import (
type Discoverer struct {
transportInstanceCreationWorkItemId atomic.Int32
- transportInstanceCreationQueue utils.Executor
+ transportInstanceCreationQueue pool.Executor
deviceScanningWorkItemId atomic.Int32
- deviceScanningQueue utils.Executor
+ deviceScanningQueue pool.Executor
}
func NewDiscoverer() *Discoverer {
return &Discoverer{
// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
- transportInstanceCreationQueue: utils.NewFixedSizeExecutor(50, 100),
- deviceScanningQueue: utils.NewFixedSizeExecutor(50, 100),
+ transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100),
+ deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100),
}
}
@@ -149,7 +149,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
return nil
}
-func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) utils.Runnable {
+func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *sync.WaitGroup, connectionUrl *url.URL, ipv4Addr net.IP, udpTransport *udp.Transport, transportInstances chan transports.TransportInstance) pool.Runnable {
wg.Add(1)
return func() {
defer wg.Done()
@@ -171,7 +171,7 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
}
}
-func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) utils.Runnable {
+func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
return func() {
log.Debug().Msgf("Scanning %v", udpTransportInstance)
// Create a codec for sending and receiving messages.
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index cc45c9111e..131bc31db2 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -22,6 +22,7 @@ package s7
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"reflect"
"strings"
"sync"
@@ -60,13 +61,13 @@ type Connection struct {
messageCodec spi.MessageCodec
configuration Configuration
driverContext DriverContext
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
connectionId string
tracer *spi.Tracer
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm spi.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string) *Connection {
connection := &Connection{
tpduGenerator: TpduGenerator{currentTpduId: 10},
messageCodec: messageCodec,
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 37d4e080a1..738e6c1f1f 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -21,10 +21,10 @@ package s7
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"net/url"
"github.com/apache/plc4x/plc4go/pkg/api"
- "github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
@@ -33,14 +33,14 @@ import (
type Driver struct {
_default.DefaultDriver
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
}
func NewDriver() plc4go.PlcDriver {
driver := &Driver{
- tm: spi.NewRequestTransactionManager(1),
+ tm: transactions.NewRequestTransactionManager(1),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
}
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 45b8ccfa19..4a99674f62 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -21,6 +21,7 @@ package s7
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -37,10 +38,10 @@ import (
type Reader struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
-func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm spi.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
return &Reader{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
@@ -101,7 +102,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
)
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
log.Trace().Msg("Send ")
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 171fe0660e..2bf9a8f3b5 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -21,6 +21,7 @@ package s7
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/transactions"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -36,10 +37,10 @@ import (
type Writer struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
- tm spi.RequestTransactionManager
+ tm transactions.RequestTransactionManager
}
-func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm spi.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) Writer {
return Writer{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
@@ -98,7 +99,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
- transaction.Submit(func(transaction spi.RequestTransaction) {
+ transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
diff --git a/plc4go/pkg/api/driver.go b/plc4go/pkg/api/driver.go
index 293ddad294..d7a405b601 100644
--- a/plc4go/pkg/api/driver.go
+++ b/plc4go/pkg/api/driver.go
@@ -48,7 +48,7 @@ type PlcDriver interface {
GetConnection(transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan PlcConnectionConnectResult
// GetConnectionWithContext Establishes a connection to a given PLC using the information in the connectionString
// FIXME: this leaks spi in the signature move to spi driver or create interfaces. Can also be done by moving spi in a proper module
- GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan PlcConnectionConnectResult
+ GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan PlcConnectionConnectResult
// SupportsDiscovery returns true if this driver supports discovery
SupportsDiscovery() bool
diff --git a/plc4go/pkg/api/mock_PlcDriver_test.go b/plc4go/pkg/api/mock_PlcDriver_test.go
index c1a1c71636..fb2f548262 100644
--- a/plc4go/pkg/api/mock_PlcDriver_test.go
+++ b/plc4go/pkg/api/mock_PlcDriver_test.go
@@ -292,13 +292,13 @@ func (_c *MockPlcDriver_GetConnection_Call) RunAndReturn(run func(url.URL, map[s
return _c
}
-// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, _a3
-func (_m *MockPlcDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string) <-chan PlcConnectionConnectResult {
- ret := _m.Called(ctx, transportUrl, _a2, _a3)
+// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, driverOptions
+func (_m *MockPlcDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string) <-chan PlcConnectionConnectResult {
+ ret := _m.Called(ctx, transportUrl, _a2, driverOptions)
var r0 <-chan PlcConnectionConnectResult
if rf, ok := ret.Get(0).(func(context.Context, url.URL, map[string]transports.Transport, map[string][]string) <-chan PlcConnectionConnectResult); ok {
- r0 = rf(ctx, transportUrl, _a2, _a3)
+ r0 = rf(ctx, transportUrl, _a2, driverOptions)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan PlcConnectionConnectResult)
@@ -317,12 +317,12 @@ type MockPlcDriver_GetConnectionWithContext_Call struct {
// - ctx context.Context
// - transportUrl url.URL
// - _a2 map[string]transports.Transport
-// - _a3 map[string][]string
-func (_e *MockPlcDriver_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, _a3 interface{}) *MockPlcDriver_GetConnectionWithContext_Call {
- return &MockPlcDriver_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, _a3)}
+// - driverOptions map[string][]string
+func (_e *MockPlcDriver_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, driverOptions interface{}) *MockPlcDriver_GetConnectionWithContext_Call {
+ return &MockPlcDriver_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, driverOptions)}
}
-func (_c *MockPlcDriver_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string)) *MockPlcDriver_GetConnectionWithContext_Call {
+func (_c *MockPlcDriver_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string)) *MockPlcDriver_GetConnectionWithContext_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(url.URL), args[2].(map[string]transports.Transport), args[3].(map[string][]string))
})
diff --git a/plc4go/pkg/api/mock_Transport_test.go b/plc4go/pkg/api/mock_Transport_test.go
index a3ac4c196b..c42f59e91f 100644
--- a/plc4go/pkg/api/mock_Transport_test.go
+++ b/plc4go/pkg/api/mock_Transport_test.go
@@ -22,10 +22,12 @@
package plc4go
import (
- url "net/url"
+ options "github.com/apache/plc4x/plc4go/spi/options"
+ mock "github.com/stretchr/testify/mock"
transports "github.com/apache/plc4x/plc4go/spi/transports"
- mock "github.com/stretchr/testify/mock"
+
+ url "net/url"
)
// MockTransport is an autogenerated mock type for the Transport type
@@ -41,25 +43,32 @@ func (_m *MockTransport) EXPECT() *MockTransport_Expecter {
return &MockTransport_Expecter{mock: &_m.Mock}
}
-// CreateTransportInstance provides a mock function with given fields: transportUrl, options
-func (_m *MockTransport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
- ret := _m.Called(transportUrl, options)
+// CreateTransportInstance provides a mock function with given fields: transportUrl, _a1, _options
+func (_m *MockTransport) CreateTransportInstance(transportUrl url.URL, _a1 map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
+ _va := make([]interface{}, len(_options))
+ for _i := range _options {
+ _va[_i] = _options[_i]
+ }
+ var _ca []interface{}
+ _ca = append(_ca, transportUrl, _a1)
+ _ca = append(_ca, _va...)
+ ret := _m.Called(_ca...)
var r0 transports.TransportInstance
var r1 error
- if rf, ok := ret.Get(0).(func(url.URL, map[string][]string) (transports.TransportInstance, error)); ok {
- return rf(transportUrl, options)
+ if rf, ok := ret.Get(0).(func(url.URL, map[string][]string, ...options.WithOption) (transports.TransportInstance, error)); ok {
+ return rf(transportUrl, _a1, _options...)
}
- if rf, ok := ret.Get(0).(func(url.URL, map[string][]string) transports.TransportInstance); ok {
- r0 = rf(transportUrl, options)
+ if rf, ok := ret.Get(0).(func(url.URL, map[string][]string, ...options.WithOption) transports.TransportInstance); ok {
+ r0 = rf(transportUrl, _a1, _options...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(transports.TransportInstance)
}
}
- if rf, ok := ret.Get(1).(func(url.URL, map[string][]string) error); ok {
- r1 = rf(transportUrl, options)
+ if rf, ok := ret.Get(1).(func(url.URL, map[string][]string, ...options.WithOption) error); ok {
+ r1 = rf(transportUrl, _a1, _options...)
} else {
r1 = ret.Error(1)
}
@@ -74,14 +83,22 @@ type MockTransport_CreateTransportInstance_Call struct {
// CreateTransportInstance is a helper method to define mock.On call
// - transportUrl url.URL
-// - options map[string][]string
-func (_e *MockTransport_Expecter) CreateTransportInstance(transportUrl interface{}, options interface{}) *MockTransport_CreateTransportInstance_Call {
- return &MockTransport_CreateTransportInstance_Call{Call: _e.mock.On("CreateTransportInstance", transportUrl, options)}
+// - _a1 map[string][]string
+// - _options ...options.WithOption
+func (_e *MockTransport_Expecter) CreateTransportInstance(transportUrl interface{}, _a1 interface{}, _options ...interface{}) *MockTransport_CreateTransportInstance_Call {
+ return &MockTransport_CreateTransportInstance_Call{Call: _e.mock.On("CreateTransportInstance",
+ append([]interface{}{transportUrl, _a1}, _options...)...)}
}
-func (_c *MockTransport_CreateTransportInstance_Call) Run(run func(transportUrl url.URL, options map[string][]string)) *MockTransport_CreateTransportInstance_Call {
+func (_c *MockTransport_CreateTransportInstance_Call) Run(run func(transportUrl url.URL, _a1 map[string][]string, _options ...options.WithOption)) *MockTransport_CreateTransportInstance_Call {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(url.URL), args[1].(map[string][]string))
+ variadicArgs := make([]options.WithOption, len(args)-2)
+ for i, a := range args[2:] {
+ if a != nil {
+ variadicArgs[i] = a.(options.WithOption)
+ }
+ }
+ run(args[0].(url.URL), args[1].(map[string][]string), variadicArgs...)
})
return _c
}
@@ -91,7 +108,7 @@ func (_c *MockTransport_CreateTransportInstance_Call) Return(_a0 transports.Tran
return _c
}
-func (_c *MockTransport_CreateTransportInstance_Call) RunAndReturn(run func(url.URL, map[string][]string) (transports.TransportInstance, error)) *MockTransport_CreateTransportInstance_Call {
+func (_c *MockTransport_CreateTransportInstance_Call) RunAndReturn(run func(url.URL, map[string][]string, ...options.WithOption) (transports.TransportInstance, error)) *MockTransport_CreateTransportInstance_Call {
_c.Call.Return(run)
return _c
}
diff --git a/plc4go/protocols/cbus/readwrite/model/StaticHelper.go b/plc4go/protocols/cbus/readwrite/model/StaticHelper.go
index 100b1f4e41..e219dddbff 100644
--- a/plc4go/protocols/cbus/readwrite/model/StaticHelper.go
+++ b/plc4go/protocols/cbus/readwrite/model/StaticHelper.go
@@ -21,10 +21,11 @@ package model
import (
"encoding/hex"
+
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
func ReadAndValidateChecksum(readBuffer utils.ReadBuffer, message spi.Message, srchk bool) (Checksum, error) {
@@ -140,7 +141,7 @@ func readBytesFromHex(logicalName string, readBuffer utils.ReadBuffer, srchk boo
readBuffer.Reset(readBuffer.GetPos() - 2)
rawBytes = rawBytes[:len(rawBytes)-1]
}
- log.Trace().Msgf("%d bytes decoded", n)
+ Plc4xModelLog.Trace().Msgf("%d bytes decoded", n)
return rawBytes, nil
}
@@ -174,7 +175,7 @@ func writeToHex(logicalName string, writeBuffer utils.WriteBuffer, bytesToWrite
// usually you use hex.Encode but we want the encoding in uppercase
//n := hex.Encode(hexBytes, wbbb.GetBytes())
n := encodeHexUpperCase(hexBytes, bytesToWrite)
- log.Trace().Msgf("%d bytes encoded", n)
+ Plc4xModelLog.Trace().Msgf("%d bytes encoded", n)
return writeBuffer.WriteByteArray(logicalName, hexBytes)
}
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 5cf97068e6..f5be14b852 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -22,18 +22,17 @@ package _default
import (
"context"
"fmt"
- "github.com/apache/plc4x/plc4go/spi/utils"
"runtime/debug"
"time"
- "github.com/apache/plc4x/plc4go/spi/options"
-
"github.com/apache/plc4x/plc4go/pkg/api/config"
"github.com/apache/plc4x/plc4go/spi"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
)
// DefaultCodecRequirements adds required methods to MessageCodec that are needed when using DefaultCodec
@@ -84,25 +83,29 @@ type defaultCodec struct {
expectations []spi.Expectation
running bool
customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
+
+ log zerolog.Logger
}
-func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transportInstance transports.TransportInstance, options ...options.WithOption) DefaultCodec {
+func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transportInstance transports.TransportInstance, _options ...options.WithOption) DefaultCodec {
var customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
+ var logger = options.ExtractCustomLogger(_options...)
- for _, option := range options {
- switch option.(type) {
+ for _, option := range _options {
+ switch option := option.(type) {
case withCustomMessageHandler:
- customMessageHandler = option.(withCustomMessageHandler).customMessageHandler
+ customMessageHandler = option.customMessageHandler
}
}
return &defaultCodec{
- defaultCodecRequirements,
- transportInstance,
- make(chan spi.Message),
- []spi.Expectation{},
- false,
- customMessageHandler,
+ DefaultCodecRequirements: defaultCodecRequirements,
+ transportInstance: transportInstance,
+ defaultIncomingMessageChannel: make(chan spi.Message),
+ expectations: []spi.Expectation{},
+ running: false,
+ customMessageHandling: customMessageHandler,
+ log: logger,
}
}
@@ -149,17 +152,17 @@ func (m *defaultCodec) Connect() error {
}
func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
- log.Trace().Msg("Connecting")
+ m.log.Trace().Msg("Connecting")
if !m.transportInstance.IsConnected() {
if err := m.transportInstance.ConnectWithContext(ctx); err != nil {
return err
}
} else {
- log.Info().Msg("Transport instance already connected")
+ m.log.Info().Msg("Transport instance already connected")
}
if !m.running {
- log.Debug().Msg("Message codec currently not running, starting worker now")
+ m.log.Debug().Msg("Message codec currently not running, starting worker now")
go m.Work(m.DefaultCodecRequirements)
}
m.running = true
@@ -167,7 +170,7 @@ func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
}
func (m *defaultCodec) Disconnect() error {
- log.Trace().Msg("Disconnecting")
+ m.log.Trace().Msg("Disconnecting")
m.running = false
return m.transportInstance.Close()
}
@@ -192,7 +195,7 @@ func (m *defaultCodec) SendRequest(ctx context.Context, message spi.Message, acc
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "Not sending message as context is aborted")
}
- log.Trace().Msg("Sending request")
+ m.log.Trace().Msg("Sending request")
// Send the actual message
err := m.Send(message)
if err != nil {
@@ -212,7 +215,7 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
// Call the error handler.
go func(expectation spi.Expectation) {
if err := expectation.GetHandleError()(utils.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
- log.Error().Err(err).Msg("Got an error handling error on expectation")
+ m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
}(expectation)
continue
@@ -223,7 +226,7 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
i--
go func(expectation spi.Expectation) {
if err := expectation.GetHandleError()(err); err != nil {
- log.Error().Err(err).Msg("Got an error handling error on expectation")
+ m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
}(expectation)
continue
@@ -233,12 +236,12 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
func (m *defaultCodec) HandleMessages(message spi.Message) bool {
messageHandled := false
- log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
+ m.log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
for index, expectation := range m.expectations {
// Check if the current message matches the expectations
// If it does, let it handle the message.
if accepts := expectation.GetAcceptsMessage()(message); accepts {
- log.Debug().Stringer("expectation", expectation).Msg("accepts message")
+ m.log.Debug().Stringer("expectation", expectation).Msg("accepts message")
// TODO: decouple from worker thread
err := expectation.GetHandleMessage()(message)
if err != nil {
@@ -246,7 +249,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
// TODO: decouple from worker thread
err := expectation.GetHandleError()(err)
if err != nil {
- log.Error().Err(err).Msg("Got an error handling error on expectation")
+ m.log.Error().Err(err).Msg("Got an error handling error on expectation")
}
continue
}
@@ -263,7 +266,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
}
func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
- workerLog := log.With().Logger()
+ workerLog := m.log.With().Logger()
if !config.TraceDefaultMessageCodecWorker {
workerLog = zerolog.Nop()
}
@@ -271,7 +274,7 @@ func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
defer func(workerLog zerolog.Logger) {
if err := recover(); err != nil {
// TODO: If this is an error, cast it to an error and log it with "Err(err)"
- log.Error().Msgf("recovered from: %#v at %s", err, string(debug.Stack()))
+ m.log.Error().Msgf("recovered from: %#v at %s", err, string(debug.Stack()))
}
if m.running {
workerLog.Warn().Msg("Keep running")
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index c703f035ca..492ae8be56 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -21,16 +21,15 @@ package _default
import (
"context"
+ "github.com/rs/zerolog"
"time"
- "github.com/pkg/errors"
- "github.com/rs/zerolog/log"
-
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
+ "github.com/pkg/errors"
)
// DefaultConnectionRequirements defines the required at a implementing connection when using DefaultConnection
@@ -152,14 +151,16 @@ type defaultConnection struct {
connected bool
tagHandler spi.PlcTagHandler
valueHandler spi.PlcValueHandler
+
+ log zerolog.Logger
}
-func buildDefaultConnection(requirements DefaultConnectionRequirements, options ...options.WithOption) DefaultConnection {
+func buildDefaultConnection(requirements DefaultConnectionRequirements, _options ...options.WithOption) DefaultConnection {
defaultTtl := time.Second * 10
var tagHandler spi.PlcTagHandler
var valueHandler spi.PlcValueHandler
- for _, option := range options {
+ for _, option := range _options {
switch option.(type) {
case withDefaultTtl:
defaultTtl = option.(withDefaultTtl).defaultTtl
@@ -171,11 +172,13 @@ func buildDefaultConnection(requirements DefaultConnectionRequirements, options
}
return &defaultConnection{
- requirements,
- defaultTtl,
- false,
- tagHandler,
- valueHandler,
+ DefaultConnectionRequirements: requirements,
+ defaultTtl: defaultTtl,
+ connected: false,
+ tagHandler: tagHandler,
+ valueHandler: valueHandler,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -233,7 +236,7 @@ func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
}
func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ d.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -250,7 +253,7 @@ func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4g
}
func (d *defaultConnection) BlockingClose() {
- log.Trace().Msg("blocking close connection")
+ d.log.Trace().Msg("blocking close connection")
closeResults := d.GetConnection().Close()
timeout := time.NewTimer(d.GetTtl())
d.SetConnected(false)
@@ -267,9 +270,9 @@ func (d *defaultConnection) BlockingClose() {
}
func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
- log.Trace().Msg("close connection")
+ d.log.Trace().Msg("close connection")
if err := d.GetMessageCodec().Disconnect(); err != nil {
- log.Warn().Err(err).Msg("Error disconnecting message code")
+ d.log.Warn().Err(err).Msg("Error disconnecting message code")
}
err := d.GetTransportInstance().Close()
d.SetConnected(false)
diff --git a/plc4go/spi/default/DefaultDriver.go b/plc4go/spi/default/DefaultDriver.go
index f654f65a58..42f4d18d77 100644
--- a/plc4go/spi/default/DefaultDriver.go
+++ b/plc4go/spi/default/DefaultDriver.go
@@ -33,7 +33,7 @@ import (
)
type DefaultDriverRequirements interface {
- GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult
+ GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult
DiscoverWithContext(callback context.Context, event func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error
}
diff --git a/plc4go/spi/default/mock_DefaultDriverRequirements_test.go b/plc4go/spi/default/mock_DefaultDriverRequirements_test.go
index 021d730aec..9c12bf0aee 100644
--- a/plc4go/spi/default/mock_DefaultDriverRequirements_test.go
+++ b/plc4go/spi/default/mock_DefaultDriverRequirements_test.go
@@ -107,13 +107,13 @@ func (_c *MockDefaultDriverRequirements_DiscoverWithContext_Call) RunAndReturn(r
return _c
}
-// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, _a3
-func (_m *MockDefaultDriverRequirements) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- ret := _m.Called(ctx, transportUrl, _a2, _a3)
+// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, driverOptions
+func (_m *MockDefaultDriverRequirements) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ ret := _m.Called(ctx, transportUrl, _a2, driverOptions)
var r0 <-chan plc4go.PlcConnectionConnectResult
if rf, ok := ret.Get(0).(func(context.Context, url.URL, map[string]transports.Transport, map[string][]string) <-chan plc4go.PlcConnectionConnectResult); ok {
- r0 = rf(ctx, transportUrl, _a2, _a3)
+ r0 = rf(ctx, transportUrl, _a2, driverOptions)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan plc4go.PlcConnectionConnectResult)
@@ -132,12 +132,12 @@ type MockDefaultDriverRequirements_GetConnectionWithContext_Call struct {
// - ctx context.Context
// - transportUrl url.URL
// - _a2 map[string]transports.Transport
-// - _a3 map[string][]string
-func (_e *MockDefaultDriverRequirements_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, _a3 interface{}) *MockDefaultDriverRequirements_GetConnectionWithContext_Call {
- return &MockDefaultDriverRequirements_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, _a3)}
+// - driverOptions map[string][]string
+func (_e *MockDefaultDriverRequirements_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, driverOptions interface{}) *MockDefaultDriverRequirements_GetConnectionWithContext_Call {
+ return &MockDefaultDriverRequirements_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, driverOptions)}
}
-func (_c *MockDefaultDriverRequirements_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string)) *MockDefaultDriverRequirements_GetConnectionWithContext_Call {
+func (_c *MockDefaultDriverRequirements_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string)) *MockDefaultDriverRequirements_GetConnectionWithContext_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(url.URL), args[2].(map[string]transports.Transport), args[3].(map[string][]string))
})
diff --git a/plc4go/spi/default/mock_DefaultDriver_test.go b/plc4go/spi/default/mock_DefaultDriver_test.go
index 45d7a52da9..f3fcadec19 100644
--- a/plc4go/spi/default/mock_DefaultDriver_test.go
+++ b/plc4go/spi/default/mock_DefaultDriver_test.go
@@ -296,13 +296,13 @@ func (_c *MockDefaultDriver_GetConnection_Call) RunAndReturn(run func(url.URL, m
return _c
}
-// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, _a3
-func (_m *MockDefaultDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- ret := _m.Called(ctx, transportUrl, _a2, _a3)
+// GetConnectionWithContext provides a mock function with given fields: ctx, transportUrl, _a2, driverOptions
+func (_m *MockDefaultDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ ret := _m.Called(ctx, transportUrl, _a2, driverOptions)
var r0 <-chan plc4go.PlcConnectionConnectResult
if rf, ok := ret.Get(0).(func(context.Context, url.URL, map[string]transports.Transport, map[string][]string) <-chan plc4go.PlcConnectionConnectResult); ok {
- r0 = rf(ctx, transportUrl, _a2, _a3)
+ r0 = rf(ctx, transportUrl, _a2, driverOptions)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan plc4go.PlcConnectionConnectResult)
@@ -321,12 +321,12 @@ type MockDefaultDriver_GetConnectionWithContext_Call struct {
// - ctx context.Context
// - transportUrl url.URL
// - _a2 map[string]transports.Transport
-// - _a3 map[string][]string
-func (_e *MockDefaultDriver_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, _a3 interface{}) *MockDefaultDriver_GetConnectionWithContext_Call {
- return &MockDefaultDriver_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, _a3)}
+// - driverOptions map[string][]string
+func (_e *MockDefaultDriver_Expecter) GetConnectionWithContext(ctx interface{}, transportUrl interface{}, _a2 interface{}, driverOptions interface{}) *MockDefaultDriver_GetConnectionWithContext_Call {
+ return &MockDefaultDriver_GetConnectionWithContext_Call{Call: _e.mock.On("GetConnectionWithContext", ctx, transportUrl, _a2, driverOptions)}
}
-func (_c *MockDefaultDriver_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, _a3 map[string][]string)) *MockDefaultDriver_GetConnectionWithContext_Call {
+func (_c *MockDefaultDriver_GetConnectionWithContext_Call) Run(run func(ctx context.Context, transportUrl url.URL, _a2 map[string]transports.Transport, driverOptions map[string][]string)) *MockDefaultDriver_GetConnectionWithContext_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(url.URL), args[2].(map[string]transports.Transport), args[3].(map[string][]string))
})
diff --git a/plc4go/spi/mock_requirements.go b/plc4go/spi/mock_requirements.go
index c19da79323..d5d13452a4 100644
--- a/plc4go/spi/mock_requirements.go
+++ b/plc4go/spi/mock_requirements.go
@@ -20,12 +20,12 @@
package spi
import (
- "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/apache/plc4x/plc4go/spi/pool"
)
// Note this file is a Helper for mockery to generate use mocks from other package
// Deprecated: don't use it in productive code
type CompletionFuture interface {
- utils.CompletionFuture
+ pool.CompletionFuture
}
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 1a242d5593..772d7bd8d9 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -19,6 +19,8 @@
package options
+import "github.com/rs/zerolog"
+
// WithOption is a marker interface for options supplied by the builders like WithDefaultTtl
type WithOption interface {
isOption() bool
@@ -31,3 +33,36 @@ type Option struct {
func (_ Option) isOption() bool {
return true
}
+
+// WithCustomLogger is a global option to supply a custom logger
+func WithCustomLogger(logger zerolog.Logger) WithOption {
+ return withCustomLogger{logger: logger}
+}
+
+// ExtractCustomLogger can be used to extract the custom logger
+func ExtractCustomLogger(options ...WithOption) (customLogger zerolog.Logger) {
+ for _, option := range options {
+ switch option := option.(type) {
+ case withCustomLogger:
+ customLogger = option.logger
+ return
+ }
+ }
+ return
+}
+
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type withCustomLogger struct {
+ Option
+ logger zerolog.Logger
+}
+
+//
+//
+///////////////////////////////////////
+///////////////////////////////////////
diff --git a/plc4go/spi/pool/mock_CompletionFuture_test.go b/plc4go/spi/pool/mock_CompletionFuture_test.go
new file mode 100644
index 0000000000..f395023e6f
--- /dev/null
+++ b/plc4go/spi/pool/mock_CompletionFuture_test.go
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.28.1. DO NOT EDIT.
+
+package pool
+
+import (
+ context "context"
+
+ mock "github.com/stretchr/testify/mock"
+)
+
+// MockCompletionFuture is an autogenerated mock type for the CompletionFuture type
+type MockCompletionFuture struct {
+ mock.Mock
+}
+
+type MockCompletionFuture_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockCompletionFuture) EXPECT() *MockCompletionFuture_Expecter {
+ return &MockCompletionFuture_Expecter{mock: &_m.Mock}
+}
+
+// AwaitCompletion provides a mock function with given fields: ctx
+func (_m *MockCompletionFuture) AwaitCompletion(ctx context.Context) error {
+ ret := _m.Called(ctx)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(context.Context) error); ok {
+ r0 = rf(ctx)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockCompletionFuture_AwaitCompletion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AwaitCompletion'
+type MockCompletionFuture_AwaitCompletion_Call struct {
+ *mock.Call
+}
+
+// AwaitCompletion is a helper method to define mock.On call
+// - ctx context.Context
+func (_e *MockCompletionFuture_Expecter) AwaitCompletion(ctx interface{}) *MockCompletionFuture_AwaitCompletion_Call {
+ return &MockCompletionFuture_AwaitCompletion_Call{Call: _e.mock.On("AwaitCompletion", ctx)}
+}
+
+func (_c *MockCompletionFuture_AwaitCompletion_Call) Run(run func(ctx context.Context)) *MockCompletionFuture_AwaitCompletion_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context))
+ })
+ return _c
+}
+
+func (_c *MockCompletionFuture_AwaitCompletion_Call) Return(_a0 error) *MockCompletionFuture_AwaitCompletion_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockCompletionFuture_AwaitCompletion_Call) RunAndReturn(run func(context.Context) error) *MockCompletionFuture_AwaitCompletion_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// Cancel provides a mock function with given fields: interrupt, err
+func (_m *MockCompletionFuture) Cancel(interrupt bool, err error) {
+ _m.Called(interrupt, err)
+}
+
+// MockCompletionFuture_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel'
+type MockCompletionFuture_Cancel_Call struct {
+ *mock.Call
+}
+
+// Cancel is a helper method to define mock.On call
+// - interrupt bool
+// - err error
+func (_e *MockCompletionFuture_Expecter) Cancel(interrupt interface{}, err interface{}) *MockCompletionFuture_Cancel_Call {
+ return &MockCompletionFuture_Cancel_Call{Call: _e.mock.On("Cancel", interrupt, err)}
+}
+
+func (_c *MockCompletionFuture_Cancel_Call) Run(run func(interrupt bool, err error)) *MockCompletionFuture_Cancel_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(bool), args[1].(error))
+ })
+ return _c
+}
+
+func (_c *MockCompletionFuture_Cancel_Call) Return() *MockCompletionFuture_Cancel_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockCompletionFuture_Cancel_Call) RunAndReturn(run func(bool, error)) *MockCompletionFuture_Cancel_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+type mockConstructorTestingTNewMockCompletionFuture interface {
+ mock.TestingT
+ Cleanup(func())
+}
+
+// NewMockCompletionFuture creates a new instance of MockCompletionFuture. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockCompletionFuture(t mockConstructorTestingTNewMockCompletionFuture) *MockCompletionFuture {
+ mock := &MockCompletionFuture{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/plc4go/spi/pool/mock_Executor_test.go b/plc4go/spi/pool/mock_Executor_test.go
new file mode 100644
index 0000000000..f7a92e3e80
--- /dev/null
+++ b/plc4go/spi/pool/mock_Executor_test.go
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.28.1. DO NOT EDIT.
+
+package pool
+
+import (
+ context "context"
+
+ mock "github.com/stretchr/testify/mock"
+)
+
+// MockExecutor is an autogenerated mock type for the Executor type
+type MockExecutor struct {
+ mock.Mock
+}
+
+type MockExecutor_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockExecutor) EXPECT() *MockExecutor_Expecter {
+ return &MockExecutor_Expecter{mock: &_m.Mock}
+}
+
+// Close provides a mock function with given fields:
+func (_m *MockExecutor) Close() error {
+ ret := _m.Called()
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func() error); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockExecutor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
+type MockExecutor_Close_Call struct {
+ *mock.Call
+}
+
+// Close is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Close() *MockExecutor_Close_Call {
+ return &MockExecutor_Close_Call{Call: _e.mock.On("Close")}
+}
+
+func (_c *MockExecutor_Close_Call) Run(run func()) *MockExecutor_Close_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExecutor_Close_Call) Return(_a0 error) *MockExecutor_Close_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockExecutor_Close_Call) RunAndReturn(run func() error) *MockExecutor_Close_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// IsRunning provides a mock function with given fields:
+func (_m *MockExecutor) IsRunning() bool {
+ ret := _m.Called()
+
+ var r0 bool
+ if rf, ok := ret.Get(0).(func() bool); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(bool)
+ }
+
+ return r0
+}
+
+// MockExecutor_IsRunning_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsRunning'
+type MockExecutor_IsRunning_Call struct {
+ *mock.Call
+}
+
+// IsRunning is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) IsRunning() *MockExecutor_IsRunning_Call {
+ return &MockExecutor_IsRunning_Call{Call: _e.mock.On("IsRunning")}
+}
+
+func (_c *MockExecutor_IsRunning_Call) Run(run func()) *MockExecutor_IsRunning_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) Return(_a0 bool) *MockExecutor_IsRunning_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) RunAndReturn(run func() bool) *MockExecutor_IsRunning_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// Start provides a mock function with given fields:
+func (_m *MockExecutor) Start() {
+ _m.Called()
+}
+
+// MockExecutor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
+type MockExecutor_Start_Call struct {
+ *mock.Call
+}
+
+// Start is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Start() *MockExecutor_Start_Call {
+ return &MockExecutor_Start_Call{Call: _e.mock.On("Start")}
+}
+
+func (_c *MockExecutor_Start_Call) Run(run func()) *MockExecutor_Start_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExecutor_Start_Call) Return() *MockExecutor_Start_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockExecutor_Start_Call) RunAndReturn(run func()) *MockExecutor_Start_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// Stop provides a mock function with given fields:
+func (_m *MockExecutor) Stop() {
+ _m.Called()
+}
+
+// MockExecutor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
+type MockExecutor_Stop_Call struct {
+ *mock.Call
+}
+
+// Stop is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Stop() *MockExecutor_Stop_Call {
+ return &MockExecutor_Stop_Call{Call: _e.mock.On("Stop")}
+}
+
+func (_c *MockExecutor_Stop_Call) Run(run func()) *MockExecutor_Stop_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockExecutor_Stop_Call) Return() *MockExecutor_Stop_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockExecutor_Stop_Call) RunAndReturn(run func()) *MockExecutor_Stop_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// Submit provides a mock function with given fields: ctx, workItemId, runnable
+func (_m *MockExecutor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
+ ret := _m.Called(ctx, workItemId, runnable)
+
+ var r0 CompletionFuture
+ if rf, ok := ret.Get(0).(func(context.Context, int32, Runnable) CompletionFuture); ok {
+ r0 = rf(ctx, workItemId, runnable)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(CompletionFuture)
+ }
+ }
+
+ return r0
+}
+
+// MockExecutor_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
+type MockExecutor_Submit_Call struct {
+ *mock.Call
+}
+
+// Submit is a helper method to define mock.On call
+// - ctx context.Context
+// - workItemId int32
+// - runnable Runnable
+func (_e *MockExecutor_Expecter) Submit(ctx interface{}, workItemId interface{}, runnable interface{}) *MockExecutor_Submit_Call {
+ return &MockExecutor_Submit_Call{Call: _e.mock.On("Submit", ctx, workItemId, runnable)}
+}
+
+func (_c *MockExecutor_Submit_Call) Run(run func(ctx context.Context, workItemId int32, runnable Runnable)) *MockExecutor_Submit_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(context.Context), args[1].(int32), args[2].(Runnable))
+ })
+ return _c
+}
+
+func (_c *MockExecutor_Submit_Call) Return(_a0 CompletionFuture) *MockExecutor_Submit_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockExecutor_Submit_Call) RunAndReturn(run func(context.Context, int32, Runnable) CompletionFuture) *MockExecutor_Submit_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+type mockConstructorTestingTNewMockExecutor interface {
+ mock.TestingT
+ Cleanup(func())
+}
+
+// NewMockExecutor creates a new instance of MockExecutor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockExecutor(t mockConstructorTestingTNewMockExecutor) *MockExecutor {
+ mock := &MockExecutor{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/plc4go/spi/pool/mock_Runnable_test.go b/plc4go/spi/pool/mock_Runnable_test.go
new file mode 100644
index 0000000000..a55e5e962a
--- /dev/null
+++ b/plc4go/spi/pool/mock_Runnable_test.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.28.1. DO NOT EDIT.
+
+package pool
+
+import mock "github.com/stretchr/testify/mock"
+
+// MockRunnable is an autogenerated mock type for the Runnable type
+type MockRunnable struct {
+ mock.Mock
+}
+
+type MockRunnable_Expecter struct {
+ mock *mock.Mock
+}
+
+func (_m *MockRunnable) EXPECT() *MockRunnable_Expecter {
+ return &MockRunnable_Expecter{mock: &_m.Mock}
+}
+
+// Execute provides a mock function with given fields:
+func (_m *MockRunnable) Execute() {
+ _m.Called()
+}
+
+// MockRunnable_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
+type MockRunnable_Execute_Call struct {
+ *mock.Call
+}
+
+// Execute is a helper method to define mock.On call
+func (_e *MockRunnable_Expecter) Execute() *MockRunnable_Execute_Call {
+ return &MockRunnable_Execute_Call{Call: _e.mock.On("Execute")}
+}
+
+func (_c *MockRunnable_Execute_Call) Run(run func()) *MockRunnable_Execute_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockRunnable_Execute_Call) Return() *MockRunnable_Execute_Call {
+ _c.Call.Return()
+ return _c
+}
+
+func (_c *MockRunnable_Execute_Call) RunAndReturn(run func()) *MockRunnable_Execute_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+type mockConstructorTestingTNewMockRunnable interface {
+ mock.TestingT
+ Cleanup(func())
+}
+
+// NewMockRunnable creates a new instance of MockRunnable. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockRunnable(t mockConstructorTestingTNewMockRunnable) *MockRunnable {
+ mock := &MockRunnable{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index 4dbae796f8..35773412ea 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -24,9 +24,11 @@ import (
"github.com/ajankovic/xdiff/parser"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
"os"
+ "runtime/debug"
"strings"
"testing"
)
@@ -98,3 +100,35 @@ func CompareResults(t *testing.T, actualString []byte, referenceString []byte) e
_ = boxSideBySide // TODO: xml too distorted, we need a don't center option
return errors.New("there were differences: Expected: \n" + string(referenceString) + "\nBut Got: \n" + string(actualString))
}
+
+// ProduceTestingLogger builds a zerolog.Logger whose console output is routed
+// through the given testing.T.
+func ProduceTestingLogger(t *testing.T) zerolog.Logger {
+	redirectToTest := zerolog.ConsoleTestWriter(t)
+	consoleWriter := zerolog.NewConsoleWriter(redirectToTest)
+	return zerolog.New(consoleWriter)
+}
+
+// SetToTestingLogger overwrites *logger with the result of ProduceTestingLogger
+// and registers a cleanup on t that puts the previous logger back.
+func SetToTestingLogger(t *testing.T, logger *zerolog.Logger) {
+	previous := *logger
+	// Register the restore first so it is in place before the logger is swapped.
+	t.Cleanup(func() { *logger = previous })
+	*logger = ProduceTestingLogger(t)
+}
+
+type _explodingGlobalLogger struct { // writer installed by ExplodingGlobalLogger to flag writes to the global logger
+ hardExplode bool // when true Write prints the stack and panics; otherwise Write only returns an error
+}
+
+// Write never accepts any bytes: in soft mode it reports an error, in hard mode
+// it prints the current stack and panics so the offending call site is visible.
+func (e _explodingGlobalLogger) Write(_ []byte) (_ int, err error) {
+	if !e.hardExplode {
+		return 0, errors.New("found a global log usage")
+	}
+	debug.PrintStack()
+	panic("found a global log usage")
+}
+
+// ExplodingGlobalLogger replaces the global log.Logger with one whose writer errors (or panics when hardExplode is true) on every write; the previous global logger is not restored here. Useful to find unredirected logs
+func ExplodingGlobalLogger(hardExplode bool) {
+ log.Logger = zerolog.New(_explodingGlobalLogger{hardExplode: hardExplode})
+}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 08865904b1..ff41af79c2 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -23,6 +23,9 @@ import (
"container/list"
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/pool"
+ "io"
"runtime"
"sync"
"time"
@@ -35,10 +38,10 @@ import (
"github.com/rs/zerolog/log"
)
-var sharedExecutorInstance utils.Executor // shared instance
+var sharedExecutorInstance pool.Executor // shared instance
func init() {
- sharedExecutorInstance = utils.NewFixedSizeExecutor(runtime.NumCPU(), 100, utils.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
+ sharedExecutorInstance = pool.NewFixedSizeExecutor(runtime.NumCPU(), 100, pool.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
sharedExecutorInstance.Start()
}
@@ -59,6 +62,9 @@ type RequestTransaction interface {
// RequestTransactionManager handles transactions
type RequestTransactionManager interface {
+ io.Closer
+ // CloseGraceful, as opposed to io.Closer's Close, gives running requests up to timeout to finish before closing
+ CloseGraceful(timeout time.Duration) error
// SetNumberOfConcurrentRequests sets the number of concurrent requests that will be sent out to a device
SetNumberOfConcurrentRequests(numberOfConcurrentRequests int)
// StartTransaction starts a RequestTransaction
@@ -66,26 +72,27 @@ type RequestTransactionManager interface {
}
// NewRequestTransactionManager creates a new RequestTransactionManager
-func NewRequestTransactionManager(numberOfConcurrentRequests int, requestTransactionManagerOptions ...RequestTransactionManagerOption) RequestTransactionManager {
+func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...options.WithOption) RequestTransactionManager {
_requestTransactionManager := &requestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
transactionId: 0,
workLog: *list.New(),
executor: sharedExecutorInstance,
+
+ log: options.ExtractCustomLogger(_options...),
}
- for _, requestTransactionManagerOption := range requestTransactionManagerOptions {
- requestTransactionManagerOption(_requestTransactionManager)
+ for _, option := range _options {
+ switch option := option.(type) {
+ case *withCustomExecutor:
+ _requestTransactionManager.executor = option.executor
+ }
}
return _requestTransactionManager
}
-type RequestTransactionManagerOption func(requestTransactionManager *requestTransactionManager)
-
// WithCustomExecutor sets a custom Executor for the RequestTransactionManager
-func WithCustomExecutor(executor utils.Executor) RequestTransactionManagerOption {
- return func(requestTransactionManager *requestTransactionManager) {
- requestTransactionManager.executor = executor
- }
+func WithCustomExecutor(executor pool.Executor) options.WithOption {
+ return &withCustomExecutor{executor: executor}
}
///////////////////////////////////////
@@ -94,13 +101,18 @@ func WithCustomExecutor(executor utils.Executor) RequestTransactionManagerOption
// Internal section
//
+type withCustomExecutor struct { // option value created by WithCustomExecutor
+ options.Option // embedded so the struct can be passed as an options.WithOption
+ executor pool.Executor // executor that NewRequestTransactionManager installs in place of the shared instance
+}
+
type requestTransaction struct {
parent *requestTransactionManager
transactionId int32
/** The initial operation to perform to kick off the request */
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
@@ -115,7 +127,11 @@ type requestTransactionManager struct {
// Important, this is a FIFO Queue for Fairness!
workLog list.List
workLogMutex sync.RWMutex
- executor utils.Executor
+ executor pool.Executor
+
+ shutdown bool
+
+ log zerolog.Logger
}
//
@@ -172,13 +188,19 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
if !config.TraceTransactionManagerTransactions {
transactionLogger = zerolog.Nop()
}
- return &requestTransaction{
+ transaction := &requestTransaction{
r,
currentTransactionId,
nil,
nil,
transactionLogger,
}
+ if r.shutdown {
+ if err := r.failRequest(transaction, errors.New("request transaction manager in shutdown")); err != nil {
+ r.log.Error().Err(err).Msg("error shutting down transaction")
+ }
+ }
+ return transaction
}
func (r *requestTransactionManager) getNumberOfActiveRequests() int {
@@ -215,6 +237,39 @@ func (r *requestTransactionManager) endRequest(transaction *requestTransaction)
return nil
}
+func (r *requestTransactionManager) Close() error {
+ return r.CloseGraceful(0) // a zero timeout skips the waiting phase of CloseGraceful
+}
+
+// CloseGraceful marks the manager as shut down, optionally waits for running
+// requests to drain, and then closes the executor.
+//
+// When timeout is greater than zero the call polls every 10ms until no running
+// requests are left or the timeout elapses, whichever comes first; a timeout of
+// zero (or less) skips the wait entirely. Afterwards the list of running
+// requests is dropped and the result of closing the executor is returned.
+//
+// NOTE(review): shutdown and runningRequests are accessed without holding a
+// lock while the wait is in progress — confirm this is acceptable under -race.
+// NOTE(review): the executor defaults to the package-wide shared instance, so
+// closing it here presumably affects other managers using it — verify.
+func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
+	r.shutdown = true
+	if timeout > 0 {
+		timer := time.NewTimer(timeout)
+		defer utils.CleanupTimer(timer)
+		signal := make(chan struct{})
+		go func() {
+			// Poll until every running request is gone (or the list is dropped below).
+			for len(r.runningRequests) != 0 {
+				time.Sleep(10 * time.Millisecond)
+			}
+			close(signal)
+		}()
+		select {
+		case <-timer.C:
+			// Log through the manager's own logger rather than the global one, and
+			// print the duration in readable form instead of raw nanoseconds.
+			r.log.Warn().Msgf("timeout after %s", timeout)
+		case <-signal:
+		}
+	}
+	r.transactionMutex.Lock()
+	defer r.transactionMutex.Unlock()
+	r.workLogMutex.RLock()
+	defer r.workLogMutex.RUnlock()
+	r.runningRequests = nil
+	return r.executor.Close()
+}
+
func (t *requestTransaction) FailRequest(err error) error {
t.transactionLog.Trace().Msg("Fail the request")
return t.parent.failRequest(t, err)
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index c46767ece0..f3d355789b 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -22,7 +22,8 @@ package transactions
import (
"container/list"
"context"
- "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -33,7 +34,7 @@ import (
func TestNewRequestTransactionManager(t *testing.T) {
type args struct {
numberOfConcurrentRequests int
- requestTransactionManagerOptions []RequestTransactionManagerOption
+ requestTransactionManagerOptions []options.WithOption
}
tests := []struct {
name string
@@ -51,7 +52,7 @@ func TestNewRequestTransactionManager(t *testing.T) {
name: "just create one with option",
args: args{
numberOfConcurrentRequests: 2,
- requestTransactionManagerOptions: []RequestTransactionManagerOption{
+ requestTransactionManagerOptions: []options.WithOption{
WithCustomExecutor(sharedExecutorInstance),
},
},
@@ -73,12 +74,12 @@ func TestNewRequestTransactionManager(t *testing.T) {
func TestWithCustomExecutor(t *testing.T) {
type args struct {
- executor utils.Executor
+ executor pool.Executor
}
tests := []struct {
name string
args args
- want RequestTransactionManagerOption
+ want options.WithOption
}{
{
name: "with a option",
@@ -102,7 +103,7 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
type args struct {
numberOfConcurrentRequests int
@@ -144,7 +145,7 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
tests := []struct {
name string
@@ -183,7 +184,7 @@ func Test_requestTransactionManager_endRequest(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
type args struct {
transaction *requestTransaction
@@ -235,7 +236,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
type args struct {
transaction *requestTransaction
@@ -287,7 +288,7 @@ func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
tests := []struct {
name string
@@ -320,7 +321,7 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
tests := []struct {
name string
@@ -381,7 +382,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
numberOfConcurrentRequests int
transactionId int32
workLog list.List
- executor utils.Executor
+ executor pool.Executor
}
type args struct {
handle *requestTransaction
@@ -420,8 +421,8 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
type fields struct {
parent *requestTransactionManager
transactionId int32
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
type args struct {
@@ -489,8 +490,8 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
type fields struct {
parent *requestTransactionManager
transactionId int32
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
tests := []struct {
@@ -526,8 +527,8 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
type fields struct {
parent *requestTransactionManager
transactionId int32
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
type args struct {
@@ -577,8 +578,8 @@ func Test_requestTransaction_String(t1 *testing.T) {
type fields struct {
parent *requestTransactionManager
transactionId int32
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
tests := []struct {
@@ -611,8 +612,8 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
type fields struct {
parent *requestTransactionManager
transactionId int32
- operation utils.Runnable
- completionFuture utils.CompletionFuture
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
transactionLog zerolog.Logger
}
type args struct {
diff --git a/plc4go/spi/transactions/mock_RequestTransactionManager_test.go b/plc4go/spi/transactions/mock_RequestTransactionManager_test.go
index f15b138ff3..2b030692bc 100644
--- a/plc4go/spi/transactions/mock_RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/mock_RequestTransactionManager_test.go
@@ -21,7 +21,11 @@
package transactions
-import mock "github.com/stretchr/testify/mock"
+import (
+ time "time"
+
+ mock "github.com/stretchr/testify/mock"
+)
// MockRequestTransactionManager is an autogenerated mock type for the RequestTransactionManager type
type MockRequestTransactionManager struct {
@@ -36,6 +40,89 @@ func (_m *MockRequestTransactionManager) EXPECT() *MockRequestTransactionManager
return &MockRequestTransactionManager_Expecter{mock: &_m.Mock}
}
+// Close provides a mock function with given fields:
+func (_m *MockRequestTransactionManager) Close() error {
+ ret := _m.Called()
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func() error); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockRequestTransactionManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
+type MockRequestTransactionManager_Close_Call struct {
+ *mock.Call
+}
+
+// Close is a helper method to define mock.On call
+func (_e *MockRequestTransactionManager_Expecter) Close() *MockRequestTransactionManager_Close_Call {
+ return &MockRequestTransactionManager_Close_Call{Call: _e.mock.On("Close")}
+}
+
+func (_c *MockRequestTransactionManager_Close_Call) Run(run func()) *MockRequestTransactionManager_Close_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockRequestTransactionManager_Close_Call) Return(_a0 error) *MockRequestTransactionManager_Close_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockRequestTransactionManager_Close_Call) RunAndReturn(run func() error) *MockRequestTransactionManager_Close_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
+// CloseGraceful provides a mock function with given fields: timeout
+func (_m *MockRequestTransactionManager) CloseGraceful(timeout time.Duration) error {
+ ret := _m.Called(timeout)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(time.Duration) error); ok {
+ r0 = rf(timeout)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
+// MockRequestTransactionManager_CloseGraceful_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CloseGraceful'
+type MockRequestTransactionManager_CloseGraceful_Call struct {
+ *mock.Call
+}
+
+// CloseGraceful is a helper method to define mock.On call
+// - timeout time.Duration
+func (_e *MockRequestTransactionManager_Expecter) CloseGraceful(timeout interface{}) *MockRequestTransactionManager_CloseGraceful_Call {
+ return &MockRequestTransactionManager_CloseGraceful_Call{Call: _e.mock.On("CloseGraceful", timeout)}
+}
+
+func (_c *MockRequestTransactionManager_CloseGraceful_Call) Run(run func(timeout time.Duration)) *MockRequestTransactionManager_CloseGraceful_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run(args[0].(time.Duration))
+ })
+ return _c
+}
+
+func (_c *MockRequestTransactionManager_CloseGraceful_Call) Return(_a0 error) *MockRequestTransactionManager_CloseGraceful_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockRequestTransactionManager_CloseGraceful_Call) RunAndReturn(run func(time.Duration) error) *MockRequestTransactionManager_CloseGraceful_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// SetNumberOfConcurrentRequests provides a mock function with given fields: numberOfConcurrentRequests
func (_m *MockRequestTransactionManager) SetNumberOfConcurrentRequests(numberOfConcurrentRequests int) {
_m.Called(numberOfConcurrentRequests)
diff --git a/plc4go/spi/transactions/mock_requirements.go b/plc4go/spi/transactions/mock_requirements.go
index 4feb95b8de..64f8171934 100644
--- a/plc4go/spi/transactions/mock_requirements.go
+++ b/plc4go/spi/transactions/mock_requirements.go
@@ -20,12 +20,12 @@
package transactions
import (
- "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/apache/plc4x/plc4go/spi/pool"
)
// Note this file is a Helper for mockery to generate use mocks from other package
// Deprecated: don't use it in productive code
type CompletionFuture interface {
- utils.CompletionFuture
+ pool.CompletionFuture
}
diff --git a/plc4go/spi/transports/Transport.go b/plc4go/spi/transports/Transport.go
index ea0ad98d74..128c0f1ea2 100644
--- a/plc4go/spi/transports/Transport.go
+++ b/plc4go/spi/transports/Transport.go
@@ -19,7 +19,10 @@
package transports
-import "net/url"
+import (
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "net/url"
+)
type Transport interface {
// GetTransportCode Get the short code used to identify this transport (As used in the connection string)
@@ -27,5 +30,5 @@ type Transport interface {
// GetTransportName Get a human-readable name for this transport
GetTransportName() string
// CreateTransportInstance creates transport instance
- CreateTransportInstance(transportUrl url.URL, options map[string][]string) (TransportInstance, error)
+ CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (TransportInstance, error)
}
diff --git a/plc4go/spi/transports/mock_Transport_test.go b/plc4go/spi/transports/mock_Transport_test.go
index 1637099390..0fe7e0a258 100644
--- a/plc4go/spi/transports/mock_Transport_test.go
+++ b/plc4go/spi/transports/mock_Transport_test.go
@@ -22,9 +22,10 @@
package transports
import (
- url "net/url"
-
+ options "github.com/apache/plc4x/plc4go/spi/options"
mock "github.com/stretchr/testify/mock"
+
+ url "net/url"
)
// MockTransport is an autogenerated mock type for the Transport type
@@ -40,25 +41,32 @@ func (_m *MockTransport) EXPECT() *MockTransport_Expecter {
return &MockTransport_Expecter{mock: &_m.Mock}
}
-// CreateTransportInstance provides a mock function with given fields: transportUrl, options
-func (_m *MockTransport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (TransportInstance, error) {
- ret := _m.Called(transportUrl, options)
+// CreateTransportInstance provides a mock function with given fields: transportUrl, _a1, _options
+func (_m *MockTransport) CreateTransportInstance(transportUrl url.URL, _a1 map[string][]string, _options ...options.WithOption) (TransportInstance, error) {
+ _va := make([]interface{}, len(_options))
+ for _i := range _options {
+ _va[_i] = _options[_i]
+ }
+ var _ca []interface{}
+ _ca = append(_ca, transportUrl, _a1)
+ _ca = append(_ca, _va...)
+ ret := _m.Called(_ca...)
var r0 TransportInstance
var r1 error
- if rf, ok := ret.Get(0).(func(url.URL, map[string][]string) (TransportInstance, error)); ok {
- return rf(transportUrl, options)
+ if rf, ok := ret.Get(0).(func(url.URL, map[string][]string, ...options.WithOption) (TransportInstance, error)); ok {
+ return rf(transportUrl, _a1, _options...)
}
- if rf, ok := ret.Get(0).(func(url.URL, map[string][]string) TransportInstance); ok {
- r0 = rf(transportUrl, options)
+ if rf, ok := ret.Get(0).(func(url.URL, map[string][]string, ...options.WithOption) TransportInstance); ok {
+ r0 = rf(transportUrl, _a1, _options...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(TransportInstance)
}
}
- if rf, ok := ret.Get(1).(func(url.URL, map[string][]string) error); ok {
- r1 = rf(transportUrl, options)
+ if rf, ok := ret.Get(1).(func(url.URL, map[string][]string, ...options.WithOption) error); ok {
+ r1 = rf(transportUrl, _a1, _options...)
} else {
r1 = ret.Error(1)
}
@@ -73,14 +81,22 @@ type MockTransport_CreateTransportInstance_Call struct {
// CreateTransportInstance is a helper method to define mock.On call
// - transportUrl url.URL
-// - options map[string][]string
-func (_e *MockTransport_Expecter) CreateTransportInstance(transportUrl interface{}, options interface{}) *MockTransport_CreateTransportInstance_Call {
- return &MockTransport_CreateTransportInstance_Call{Call: _e.mock.On("CreateTransportInstance", transportUrl, options)}
+// - _a1 map[string][]string
+// - _options ...options.WithOption
+func (_e *MockTransport_Expecter) CreateTransportInstance(transportUrl interface{}, _a1 interface{}, _options ...interface{}) *MockTransport_CreateTransportInstance_Call {
+ return &MockTransport_CreateTransportInstance_Call{Call: _e.mock.On("CreateTransportInstance",
+ append([]interface{}{transportUrl, _a1}, _options...)...)}
}
-func (_c *MockTransport_CreateTransportInstance_Call) Run(run func(transportUrl url.URL, options map[string][]string)) *MockTransport_CreateTransportInstance_Call {
+func (_c *MockTransport_CreateTransportInstance_Call) Run(run func(transportUrl url.URL, _a1 map[string][]string, _options ...options.WithOption)) *MockTransport_CreateTransportInstance_Call {
_c.Call.Run(func(args mock.Arguments) {
- run(args[0].(url.URL), args[1].(map[string][]string))
+ variadicArgs := make([]options.WithOption, len(args)-2)
+ for i, a := range args[2:] {
+ if a != nil {
+ variadicArgs[i] = a.(options.WithOption)
+ }
+ }
+ run(args[0].(url.URL), args[1].(map[string][]string), variadicArgs...)
})
return _c
}
@@ -90,7 +106,7 @@ func (_c *MockTransport_CreateTransportInstance_Call) Return(_a0 TransportInstan
return _c
}
-func (_c *MockTransport_CreateTransportInstance_Call) RunAndReturn(run func(url.URL, map[string][]string) (TransportInstance, error)) *MockTransport_CreateTransportInstance_Call {
+func (_c *MockTransport_CreateTransportInstance_Call) RunAndReturn(run func(url.URL, map[string][]string, ...options.WithOption) (TransportInstance, error)) *MockTransport_CreateTransportInstance_Call {
_c.Call.Return(run)
return _c
}
diff --git a/plc4go/spi/transports/pcap/Transport.go b/plc4go/spi/transports/pcap/Transport.go
index 8bc986a28a..ef455e6364 100644
--- a/plc4go/spi/transports/pcap/Transport.go
+++ b/plc4go/spi/transports/pcap/Transport.go
@@ -23,12 +23,13 @@ import (
"bufio"
"bytes"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
"github.com/gopacket/gopacket/pcap"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
"io"
"net/url"
"strconv"
@@ -45,10 +46,13 @@ const (
)
type Transport struct {
+ log zerolog.Logger
}
-func NewTransport() *Transport {
- return &Transport{}
+func NewTransport(_options ...options.WithOption) *Transport {
+ return &Transport{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m Transport) GetTransportCode() string {
@@ -59,7 +63,7 @@ func (m Transport) GetTransportName() string {
return "PCAP(NG) Playback Transport"
}
-func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
+func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
var transportType = PCAP
if val, ok := options["transport-type"]; ok {
transportType = TransportType(val[0])
@@ -77,7 +81,7 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
}
}
- return NewPcapTransportInstance(transportUrl.Path, transportType, portRange, speedFactor, &m), nil
+ return NewPcapTransportInstance(transportUrl.Path, transportType, portRange, speedFactor, &m, _options...), nil
}
func (m Transport) String() string {
@@ -95,15 +99,19 @@ type TransportInstance struct {
handle *pcap.Handle
mutex sync.Mutex
reader *bufio.Reader
+
+ log zerolog.Logger
}
-func NewPcapTransportInstance(transportFile string, transportType TransportType, portRange string, speedFactor float32, transport *Transport) *TransportInstance {
+func NewPcapTransportInstance(transportFile string, transportType TransportType, portRange string, speedFactor float32, transport *Transport, _options ...options.WithOption) *TransportInstance {
transportInstance := &TransportInstance{
transportFile: transportFile,
transportType: transportType,
portRange: portRange,
speedFactor: speedFactor,
transport: transport,
+
+ log: options.ExtractCustomLogger(_options...),
}
transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
return transportInstance
@@ -132,7 +140,7 @@ func (m *TransportInstance) Connect() error {
go func(m *TransportInstance, buffer *bytes.Buffer) {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
packageCount := 0
@@ -140,33 +148,33 @@ func (m *TransportInstance) Connect() error {
for m.connected {
packetData, captureInfo, err := m.handle.ReadPacketData()
packageCount++
- log.Info().Msgf("Read new package (nr. %d) %#v", packageCount, captureInfo)
+ m.log.Info().Msgf("Read new package (nr. %d) %#v", packageCount, captureInfo)
if err != nil {
if err == io.EOF {
- log.Info().Msg("Done reading pcap")
+ m.log.Info().Msg("Done reading pcap")
break
}
- log.Warn().Err(err).Msg("Error reading")
+ m.log.Warn().Err(err).Msg("Error reading")
m.connected = false
return
}
if lastPacketTime != nil && m.speedFactor != 0 {
timeToSleep := captureInfo.Timestamp.Sub(*lastPacketTime)
timeToSleep = time.Duration(int64(float64(timeToSleep) / float64(m.speedFactor)))
- log.Debug().Msgf("Sleeping for %v (Speed factor %fx)", timeToSleep, m.speedFactor)
+ m.log.Debug().Msgf("Sleeping for %v (Speed factor %fx)", timeToSleep, m.speedFactor)
time.Sleep(timeToSleep)
}
// Decode a packet
packet := gopacket.NewPacket(packetData, layers.LayerTypeEthernet, gopacket.Default)
- log.Debug().Msgf("Packet dump (nr. %d):\n%s", packageCount, packet.Dump())
+ m.log.Debug().Msgf("Packet dump (nr. %d):\n%s", packageCount, packet.Dump())
var payload []byte
switch m.transportType {
case TCP:
if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, _ := tcpLayer.(*layers.TCP)
payload = tcp.Payload
- log.Debug().Msgf("TCP: From src port %d to dst port %d", tcp.SrcPort, tcp.DstPort)
+ m.log.Debug().Msgf("TCP: From src port %d to dst port %d", tcp.SrcPort, tcp.DstPort)
} else {
continue
}
@@ -174,7 +182,7 @@ func (m *TransportInstance) Connect() error {
if tcpLayer := packet.Layer(layers.LayerTypeUDP); tcpLayer != nil {
udp, _ := tcpLayer.(*layers.UDP)
payload = udp.Payload
- log.Debug().Msgf("UDP: From src port %d to dst port %d", udp.SrcPort, udp.DstPort)
+ m.log.Debug().Msgf("UDP: From src port %d to dst port %d", udp.SrcPort, udp.DstPort)
} else {
continue
}
diff --git a/plc4go/spi/transports/serial/Transport.go b/plc4go/spi/transports/serial/Transport.go
index 6cc0067a39..8640121603 100644
--- a/plc4go/spi/transports/serial/Transport.go
+++ b/plc4go/spi/transports/serial/Transport.go
@@ -22,20 +22,27 @@ package serial
import (
"bufio"
"fmt"
- "github.com/apache/plc4x/plc4go/spi/transports"
- "github.com/jacobsa/go-serial/serial"
- "github.com/pkg/errors"
"io"
"net"
"net/url"
"strconv"
+
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transports"
+
+ "github.com/jacobsa/go-serial/serial"
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
)
type Transport struct {
+ log zerolog.Logger
}
-func NewTransport() *Transport {
- return &Transport{}
+func NewTransport(_options ...options.WithOption) *Transport {
+ return &Transport{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m Transport) GetTransportCode() string {
@@ -46,11 +53,11 @@ func (m Transport) GetTransportName() string {
return "Serial Transport"
}
-func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
- return m.CreateTransportInstanceForLocalAddress(transportUrl, options, nil)
+func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
+ return m.CreateTransportInstanceForLocalAddress(transportUrl, options, nil, _options...)
}
-func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL, options map[string][]string, _ *net.UDPAddr) (transports.TransportInstance, error) {
+func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL, options map[string][]string, _ *net.UDPAddr, _options ...options.WithOption) (transports.TransportInstance, error) {
var serialPortName = transportUrl.Path
var baudRate = uint(115200)
@@ -73,7 +80,7 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
}
}
- return NewTransportInstance(serialPortName, baudRate, connectTimeout, &m), nil
+ return NewTransportInstance(serialPortName, baudRate, connectTimeout, &m, _options...), nil
}
func (m Transport) String() string {
@@ -88,14 +95,18 @@ type TransportInstance struct {
transport *Transport
serialPort io.ReadWriteCloser
reader *bufio.Reader
+
+ log zerolog.Logger
}
-func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout uint32, transport *Transport) *TransportInstance {
+func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout uint32, transport *Transport, _options ...options.WithOption) *TransportInstance {
transportInstance := &TransportInstance{
SerialPortName: serialPortName,
BaudRate: baudRate,
ConnectTimeout: connectTimeout,
transport: transport,
+
+ log: options.ExtractCustomLogger(_options...),
}
transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
return transportInstance
@@ -111,11 +122,11 @@ func (m *TransportInstance) Connect() error {
// Add a logging layer ...
/*logFile, err := ioutil.TempFile(os.TempDir(), "transport-logger")
if err != nil {
- log.Error().Msg("Error creating file for logging transport requests")
+ m.log.Error().Msg("Error creating file for logging transport requests")
} else {
fileLogger := zerolog.New(logFile).With().Logger()
m.serialPort = utils.NewTransportLogger(m.serialPort, utils.WithLogger(fileLogger))
- log.Trace().Msgf("Logging Transport to file %s", logFile.Name())
+ m.log.Trace().Msgf("Logging Transport to file %s", logFile.Name())
}*/
m.reader = bufio.NewReader(m.serialPort)
diff --git a/plc4go/spi/transports/tcp/Transport.go b/plc4go/spi/transports/tcp/Transport.go
index b84e8e7932..09c0fd2ba1 100644
--- a/plc4go/spi/transports/tcp/Transport.go
+++ b/plc4go/spi/transports/tcp/Transport.go
@@ -28,12 +28,16 @@ import (
"regexp"
"strconv"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
)
type Transport struct {
+ log zerolog.Logger
}
func NewTransport() *Transport {
@@ -48,7 +52,7 @@ func (m Transport) GetTransportName() string {
return "TCP/IP Socket Transport"
}
-func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
+func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
connectionStringRegexp := regexp.MustCompile(`^((?P<ip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3})|(?P<hostname>[a-zA-Z0-9.\-]+))(:(?P<port>[0-9]{1,5}))?`)
var address string
var port int
@@ -85,7 +89,7 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
return nil, errors.Wrap(err, "error resolving typ address")
}
- return NewTcpTransportInstance(tcpAddr, connectTimeout, &m), nil
+ return NewTcpTransportInstance(tcpAddr, connectTimeout, &m, _options...), nil
}
func (m Transport) String() string {
@@ -100,13 +104,17 @@ type TransportInstance struct {
transport *Transport
tcpConn net.Conn
reader *bufio.Reader
+
+ log zerolog.Logger
}
-func NewTcpTransportInstance(remoteAddress *net.TCPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
+func NewTcpTransportInstance(remoteAddress *net.TCPAddr, connectTimeout uint32, transport *Transport, _options ...options.WithOption) *TransportInstance {
transportInstance := &TransportInstance{
RemoteAddress: remoteAddress,
ConnectTimeout: connectTimeout,
transport: transport,
+
+ log: options.ExtractCustomLogger(_options...),
}
transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
return transportInstance
diff --git a/plc4go/spi/transports/test/Transport.go b/plc4go/spi/transports/test/Transport.go
index 995bd969dc..53c3f9383f 100644
--- a/plc4go/spi/transports/test/Transport.go
+++ b/plc4go/spi/transports/test/Transport.go
@@ -23,19 +23,27 @@ import (
"bufio"
"bytes"
"context"
- "github.com/apache/plc4x/plc4go/spi/transports"
- "github.com/pkg/errors"
- "github.com/rs/zerolog/log"
"math"
"net/url"
+
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/transports"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
)
type Transport struct {
preregisteredInstances map[url.URL]transports.TransportInstance
+
+ log zerolog.Logger
}
-func NewTransport() *Transport {
- return &Transport{preregisteredInstances: map[url.URL]transports.TransportInstance{}}
+func NewTransport(_options ...options.WithOption) *Transport {
+ return &Transport{
+ preregisteredInstances: map[url.URL]transports.TransportInstance{},
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m *Transport) GetTransportCode() string {
@@ -46,15 +54,15 @@ func (m *Transport) GetTransportName() string {
return "Test Transport"
}
-func (m *Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
+func (m *Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
if _, ok := options["failTestTransport"]; ok {
return nil, errors.New("test transport failed on purpose")
}
if preregisteredInstance, ok := m.preregisteredInstances[transportUrl]; ok {
- log.Trace().Msgf("Returning pre registered instance for %v", transportUrl)
+ m.log.Trace().Msgf("Returning pre registered instance for %v", transportUrl)
return preregisteredInstance, nil
}
- log.Trace().Msg("create transport instance")
+ m.log.Trace().Msg("create transport instance")
return NewTransportInstance(m), nil
}
@@ -76,19 +84,23 @@ type TransportInstance struct {
connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
+
+ log zerolog.Logger
}
-func NewTransportInstance(transport *Transport) *TransportInstance {
+func NewTransportInstance(transport *Transport, _options ...options.WithOption) *TransportInstance {
return &TransportInstance{
readBuffer: []byte{},
writeBuffer: []byte{},
connected: false,
transport: transport,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *TransportInstance) Connect() error {
- log.Trace().Msg("Connect")
+ m.log.Trace().Msg("Connect")
m.connected = true
return nil
}
@@ -98,7 +110,7 @@ func (m *TransportInstance) ConnectWithContext(_ context.Context) error {
}
func (m *TransportInstance) Close() error {
- log.Trace().Msg("Close")
+ m.log.Trace().Msg("Close")
m.connected = false
return nil
}
@@ -109,7 +121,7 @@ func (m *TransportInstance) IsConnected() bool {
func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
readableBytes := len(m.readBuffer)
- log.Trace().Msgf("return number of readable bytes %d", readableBytes)
+ m.log.Trace().Msgf("return number of readable bytes %d", readableBytes)
return uint32(readableBytes), nil
}
@@ -129,7 +141,7 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
availableBytes := uint32(math.Min(float64(numBytes), float64(len(m.readBuffer))))
- log.Trace().Msgf("Peek %d readable bytes (%d available bytes)", numBytes, availableBytes)
+ m.log.Trace().Msgf("Peek %d readable bytes (%d available bytes)", numBytes, availableBytes)
var err error
if availableBytes != numBytes {
err = errors.New("not enough bytes available")
@@ -141,7 +153,7 @@ func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
}
func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
- log.Trace().Msgf("Read num bytes %d", numBytes)
+ m.log.Trace().Msgf("Read num bytes %d", numBytes)
data := m.readBuffer[0:int(numBytes)]
m.readBuffer = m.readBuffer[int(numBytes):]
return data, nil
@@ -155,23 +167,23 @@ func (m *TransportInstance) Write(data []byte) error {
if m.writeInterceptor != nil {
m.writeInterceptor(m, data)
}
- log.Trace().Msgf("Write data %#x", data)
+ m.log.Trace().Msgf("Write data %#x", data)
m.writeBuffer = append(m.writeBuffer, data...)
return nil
}
func (m *TransportInstance) FillReadBuffer(data []byte) {
- log.Trace().Msgf("FillReadBuffer with %#x", data)
+ m.log.Trace().Msgf("FillReadBuffer with %#x", data)
m.readBuffer = append(m.readBuffer, data...)
}
func (m *TransportInstance) GetNumDrainableBytes() uint32 {
- log.Trace().Msg("get number of drainable bytes")
+ m.log.Trace().Msg("get number of drainable bytes")
return uint32(len(m.writeBuffer))
}
func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
- log.Trace().Msgf("Drain write buffer with number of bytes %d", numBytes)
+ m.log.Trace().Msgf("Drain write buffer with number of bytes %d", numBytes)
data := m.writeBuffer[0:int(numBytes)]
m.writeBuffer = m.writeBuffer[int(numBytes):]
return data
diff --git a/plc4go/spi/transports/udp/Transport.go b/plc4go/spi/transports/udp/Transport.go
index caefed21ba..2d0ea13768 100644
--- a/plc4go/spi/transports/udp/Transport.go
+++ b/plc4go/spi/transports/udp/Transport.go
@@ -23,10 +23,12 @@ import (
"bufio"
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"net"
"net/url"
"regexp"
@@ -34,10 +36,13 @@ import (
)
type Transport struct {
+ log zerolog.Logger
}
-func NewTransport() *Transport {
- return &Transport{}
+func NewTransport(_options ...options.WithOption) *Transport {
+ return &Transport{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m Transport) GetTransportCode() string {
@@ -48,11 +53,11 @@ func (m Transport) GetTransportName() string {
return "UDP Datagram Transport"
}
-func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string) (transports.TransportInstance, error) {
- return m.CreateTransportInstanceForLocalAddress(transportUrl, options, nil)
+func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[string][]string, _options ...options.WithOption) (transports.TransportInstance, error) {
+ return m.CreateTransportInstanceForLocalAddress(transportUrl, options, nil, _options...)
}
-func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (transports.TransportInstance, error) {
+func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, _options ...options.WithOption) (transports.TransportInstance, error) {
connectionStringRegexp := regexp.MustCompile(`^((?P<ip>[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3})|(?P<hostname>[a-zA-Z0-9.\-]+))(:(?P<port>[0-9]{1,5}))?`)
var remoteAddressString string
var remotePort int
@@ -105,7 +110,7 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
return nil, errors.Wrap(err, "error resolving typ address")
}
- return NewTransportInstance(localAddress, remoteAddress, connectTimeout, soReUse, &m), nil
+ return NewTransportInstance(localAddress, remoteAddress, connectTimeout, soReUse, &m, _options...), nil
}
func (m Transport) String() string {
@@ -120,15 +125,19 @@ type TransportInstance struct {
transport *Transport
udpConn *net.UDPConn
reader *bufio.Reader
+
+ log zerolog.Logger
}
-func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, soReUse bool, transport *Transport) *TransportInstance {
+func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, soReUse bool, transport *Transport, _options ...options.WithOption) *TransportInstance {
return &TransportInstance{
LocalAddress: localAddress,
RemoteAddress: remoteAddress,
ConnectTimeout: connectTimeout,
SoReUse: soReUse,
transport: transport,
+
+ log: options.ExtractCustomLogger(_options...),
}
}