You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/01 19:27:58 UTC
[plc4x] branch develop updated: refactor(plc4go): switch from global loggers to local loggers
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 95571e9494 refactor(plc4go): switch from global loggers to local loggers
95571e9494 is described below
commit 95571e94942038cf712be7e07be1bebfaa8a8fc4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 1 21:27:50 2023 +0200
refactor(plc4go): switch from global loggers to local loggers
---
plc4go/internal/ads/Connection.go | 35 +++++-----
plc4go/internal/ads/Discoverer.go | 25 +++----
plc4go/internal/ads/DiscoveryMessageCodec.go | 25 ++++---
plc4go/internal/ads/Driver.go | 19 +++---
plc4go/internal/ads/Interactions.go | 13 ++--
plc4go/internal/ads/MessageCodec.go | 45 ++++++++-----
plc4go/internal/ads/Reader.go | 25 ++++---
plc4go/internal/ads/Subscriber.go | 20 ++++--
plc4go/internal/ads/ValueHandler.go | 14 ++--
plc4go/internal/ads/Writer.go | 17 +++--
plc4go/internal/ads/model/Configuration.go | 16 ++---
plc4go/internal/ads/model/SubscriptionEvent.go | 12 +++-
plc4go/internal/bacnetip/Connection.go | 18 +++--
plc4go/internal/bacnetip/Subscriber.go | 19 +++++-
plc4go/internal/cbus/Connection.go | 2 +-
plc4go/internal/cbus/Subscriber.go | 7 +-
plc4go/internal/cbus/SubscriptionEvent.go | 7 +-
plc4go/internal/cbus/ValueHandler.go | 7 +-
plc4go/internal/eip/Configuration.go | 12 ++--
plc4go/internal/eip/Connection.go | 64 +++++++++++-------
plc4go/internal/eip/EipDriver.go | 17 +++--
plc4go/internal/eip/MessageCodec.go | 26 ++++----
plc4go/internal/eip/Reader.go | 15 +++--
plc4go/internal/eip/ValueHandler.go | 11 ++--
plc4go/internal/eip/Writer.go | 22 ++++---
plc4go/internal/knxnetip/Browser.go | 13 ++--
plc4go/internal/knxnetip/Connection.go | 33 +++++-----
.../knxnetip/ConnectionDriverSpecificOperations.go | 17 +++--
plc4go/internal/knxnetip/ConnectionHelper.go | 13 ++--
plc4go/internal/knxnetip/Discoverer.go | 29 +++++----
plc4go/internal/knxnetip/Driver.go | 3 +-
plc4go/internal/knxnetip/MessageCodec.go | 76 ++++++++++++----------
plc4go/internal/knxnetip/Reader.go | 14 ++--
plc4go/internal/knxnetip/Subscriber.go | 14 +++-
plc4go/internal/knxnetip/SubscriptionEvent.go | 24 ++++---
plc4go/internal/modbus/Connection.go | 27 ++++----
plc4go/internal/modbus/MessageCodec.go | 20 +++---
plc4go/internal/modbus/ModbusAsciiDriver.go | 31 +++++----
plc4go/internal/modbus/ModbusRtuDriver.go | 31 +++++----
plc4go/internal/modbus/ModbusTcpDriver.go | 31 +++++----
plc4go/internal/modbus/Reader.go | 34 +++++-----
plc4go/internal/modbus/Tag.go | 9 +--
plc4go/internal/modbus/TagHandler.go | 7 +-
plc4go/internal/modbus/ValueHandler.go | 11 ++--
plc4go/internal/modbus/Writer.go | 12 ++--
plc4go/internal/s7/Configuration.go | 24 +++----
plc4go/internal/s7/Connection.go | 49 +++++++-------
plc4go/internal/s7/Driver.go | 31 +++++----
plc4go/internal/s7/MessageCodec.go | 27 ++++----
plc4go/internal/s7/Reader.go | 28 ++++----
plc4go/internal/s7/ValueHandler.go | 11 ++--
plc4go/internal/s7/Writer.go | 24 ++++---
plc4go/internal/simulated/Device.go | 24 ++++---
plc4go/internal/simulated/Device_test.go | 28 ++++----
plc4go/internal/simulated/Driver.go | 4 +-
plc4go/pkg/api/logging/init.go | 23 +++----
plc4go/spi/default/DefaultCodec.go | 4 +-
plc4go/spi/model/DefaultPlcSubscriptionEvent.go | 16 +++--
plc4go/spi/model/DefaultPlcSubscriptionResponse.go | 15 +++--
plc4go/spi/testutils/TestUtils.go | 2 +-
plc4go/spi/utils/IdGenerator.go | 4 +-
plc4go/spi/values/PlcValueHandler.go | 12 +++-
62 files changed, 729 insertions(+), 539 deletions(-)
diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go
index 3e08d7ad8d..1b6d034a86 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -25,6 +25,7 @@ import (
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
+ "github.com/rs/zerolog"
"strconv"
"strings"
@@ -41,7 +42,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Connection struct {
@@ -54,9 +54,11 @@ type Connection struct {
tracer *tracer.Tracer
subscriptions map[uint32]apiModel.PlcSubscriptionHandle
+
+ log zerolog.Logger
}
-func NewConnection(messageCodec spi.MessageCodec, configuration model.Configuration, options map[string][]string, _options ...options.WithOption) (*Connection, error) {
+func NewConnection(messageCodec spi.MessageCodec, configuration model.Configuration, connectionOptions map[string][]string, _options ...options.WithOption) (*Connection, error) {
driverContext, err := NewDriverContext(configuration)
if err != nil {
return nil, err
@@ -66,18 +68,21 @@ func NewConnection(messageCodec spi.MessageCodec, configuration model.Configurat
configuration: configuration,
driverContext: driverContext,
subscriptions: map[uint32]apiModel.PlcSubscriptionHandle{},
+ log: options.ExtractCustomLogger(_options...),
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
// TODO: Connection Id is probably "" all the time.
connection.tracer = tracer.NewTracer(driverContext.connectionId, _options...)
}
}
tagHandler := NewTagHandlerWithDriverContext(driverContext)
- valueHandler := NewValueHandlerWithDriverContext(driverContext, tagHandler)
+ valueHandler := NewValueHandlerWithDriverContext(driverContext, tagHandler, _options...)
connection.DefaultConnection = _default.NewDefaultConnection(connection,
- _default.WithPlcTagHandler(tagHandler),
- _default.WithPlcValueHandler(valueHandler),
+ append(_options,
+ _default.WithPlcTagHandler(tagHandler),
+ _default.WithPlcValueHandler(valueHandler),
+ )...,
)
return connection, nil
}
@@ -99,7 +104,7 @@ func (m *Connection) GetConnection() plc4go.PlcConnection {
}
func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ m.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
// Reset the driver context (Actually this should not be required, but just to be on the safe side)
@@ -165,7 +170,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed: %v", err)
+ m.log.Error().Msgf("panic-ed: %v", err)
}
}()
for message := range defaultIncomingMessageChannel {
@@ -178,13 +183,13 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
m.handleIncomingDeviceNotificationRequest(
amsTCPPacket.GetUserdata().(readWriteModel.AdsDeviceNotificationRequest))
default:
- log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
+ m.log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
}
default:
- log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
+ m.log.Warn().Msgf("Got unexpected type of incoming ADS message %v", message)
}
}
- log.Info().Msg("Done waiting for messages ...")
+ m.log.Info().Msg("Done waiting for messages ...")
}()
// Subscribe for changes to the symbol or the offline-versions
@@ -194,10 +199,10 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
if event.GetResponseCode("offlineVersion") == apiModel.PlcResponseCode_OK {
newVersion := event.GetValue("offlineVersion").GetUint8()
if newVersion != m.driverContext.symbolVersion {
- log.Info().Msg("detected offline version change: reloading symbol- and data-type-table.")
+ m.log.Info().Msg("detected offline version change: reloading symbol- and data-type-table.")
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
- log.Error().Err(err).Msg("error updating data-type and symbol tables")
+ m.log.Error().Err(err).Msg("error updating data-type and symbol tables")
}
}
}
@@ -207,10 +212,10 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
if event.GetResponseCode("onlineVersion") == apiModel.PlcResponseCode_OK {
newVersion := event.GetValue("onlineVersion").GetUint32()
if newVersion != m.driverContext.onlineVersion {
- log.Info().Msg("detected online version change: reloading symbol- and data-type-table.")
+ m.log.Info().Msg("detected online version change: reloading symbol- and data-type-table.")
err := m.readSymbolTableAndDatatypeTable(ctx)
if err != nil {
- log.Error().Err(err).Msg("error updating data-type and symbol tables")
+ m.log.Error().Err(err).Msg("error updating data-type and symbol tables")
}
}
}
diff --git a/plc4go/internal/ads/Discoverer.go b/plc4go/internal/ads/Discoverer.go
index f31e51c0e4..34fdc419cf 100644
--- a/plc4go/internal/ads/Discoverer.go
+++ b/plc4go/internal/ads/Discoverer.go
@@ -24,6 +24,7 @@ import (
"encoding/binary"
"fmt"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"net"
"net/url"
"strconv"
@@ -33,11 +34,9 @@ import (
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/protocols/ads/discovery/readwrite/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
- "github.com/rs/zerolog/log"
)
type discovery struct {
@@ -48,11 +47,13 @@ type discovery struct {
}
type Discoverer struct {
- messageCodec spi.MessageCodec
+ log zerolog.Logger
}
-func NewDiscoverer() *Discoverer {
- return &Discoverer{}
+func NewDiscoverer(_options ...options.WithOption) *Discoverer {
+ return &Discoverer{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
@@ -144,7 +145,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
go func(discoveryItem *discovery) {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ d.log.Error().Msgf("panic-ed %v", err)
}
}()
buf := make([]byte, 1024)
@@ -155,7 +156,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
discoveryResponse, err := model.AdsDiscoveryParse(buf[0:length])
if err != nil {
- log.Error().Err(err).Str("src-ip", fromAddr.String()).Msg("error decoding response")
+ d.log.Error().Err(err).Str("src-ip", fromAddr.String()).Msg("error decoding response")
continue
}
@@ -254,24 +255,24 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
// Serialize the message
bytes, err := discoveryRequestMessage.Serialize()
if err != nil {
- log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error serialising broadcast search packet")
+ d.log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error serialising broadcast search packet")
continue
}
// Create a not-connected UDP connection to the broadcast address
requestAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", discoveryItem.broadcastAddress.String(), model.AdsDiscoveryConstants_ADSDISCOVERYUDPDEFAULTPORT))
if err != nil {
- log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error resolving target socket for broadcast search")
+ d.log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error resolving target socket for broadcast search")
continue
}
/*localAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ipv4Addr.String(), model.AdsDiscoveryConstants_ADSDISCOVERYUDPDEFAULTPORT))
if err != nil {
- log.Error().Err(err).Str("local-ip", ipv4Addr.String()).Msg("Error resolving local address for broadcast search")
+ m.log.Error().Err(err).Str("local-ip", ipv4Addr.String()).Msg("Error resolving local address for broadcast search")
continue
}
udp, err := net.DialUDP("udp4", localAddr, requestAddr)
if err != nil {
- log.Error().Err(err).Str("local-ip", ipv4Addr.String()).Str("broadcast-ip", broadcastAddress.String()).
+ m.log.Error().Err(err).Str("local-ip", ipv4Addr.String()).Str("broadcast-ip", broadcastAddress.String()).
Msg("Error creating sending udp socket for broadcast search")
continue
}*/
@@ -279,7 +280,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
// Send out the message.
_, err = discoveryItem.socket.WriteTo(bytes, requestAddr)
if err != nil {
- log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error sending request for broadcast search")
+ d.log.Error().Err(err).Str("broadcast-ip", discoveryItem.broadcastAddress.String()).Msg("Error sending request for broadcast search")
continue
}
}
diff --git a/plc4go/internal/ads/DiscoveryMessageCodec.go b/plc4go/internal/ads/DiscoveryMessageCodec.go
index 6d3cdf2e20..5e1d8475a2 100644
--- a/plc4go/internal/ads/DiscoveryMessageCodec.go
+++ b/plc4go/internal/ads/DiscoveryMessageCodec.go
@@ -23,18 +23,23 @@ import (
"github.com/apache/plc4x/plc4go/protocols/ads/discovery/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
)
type DiscoveryMessageCodec struct {
_default.DefaultCodec
+
+ log zerolog.Logger
}
-func NewDiscoveryMessageCodec(transportInstance transports.TransportInstance) *DiscoveryMessageCodec {
- codec := &DiscoveryMessageCodec{}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance)
+func NewDiscoveryMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *DiscoveryMessageCodec {
+ codec := &DiscoveryMessageCodec{
+ log: options.ExtractCustomLogger(_options...),
+ }
+ codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _options...)
return codec
}
@@ -43,7 +48,7 @@ func (m *DiscoveryMessageCodec) GetCodec() spi.MessageCodec {
}
func (m *DiscoveryMessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
tcpPaket := message.(model.AdsDiscovery)
// Serialize the request
@@ -63,17 +68,17 @@ func (m *DiscoveryMessageCodec) Send(message spi.Message) error {
func (m *DiscoveryMessageCodec) Receive() (spi.Message, error) {
// We need at least 6 bytes in order to know how big the packet is in total
if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 6) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := m.GetTransportInstance().PeekReadableBytes(6)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
// Get the size of the entire packet little endian plus size of header
packetSize := (uint32(data[5]) << 24) + (uint32(data[4]) << 16) + (uint32(data[3]) << 8) + (uint32(data[2])) + 6
if num < packetSize {
- log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
+ m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
data, err = m.GetTransportInstance().Read(packetSize)
@@ -83,13 +88,13 @@ func (m *DiscoveryMessageCodec) Receive() (spi.Message, error) {
}
tcpPacket, err := model.AdsDiscoveryParse(data)
if err != nil {
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
// TODO: Possibly clean up ...
return nil, nil
}
return tcpPacket, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
// TODO: maybe we return here a not enough error error
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index 44667369f0..91681eebd8 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -33,13 +33,12 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Driver struct {
_default.DefaultDriver
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
@@ -51,11 +50,11 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
}
func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -65,7 +64,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -73,11 +72,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Create a new codec for taking care of encoding/decoding of messages
codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ m.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := model.ParseFromOptions(options)
+ configuration, err := model.ParseFromOptions(m.log, options)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid options")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "invalid configuration"))
return ch
@@ -90,7 +89,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "couldn't create connection"))
return ch
}
- log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
@@ -99,5 +98,5 @@ func (m *Driver) SupportsDiscovery() bool {
}
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
- return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
+ return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/ads/Interactions.go b/plc4go/internal/ads/Interactions.go
index 00209ae811..b6d5ecc9ad 100644
--- a/plc4go/internal/ads/Interactions.go
+++ b/plc4go/internal/ads/Interactions.go
@@ -22,7 +22,6 @@ package ads
import (
"context"
"fmt"
- "github.com/rs/zerolog/log"
"time"
"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
@@ -34,7 +33,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsReadDeviceInfoRequest()
@@ -75,7 +74,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsReadRequest(indexGroup, indexOffset, length)
@@ -116,7 +115,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsWriteRequest(indexGroup, indexOffset, data)
@@ -157,7 +156,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsReadWriteRequest(indexGroup, indexOffset, readLength, items, writeData)
@@ -198,7 +197,7 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsAddDeviceNotificationRequest(indexGroup, indexOffset, length, transmissionMode, maxDelay, cycleTime)
@@ -239,7 +238,7 @@ func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Conte
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
request := m.NewAdsDeleteDeviceNotificationRequest(notificationHandle)
diff --git a/plc4go/internal/ads/MessageCodec.go b/plc4go/internal/ads/MessageCodec.go
index c8a6859065..75f85416a4 100644
--- a/plc4go/internal/ads/MessageCodec.go
+++ b/plc4go/internal/ads/MessageCodec.go
@@ -23,6 +23,8 @@ import (
"bufio"
"context"
"encoding/binary"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
@@ -30,22 +32,31 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type MessageCodec struct {
_default.DefaultCodec
+
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
- codec := &MessageCodec{}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(
- // This just prevents the loop from aborting in the start and by returning false,
- // it makes the message go to the default channel, as this means:
- // The handler hasn't handled the message
- func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
- return false
- }))
+func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
+ codec := &MessageCodec{
+ log: options.ExtractCustomLogger(_options...),
+ }
+ codec.DefaultCodec = _default.NewDefaultCodec(
+ codec,
+ transportInstance,
+ append(_options,
+ _default.WithCustomMessageHandler(
+ // This just prevents the loop from aborting in the start and by returning false,
+ // it makes the message go to the default channel, as this means:
+ // The handler hasn't handled the message
+ func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
+ return false
+ }),
+ )...,
+ )
return codec
}
@@ -54,7 +65,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
tcpPaket := message.(model.AmsTCPPacket)
// Serialize the request
@@ -83,15 +94,15 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
}
return numBytesAvailable < 6
}); err != nil {
- log.Warn().Err(err).Msg("error filling buffer")
+ m.log.Warn().Err(err).Msg("error filling buffer")
}
// We need at least 6 bytes in order to know how big the packet is in total
if num, err := transportInstance.GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 6) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := transportInstance.PeekReadableBytes(6)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
@@ -106,7 +117,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
}
return numBytesAvailable < packetSize
}); err != nil {
- log.Warn().Err(err).Msg("error filling buffer")
+ m.log.Warn().Err(err).Msg("error filling buffer")
}
}
data, err = transportInstance.Read(packetSize)
@@ -117,13 +128,13 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
rb := utils.NewReadBufferByteBased(data, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
tcpPacket, err := model.AmsTCPPacketParseWithBuffer(context.Background(), rb)
if err != nil {
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
// TODO: Possibly clean up ...
return nil, nil
}
return tcpPacket, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
// TODO: maybe we return here a not enough error error
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index 326edc4f89..b0b4255a4e 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -34,7 +34,6 @@ import (
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
@@ -42,7 +41,7 @@ func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
}
func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
- log.Trace().Msg("Reading")
+ m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go func() {
defer func() {
@@ -62,7 +61,7 @@ func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadReque
func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
if len(readRequest.GetTagNames()) != 1 {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("this part of the ads driver only supports single-item requests"))
- log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
+ m.log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
return
}
@@ -73,7 +72,7 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
adsField, err := model.CastToSymbolicPlcTagFromPlcTag(tag)
if err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Wrap(err, "invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
// Replace the symbolic tag with a direct one
@@ -84,14 +83,14 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
nil,
errors.Wrap(err, "invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
}
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
@@ -119,11 +118,11 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
responseCodes := map[string]apiModel.PlcResponseCode{}
plcValues := map[string]apiValues.PlcValue{}
for _, tagName := range readRequest.GetTagNames() {
- log.Debug().Msgf("get a tag from request with name %s", tagName)
+ m.log.Debug().Msgf("get a tag from request with name %s", tagName)
// Try to parse the value
plcValue, err := m.parsePlcValue(directAdsTag.DataType, directAdsTag.DataType.GetArrayInfo(), rb)
if err != nil {
- log.Error().Err(err).Msg("Error parsing plc value")
+ m.log.Error().Err(err).Msg("Error parsing plc value")
responseCodes[tagName] = apiModel.PlcResponseCode_INTERNAL_ERROR
} else {
plcValues[tagName] = plcValue
@@ -155,7 +154,7 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
nil,
errors.Wrap(err, "invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
// Replace the symbolic tag with a direct one
@@ -166,7 +165,7 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
nil,
errors.Wrap(err, "invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
}
@@ -177,7 +176,7 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
nil,
errors.New("invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
@@ -235,11 +234,11 @@ func (m *Connection) multiRead(ctx context.Context, readRequest apiModel.PlcRead
}
directAdsTag := directAdsTags[tagName]
- log.Debug().Msgf("get a tag from request with name %s", tagName)
+ m.log.Debug().Msgf("get a tag from request with name %s", tagName)
// Try to parse the value
plcValue, err := m.parsePlcValue(directAdsTag.DataType, directAdsTag.DataType.GetArrayInfo(), rb)
if err != nil {
- log.Error().Err(err).Msg("Error parsing plc value")
+ m.log.Error().Err(err).Msg("Error parsing plc value")
responseCodes[tagName] = apiModel.PlcResponseCode_INTERNAL_ERROR
} else {
plcValues[tagName] = plcValue
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 88c346adf8..cf0ced6ed1 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -21,6 +21,7 @@ package ads
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"time"
dirverModel "github.com/apache/plc4x/plc4go/internal/ads/model"
@@ -29,7 +30,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
@@ -99,7 +99,7 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// Iterate over all sub-results
@@ -154,6 +154,7 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
subscriptionRequest,
map[string]apiModel.PlcResponseCode{tagName: apiModel.PlcResponseCode_OK},
map[string]apiModel.PlcSubscriptionHandle{tagName: subscriptionHandle},
+ options.WithCustomLogger(m.log),
),
nil,
)
@@ -165,19 +166,19 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest, subscriptionResults map[string]apiModel.PlcSubscriptionRequestResult) apiModel.PlcSubscriptionRequestResult {
if len(subscriptionResults) == 1 {
- log.Debug().Msg("We got only one response, no merging required")
+ m.log.Debug().Msg("We got only one response, no merging required")
for tagName := range subscriptionResults {
return subscriptionResults[tagName]
}
}
- log.Trace().Msg("Merging requests")
+ m.log.Trace().Msg("Merging requests")
responseCodes := map[string]apiModel.PlcResponseCode{}
subscriptionHandles := map[string]apiModel.PlcSubscriptionHandle{}
var err error = nil
for _, subscriptionResult := range subscriptionResults {
if subscriptionResult.GetErr() != nil {
- log.Debug().Err(subscriptionResult.GetErr()).Msgf("Error during subscription")
+ m.log.Debug().Err(subscriptionResult.GetErr()).Msgf("Error during subscription")
if err == nil {
// Lazy initialization of multi error
err = utils.MultiError{MainError: errors.New("while aggregating results"), Errors: []error{subscriptionResult.GetErr()}}
@@ -187,7 +188,7 @@ func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptio
}
} else if subscriptionResult.GetResponse() != nil {
if len(subscriptionResult.GetResponse().GetRequest().GetTagNames()) > 1 {
- log.Error().Int("numberOfTags", len(subscriptionResult.GetResponse().GetRequest().GetTagNames())).Msg("We should only get 1")
+ m.log.Error().Int("numberOfTags", len(subscriptionResult.GetResponse().GetRequest().GetTagNames())).Msg("We should only get 1")
}
for _, tagName := range subscriptionResult.GetResponse().GetRequest().GetTagNames() {
handle, err := subscriptionResult.GetResponse().GetSubscriptionHandle(tagName)
@@ -202,7 +203,12 @@ func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptio
}
return spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
- spiModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes, subscriptionHandles),
+ spiModel.NewDefaultPlcSubscriptionResponse(
+ subscriptionRequest,
+ responseCodes,
+ subscriptionHandles,
+ options.WithCustomLogger(m.log),
+ ),
err,
)
}
diff --git a/plc4go/internal/ads/ValueHandler.go b/plc4go/internal/ads/ValueHandler.go
index b640fbf760..613273b57d 100644
--- a/plc4go/internal/ads/ValueHandler.go
+++ b/plc4go/internal/ads/ValueHandler.go
@@ -21,6 +21,7 @@ package ads
import (
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"reflect"
"strconv"
@@ -40,14 +41,17 @@ type ValueHandler struct {
tagHandler TagHandler
}
-func NewValueHandler() ValueHandler {
- return ValueHandler{}
+func NewValueHandler(_options ...options.WithOption) ValueHandler {
+ return ValueHandler{
+ DefaultValueHandler: spiValues.NewDefaultValueHandler(_options...),
+ }
}
-func NewValueHandlerWithDriverContext(driverContext *DriverContext, tagHandler TagHandler) ValueHandler {
+func NewValueHandlerWithDriverContext(driverContext *DriverContext, tagHandler TagHandler, _options ...options.WithOption) ValueHandler {
return ValueHandler{
- driverContext: driverContext,
- tagHandler: tagHandler,
+ DefaultValueHandler: spiValues.NewDefaultValueHandler(_options...),
+ driverContext: driverContext,
+ tagHandler: tagHandler,
}
}
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index 1227a8c8e0..95ad6e5131 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -33,7 +33,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
@@ -41,7 +40,7 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
}
func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
- log.Trace().Msg("Writing")
+ m.log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult, 1)
go func() {
defer func() {
@@ -61,7 +60,7 @@ func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRe
func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcWriteRequest, result chan apiModel.PlcWriteRequestResult) {
if len(writeRequest.GetTagNames()) != 1 {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("this part of the ads driver only supports single-item requests"))
- log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(writeRequest.GetTagNames()))
+ m.log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(writeRequest.GetTagNames()))
return
}
@@ -76,21 +75,21 @@ func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcW
nil,
errors.Wrap(err, "invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
// Replace the symbolic tag with a direct one
tag, err = m.resolveSymbolicTag(ctx, adsField)
if err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
}
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
@@ -145,21 +144,21 @@ func (m *Connection) multiWrite(ctx context.Context, writeRequest apiModel.PlcWr
adsField, err := model.CastToSymbolicPlcTagFromPlcTag(tag)
if err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
// Replace the symbolic tag with a direct one
tag, err = m.resolveSymbolicTag(ctx, adsField)
if err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
}
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("invalid tag item type"))
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
diff --git a/plc4go/internal/ads/model/Configuration.go b/plc4go/internal/ads/model/Configuration.go
index fdda44e38d..9be1d6159c 100644
--- a/plc4go/internal/ads/model/Configuration.go
+++ b/plc4go/internal/ads/model/Configuration.go
@@ -20,12 +20,12 @@
package model
import (
+ "github.com/rs/zerolog"
"strconv"
"strings"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Configuration struct {
@@ -35,10 +35,10 @@ type Configuration struct {
TargetAmsPort uint16
}
-func ParseFromOptions(options map[string][]string) (Configuration, error) {
+func ParseFromOptions(localLogger zerolog.Logger, options map[string][]string) (Configuration, error) {
configuration := Configuration{}
- sourceAmsNetId := getFromOptions(options, "sourceAmsNetId")
+ sourceAmsNetId := getFromOptions(localLogger, options, "sourceAmsNetId")
if sourceAmsNetId == "" {
return Configuration{}, errors.New("Required parameter sourceAmsNetId missing")
}
@@ -75,7 +75,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
uint8(octet5),
uint8(octet6),
)
- sourceAmsPort := getFromOptions(options, "sourceAmsPort")
+ sourceAmsPort := getFromOptions(localLogger, options, "sourceAmsPort")
if sourceAmsPort == "" {
return Configuration{}, errors.New("Required parameter sourceAmsPort missing")
}
@@ -84,7 +84,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
return Configuration{}, errors.Wrap(err, "error parsing sourceAmsPort")
}
configuration.SourceAmsPort = uint16(parsedUint)
- targetAmsNetId := getFromOptions(options, "targetAmsNetId")
+ targetAmsNetId := getFromOptions(localLogger, options, "targetAmsNetId")
if sourceAmsNetId == "" {
return Configuration{}, errors.New("Required parameter targetAmsNetId missing")
}
@@ -121,7 +121,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
uint8(octet5),
uint8(octet6),
)
- targetAmsPort := getFromOptions(options, "targetAmsPort")
+ targetAmsPort := getFromOptions(localLogger, options, "targetAmsPort")
if targetAmsPort == "" {
return Configuration{}, errors.New("Required parameter targetAmsPort missing")
}
@@ -134,13 +134,13 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
return configuration, nil
}
-func getFromOptions(options map[string][]string, key string) string {
+func getFromOptions(localLogger zerolog.Logger, options map[string][]string, key string) string {
if optionValues, ok := options[key]; ok {
if len(optionValues) <= 0 {
return ""
}
if len(optionValues) > 1 {
- log.Warn().Msgf("Options %s must be unique", key)
+ localLogger.Warn().Msgf("Options %s must be unique", key)
}
return optionValues[0]
}
diff --git a/plc4go/internal/ads/model/SubscriptionEvent.go b/plc4go/internal/ads/model/SubscriptionEvent.go
index e39e732ba9..d4902825c1 100644
--- a/plc4go/internal/ads/model/SubscriptionEvent.go
+++ b/plc4go/internal/ads/model/SubscriptionEvent.go
@@ -20,6 +20,7 @@
package model
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -31,9 +32,16 @@ type SubscriptionEvent struct {
*spiModel.DefaultPlcSubscriptionEvent
}
-func NewSubscriptionEvent(tags map[string]apiModel.PlcTag, types map[string]spiModel.SubscriptionType, intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode, values map[string]values.PlcValue) SubscriptionEvent {
+func NewSubscriptionEvent(
+ tags map[string]apiModel.PlcTag,
+ types map[string]spiModel.SubscriptionType,
+ intervals map[string]time.Duration,
+ responseCodes map[string]apiModel.PlcResponseCode,
+ values map[string]values.PlcValue,
+ _options ...options.WithOption,
+) SubscriptionEvent {
subscriptionEvent := SubscriptionEvent{}
- event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values)
+ event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values, _options...)
subscriptionEvent.DefaultPlcSubscriptionEvent = event.(*spiModel.DefaultPlcSubscriptionEvent)
return subscriptionEvent
}
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 97791c09e2..516c4b1fe5 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"sync"
"time"
@@ -47,15 +48,18 @@ type Connection struct {
connectionId string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string, _options ...options.WithOption) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
connection := &Connection{
invokeIdGenerator: InvokeIdGenerator{currentInvokeId: 0},
messageCodec: messageCodec,
tm: tm,
+ log: options.ExtractCustomLogger(_options...),
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
@@ -80,7 +84,7 @@ func (c *Connection) GetTracer() *tracer.Tracer {
}
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -96,7 +100,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
}()
for c.IsConnected() {
- log.Trace().Msg("Polling data")
+ c.log.Trace().Msg("Polling data")
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
timeout := time.NewTimer(20 * time.Millisecond)
defer utils.CleanupTimer(timeout)
@@ -107,7 +111,7 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
case <-timeout.C:
}
}
- log.Info().Msg("Ending incoming message transfer")
+ c.log.Info().Msg("Ending incoming message transfer")
}()
ch <- connectionConnectResult
}()
@@ -127,13 +131,13 @@ func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
- return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c))
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
}
func (c *Connection) addSubscriber(subscriber *Subscriber) {
for _, sub := range c.subscribers {
if sub == subscriber {
- log.Debug().Msgf("Subscriber %v already added", subscriber)
+ c.log.Debug().Msgf("Subscriber %v already added", subscriber)
return
}
}
diff --git a/plc4go/internal/bacnetip/Subscriber.go b/plc4go/internal/bacnetip/Subscriber.go
index adb088300c..e24aa0cfb7 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -21,7 +21,9 @@ package bacnetip
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -30,12 +32,16 @@ import (
type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+
+ log zerolog.Logger
}
-func NewSubscriber(connection *Connection) *Subscriber {
+func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
return &Subscriber{
connection: connection,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -59,7 +65,16 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
subscriptionValues[tagName] = spiModel.NewDefaultPlcSubscriptionHandle(m)
}
- result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, spiModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes, subscriptionValues), nil)
+ result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
+ subscriptionRequest,
+ spiModel.NewDefaultPlcSubscriptionResponse(
+ subscriptionRequest,
+ responseCodes,
+ subscriptionValues,
+ options.WithCustomLogger(m.log),
+ ),
+ nil,
+ )
}()
return result
}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index ec500374f8..88e574a986 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -91,7 +91,7 @@ func NewConnection(messageCodec *MessageCodec, configuration Configuration, driv
connection,
append(_options,
_default.WithPlcTagHandler(tagHandler),
- _default.WithPlcValueHandler(NewValueHandler()),
+ _default.WithPlcValueHandler(NewValueHandler(_options...)),
)...,
)
return connection
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index bc13fcd3bc..45f3f4318d 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -74,7 +74,12 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
- spiModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes, subscriptionValues),
+ spiModel.NewDefaultPlcSubscriptionResponse(
+ subscriptionRequest,
+ responseCodes,
+ subscriptionValues,
+ options.WithCustomLogger(m.log),
+ ),
nil,
)
}()
diff --git a/plc4go/internal/cbus/SubscriptionEvent.go b/plc4go/internal/cbus/SubscriptionEvent.go
index 66317a5ee5..149e2b9312 100644
--- a/plc4go/internal/cbus/SubscriptionEvent.go
+++ b/plc4go/internal/cbus/SubscriptionEvent.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -40,12 +41,14 @@ func NewSubscriptionEvent(
responseCodes map[string]apiModel.PlcResponseCode,
address map[string]string,
sources map[string]string,
- values map[string]apiValues.PlcValue) SubscriptionEvent {
+ values map[string]apiValues.PlcValue,
+ _options ...options.WithOption,
+) SubscriptionEvent {
subscriptionEvent := SubscriptionEvent{
address: address,
sources: sources,
}
- event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values)
+ event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values, _options...)
subscriptionEvent.DefaultPlcSubscriptionEvent = event.(*spiModel.DefaultPlcSubscriptionEvent)
return subscriptionEvent
}
diff --git a/plc4go/internal/cbus/ValueHandler.go b/plc4go/internal/cbus/ValueHandler.go
index feabf555f1..2b39e1c95e 100644
--- a/plc4go/internal/cbus/ValueHandler.go
+++ b/plc4go/internal/cbus/ValueHandler.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
"reflect"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -33,8 +34,10 @@ type ValueHandler struct {
spiValues.DefaultValueHandler
}
-func NewValueHandler() ValueHandler {
- return ValueHandler{}
+func NewValueHandler(_options ...options.WithOption) ValueHandler {
+ return ValueHandler{
+ spiValues.NewDefaultValueHandler(_options...),
+ }
}
func (m ValueHandler) NewPlcValue(tag apiModel.PlcTag, value any) (apiValues.PlcValue, error) {
diff --git a/plc4go/internal/eip/Configuration.go b/plc4go/internal/eip/Configuration.go
index 9a3f903c32..981c3005b5 100644
--- a/plc4go/internal/eip/Configuration.go
+++ b/plc4go/internal/eip/Configuration.go
@@ -20,10 +20,10 @@
package eip
import (
+ "github.com/rs/zerolog"
"strconv"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Configuration struct {
@@ -31,19 +31,19 @@ type Configuration struct {
slot int8
}
-func ParseFromOptions(options map[string][]string) (Configuration, error) {
+func ParseFromOptions(localLogger zerolog.Logger, options map[string][]string) (Configuration, error) {
configuration := Configuration{
backplane: 1,
slot: 0,
}
- if localRackString := getFromOptions(options, "backplane"); localRackString != "" {
+ if localRackString := getFromOptions(localLogger, options, "backplane"); localRackString != "" {
parsedBackplane, err := strconv.ParseInt(localRackString, 10, 8)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing backplane")
}
configuration.backplane = int8(parsedBackplane)
}
- if localSlotString := getFromOptions(options, "slot"); localSlotString != "" {
+ if localSlotString := getFromOptions(localLogger, options, "slot"); localSlotString != "" {
parsedSlot, err := strconv.ParseInt(localSlotString, 10, 8)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing slot")
@@ -53,13 +53,13 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
return configuration, nil
}
-func getFromOptions(options map[string][]string, key string) string {
+func getFromOptions(localLogger zerolog.Logger, options map[string][]string, key string) string {
if optionValues, ok := options[key]; ok {
if len(optionValues) <= 0 {
return ""
}
if len(optionValues) > 1 {
- log.Warn().Msgf("Options %s must be unique", key)
+ localLogger.Warn().Msgf("Options %s must be unique", key)
}
return optionValues[0]
}
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 418922e563..fbb85e2a14 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -22,8 +22,10 @@ package eip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -33,7 +35,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
const (
@@ -58,24 +59,37 @@ type Connection struct {
useConnectionManager bool
routingAddress []readWriteModel.PathSegment
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string) *Connection {
+func NewConnection(
+ messageCodec spi.MessageCodec,
+ configuration Configuration,
+ driverContext DriverContext,
+ tagHandler spi.PlcTagHandler,
+ tm transactions.RequestTransactionManager,
+ connectionOptions map[string][]string,
+ _options ...options.WithOption,
+) *Connection {
connection := &Connection{
messageCodec: messageCodec,
configuration: configuration,
driverContext: driverContext,
tm: tm,
+ log: options.ExtractCustomLogger(_options...),
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
// TODO: Fix this.
// connection.tracer = spi.NewTracer(connection.connectionId)
}
}
connection.DefaultConnection = _default.NewDefaultConnection(connection,
- _default.WithPlcTagHandler(tagHandler),
- _default.WithPlcValueHandler(NewValueHandler()),
+ append(_options,
+ _default.WithPlcTagHandler(tagHandler),
+ _default.WithPlcValueHandler(NewValueHandler(_options...)),
+ )...,
)
// TODO: connectionPathSize
@@ -105,7 +119,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
}
func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ m.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -121,7 +135,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// For testing purposes we can skip the waiting for a complete connection
if !m.driverContext.awaitSetupComplete {
go m.setupConnection(ctx, ch)
- log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ m.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
@@ -144,7 +158,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
result <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v", err))
}
}()
- log.Debug().Msg("Sending UnregisterSession EIP Packet")
+ m.log.Debug().Msg("Sending UnregisterSession EIP Packet")
_ = m.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
return true
}, func(message spi.Message) error {
@@ -152,7 +166,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}, func(err error) error {
return nil
}, m.GetTtl()) //Unregister gets no response
- log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
+ m.log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
}()
return result
}
@@ -161,7 +175,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// List Services Request
- log.Debug().Msg("Sending ListServices Request")
+ m.log.Debug().Msg("Sending ListServices Request")
listServicesResultChan := make(chan readWriteModel.ListServicesResponse)
listServicesResultErrorChan := make(chan error)
if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewListServicesRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
@@ -175,7 +189,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
listServicesResponse := message.(readWriteModel.ListServicesResponse)
serviceResponse := listServicesResponse.GetTypeIds()[0].(readWriteModel.ServicesResponse)
if serviceResponse.GetSupportsCIPEncapsulation() {
- log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
+ m.log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
}
m.cipEncapsulationAvailable = serviceResponse.GetSupportsCIPEncapsulation()
listServicesResultChan <- listServicesResponse
@@ -183,7 +197,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
listServicesResultErrorChan <- errors.Wrap(err, "got error processing request")
@@ -199,7 +213,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Connect Register Session
- log.Debug().Msg("Sending EipConnectionRequest")
+ m.log.Debug().Msg("Sending EipConnectionRequest")
connectionResponseChan := make(chan readWriteModel.EipConnectionResponse)
connectionResponseErrorChan := make(chan error)
if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
@@ -212,10 +226,10 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
if connectionResponse.GetStatus() == 0 {
m.sessionHandle = connectionResponse.GetSessionHandle()
m.senderContext = connectionResponse.GetSenderContext()
- log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+ m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
connectionResponseChan <- connectionResponse
} else {
- log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
+ m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
}
} else {
@@ -248,7 +262,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
m.connectionId = connectionManagerResponse.GetOtConnectionId()
- log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
+ m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
connectionResponseChan <- connectionResponse
} else {
connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
@@ -257,7 +271,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
@@ -270,7 +284,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
@@ -287,7 +301,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
////////////////////////////////////////////////////////////////////////////////////////////////////////////
// List All Attributes
- log.Debug().Msg("Sending ListAllAttributes Request")
+ m.log.Debug().Msg("Sending ListAllAttributes Request")
listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse)
listAllAttributesErrorChan := make(chan error)
classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
@@ -320,14 +334,14 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
}
}
- log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
+ m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
listAllAttributesResponseChan <- response
}
return nil
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
@@ -355,7 +369,7 @@ func (m *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnecti
if m.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
- log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ m.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
}
@@ -363,7 +377,7 @@ func (m *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult)
if m.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
} else {
- log.Info().Msg("Successfully connected")
+ m.log.Info().Msg("Successfully connected")
}
m.SetConnected(true)
}
@@ -376,12 +390,12 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(m.messageCodec, m.tm, m.configuration, &m.sessionHandle))
+ return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(m.messageCodec, m.tm, m.configuration, &m.sessionHandle, options.WithCustomLogger(m.log)))
}
func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilder(
- m.GetPlcTagHandler(), m.GetPlcValueHandler(), NewWriter(m.messageCodec, m.tm, m.configuration, &m.sessionHandle, &m.senderContext))
+ m.GetPlcTagHandler(), m.GetPlcValueHandler(), NewWriter(m.messageCodec, m.tm, m.configuration, &m.sessionHandle, &m.senderContext, options.WithCustomLogger(m.log)))
}
func (m *Connection) String() string {
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 3d15e0de30..6dc0e4f9a3 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -30,7 +30,6 @@ import (
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Driver struct {
@@ -55,11 +54,11 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
}
func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -69,18 +68,18 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ m.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := ParseFromOptions(options)
+ configuration, err := ParseFromOptions(m.log, options)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid options")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
@@ -88,7 +87,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
driverContext, err := NewDriverContext(configuration)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid options")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
@@ -98,7 +97,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Create the new connection
connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, options)
- log.Debug().Msg("created connection, connecting now")
+ m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/eip/MessageCodec.go b/plc4go/internal/eip/MessageCodec.go
index 9606863525..b7717cffd6 100644
--- a/plc4go/internal/eip/MessageCodec.go
+++ b/plc4go/internal/eip/MessageCodec.go
@@ -22,6 +22,8 @@ package eip
import (
"context"
"encoding/binary"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"github.com/apache/plc4x/plc4go/protocols/eip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
@@ -29,16 +31,18 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type MessageCodec struct {
_default.DefaultCodec
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
- codec := &MessageCodec{}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance)
+func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
+ codec := &MessageCodec{
+ log: options.ExtractCustomLogger(_options...),
+ }
+ codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _options...)
return codec
}
@@ -47,7 +51,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
eipPacket := message.(model.EipPacket)
// Serialize the request
@@ -68,35 +72,35 @@ func (m *MessageCodec) Send(message spi.Message) error {
func (m *MessageCodec) Receive() (spi.Message, error) {
// We need at least 4 bytes in order to know how big the packet is in total
if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := m.GetTransportInstance().PeekReadableBytes(4)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
// Bytes 2 and 3 hold the little-endian size; add the header size of 24 to get the total packet size
packetSize := uint32(((uint16(data[3]) << 8) + uint16(data[2])) + 24)
if num < packetSize {
- log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
+ m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
data, err = m.GetTransportInstance().Read(packetSize)
if err != nil {
- log.Debug().Err(err).Msg("Error reading")
+ m.log.Debug().Err(err).Msg("Error reading")
// TODO: Possibly clean up ...
return nil, nil
}
rb := utils.NewReadBufferByteBased(data, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
eipPacket, err := model.EipPacketParseWithBuffer(context.Background(), rb, true)
if err != nil {
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
// TODO: Possibly clean up ...
return nil, nil
}
return eipPacket, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
// TODO: maybe we should return a "not enough bytes" error here
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 5105835298..61b5503c6f 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -23,7 +23,9 @@ import (
"context"
"encoding/binary"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"regexp"
"strconv"
"time"
@@ -37,7 +39,6 @@ import (
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Reader struct {
@@ -45,20 +46,24 @@ type Reader struct {
tm transactions.RequestTransactionManager
configuration Configuration
sessionHandle *uint32
+
+ log zerolog.Logger
}
-func NewReader(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32) *Reader {
+func NewReader(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32, _options ...options.WithOption) *Reader {
return &Reader{
messageCodec: messageCodec,
tm: tm,
configuration: configuration,
sessionHandle: sessionHandle,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
- log.Trace().Msg("Reading")
+ m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go func() {
defer func() {
@@ -106,7 +111,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
cipRRData := message.(readWriteModel.CipRRData)
unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
// Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(unconnectedDataItem.GetService(), readRequest)
if err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(
@@ -257,7 +262,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readReq
}
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
diff --git a/plc4go/internal/eip/ValueHandler.go b/plc4go/internal/eip/ValueHandler.go
index 51abb49013..438a0feca5 100644
--- a/plc4go/internal/eip/ValueHandler.go
+++ b/plc4go/internal/eip/ValueHandler.go
@@ -20,13 +20,16 @@
package eip
import (
- "github.com/apache/plc4x/plc4go/spi/values"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ spiValues "github.com/apache/plc4x/plc4go/spi/values"
)
type ValueHandler struct {
- values.DefaultValueHandler
+ spiValues.DefaultValueHandler
}
-func NewValueHandler() ValueHandler {
- return ValueHandler{}
+func NewValueHandler(_options ...options.WithOption) ValueHandler {
+ return ValueHandler{
+ spiValues.NewDefaultValueHandler(_options...),
+ }
}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 195f753e2a..52cef6d3f3 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -22,7 +22,9 @@ package eip
import (
"context"
"encoding/binary"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"strings"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -33,7 +35,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Writer struct {
@@ -42,15 +43,18 @@ type Writer struct {
configuration Configuration
sessionHandle *uint32
senderContext *[]uint8
+
+ log zerolog.Logger
}
-func NewWriter(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32, senderContext *[]uint8) Writer {
+func NewWriter(messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, configuration Configuration, sessionHandle *uint32, senderContext *[]uint8, _options ...options.WithOption) Writer {
return Writer{
messageCodec: messageCodec,
tm: tm,
configuration: configuration,
sessionHandle: sessionHandle,
senderContext: senderContext,
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -100,7 +104,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
/* if len(items) == 1 {
// Assemble the finished packet
- log.Trace().Msg("Assemble paket")
+ m.log.Trace().Msg("Assemble paket")
pkt := readWriteModel.NewCipRRData(
readWriteModel.NewCipExchange(
readWriteModel.NewCipUnconnectedRequest(
@@ -140,12 +144,12 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return true
}, func(message spi.Message) error {
// Convert the response into an
- log.Trace().Msg("convert response to ")
+ m.log.Trace().Msg("convert response to ")
eipPacket := message.(readWriteModel.EipPacket)
cipRRData := eipPacket.(readWriteModel.CipRRData)
cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
// Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
if err != nil {
@@ -188,7 +192,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
data := readWriteModel.NewServices(nb, offsets, serviceArr, 0)
// Assemble the finished packet
- log.Trace().Msg("Assemble paket")
+ m.log.Trace().Msg("Assemble paket")
pkt := readWriteModel.NewCipRRData(
readWriteModel.NewCipExchange(
readWriteModel.NewCipUnconnectedRequest(
@@ -231,12 +235,12 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return true
}, func(message spi.Message) error {
// Convert the response into an
- log.Trace().Msg("convert response to ")
+ m.log.Trace().Msg("convert response to ")
eipPacket := message.(readWriteModel.EipPacket)
cipRRData := eipPacket.(readWriteModel.CipRRData)
multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
// Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
if err != nil {
@@ -340,6 +344,6 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.CipService, writeRe
}
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
diff --git a/plc4go/internal/knxnetip/Browser.go b/plc4go/internal/knxnetip/Browser.go
index fffc0ecf73..585e8807bc 100644
--- a/plc4go/internal/knxnetip/Browser.go
+++ b/plc4go/internal/knxnetip/Browser.go
@@ -24,6 +24,7 @@ import (
"encoding/hex"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"strconv"
"strings"
"time"
@@ -37,7 +38,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Browser struct {
@@ -45,6 +45,8 @@ type Browser struct {
connection *Connection
messageCodec spi.MessageCodec
sequenceCounter uint8
+
+ log zerolog.Logger
}
func NewBrowser(connection *Connection, messageCodec spi.MessageCodec, _options ...options.WithOption) *Browser {
@@ -52,6 +54,7 @@ func NewBrowser(connection *Connection, messageCodec spi.MessageCodec, _options
connection: connection,
messageCodec: messageCodec,
sequenceCounter: 0,
+ log: options.ExtractCustomLogger(_options...),
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
return &browser
@@ -62,7 +65,7 @@ func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiMod
case DeviceQuery:
queryResults, err := m.executeDeviceQuery(ctx, query.(DeviceQuery), interceptor)
if err != nil {
- log.Warn().Err(err).Msg("Error executing device query")
+ m.log.Warn().Err(err).Msg("Error executing device query")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
} else {
return apiModel.PlcResponseCode_OK, queryResults
@@ -70,7 +73,7 @@ func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiMod
case CommunicationObjectQuery:
queryResults, err := m.executeCommunicationObjectQuery(ctx, query.(CommunicationObjectQuery))
if err != nil {
- log.Warn().Err(err).Msg("Error executing device query")
+ m.log.Warn().Err(err).Msg("Error executing device query")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
} else {
return apiModel.PlcResponseCode_OK, queryResults
@@ -377,7 +380,7 @@ func (m Browser) executeCommunicationObjectQuery(ctx context.Context, query Comm
data := []uint8{uint8((comObjectSettings >> 8) & 0xFF), uint8(comObjectSettings & 0xFF)}
descriptor, err := driverModel.GroupObjectDescriptorRealisationTypeBParse(data)
if err != nil {
- log.Info().Err(err).Msg("error parsing com object descriptor")
+ m.log.Info().Err(err).Msg("error parsing com object descriptor")
continue
}
@@ -511,7 +514,7 @@ func (m Browser) executeCommunicationObjectQuery(ctx context.Context, query Comm
readResult = <-rrr
if readResult.GetResponse().GetResponseCode("comObjectTableAddress") == apiModel.PlcResponseCode_OK {
comObjectTableAddress := readResult.GetResponse().GetValue("comObjectTableAddress").GetUint16()
- log.Info().Msgf("Com Object Table Address: %x", comObjectTableAddress)
+ m.log.Info().Msgf("Com Object Table Address: %x", comObjectTableAddress)
}
}
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index 9646c73672..887e9548da 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -26,6 +26,7 @@ import (
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
+ "github.com/rs/zerolog"
"strconv"
"strings"
"sync"
@@ -42,7 +43,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type ConnectionMetadata struct {
@@ -136,6 +136,8 @@ type Connection struct {
connectionId string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
func (m *Connection) String() string {
@@ -163,13 +165,13 @@ type KnxDeviceAuthenticateResult struct {
}
type InternalResult struct {
- responsemessage spi.Message
+ responseMessage spi.Message
err error
}
-func NewConnection(transportInstance transports.TransportInstance, options map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
+func NewConnection(transportInstance transports.TransportInstance, connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
connection := &Connection{
- options: options,
+ options: connectionOptions,
tagHandler: tagHandler,
valueHandler: NewValueHandler(),
requestInterceptor: interceptors.NewSingleItemRequestInterceptor(
@@ -186,16 +188,17 @@ func NewConnection(transportInstance transports.TransportInstance, options map[s
defaultTtl: time.Second * 10,
DeviceConnections: map[driverModel.KnxAddress]*KnxDeviceConnection{},
handleTunnelingRequests: true,
+ log: options.ExtractCustomLogger(_options...),
}
connection.connectionTtl = connection.defaultTtl * 2
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
}
// If a building key was provided, save that in a dedicated variable
- if buildingKey, ok := options["buildingKey"]; ok {
+ if buildingKey, ok := connectionOptions["buildingKey"]; ok {
bc, err := hex.DecodeString(buildingKey[0])
if err == nil {
connection.buildingKey = bc
@@ -298,11 +301,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// Create a go routine to handle incoming tunneling-requests which haven't been
// handled by any other handler. This is where usually the GroupValueWrite messages
// are being handled.
- log.Debug().Msg("Starting tunneling handler")
+ m.log.Debug().Msg("Starting tunneling handler")
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
@@ -312,15 +315,15 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
if !ok {
tunnelingResponse, ok := incomingMessage.(driverModel.TunnelingResponseExactly)
if ok {
- log.Warn().Msgf("Got an unhandled TunnelingResponse message %v\n", tunnelingResponse)
+ m.log.Warn().Msgf("Got an unhandled TunnelingResponse message %v\n", tunnelingResponse)
} else {
- log.Warn().Msgf("Not a TunnelingRequest or TunnelingResponse message %v\n", incomingMessage)
+ m.log.Warn().Msgf("Not a TunnelingRequest or TunnelingResponse message %v\n", incomingMessage)
}
continue
}
if tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- log.Warn().Msgf("Not for this connection %v\n", tunnelingRequest)
+ m.log.Warn().Msgf("Not for this connection %v\n", tunnelingRequest)
continue
}
@@ -353,7 +356,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
m.handleIncomingTunnelingRequest(ctx, tunnelingRequest)
}
}
- log.Warn().Msg("Tunneling handler shat down")
+ m.log.Warn().Msg("Tunneling handler shat down")
}()
// Fire the "connected" event
@@ -375,7 +378,7 @@ func (m *Connection) doSomethingAndClose(something func()) {
something()
err := m.messageCodec.Disconnect()
if err != nil {
- log.Warn().Msgf("error closing connection: %s", err)
+ m.log.Warn().Msgf("error closing connection: %s", err)
}
}
@@ -422,7 +425,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
case <-ttlTimer.C:
ttlTimer.Stop()
// If we got a timeout here, we'll just continue; the device will auto-disconnect on its own.
- log.Debug().Msgf("Timeout disconnecting from device %s.", KnxAddressToString(targetAddress))
+ m.log.Debug().Msgf("Timeout disconnecting from device %s.", KnxAddressToString(targetAddress))
}
}
@@ -497,7 +500,7 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
- m.tagHandler, m.valueHandler, NewSubscriber(m))
+ m.tagHandler, m.valueHandler, NewSubscriber(m, options.WithCustomLogger(m.log)))
}
func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 54bb0bc1e6..44173c8334 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -66,7 +65,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte,
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
groupAddressReadResponse, err := m.sendGroupAddressReadRequest(ctx, groupAddress)
@@ -126,7 +125,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// If we're already connected, use that connection instead.
@@ -186,7 +185,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
if err == nil {
deviceApduSize = plcValue.GetUint16()
} else {
- log.Debug().Err(err).Msgf("Error parsing knx property")
+ m.log.Debug().Err(err).Msgf("Error parsing knx property")
}
}
}
@@ -221,7 +220,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverM
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
if connection, ok := m.DeviceConnections[targetAddress]; ok {
@@ -259,7 +258,7 @@ func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress drive
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// Check if there is already a connection available,
@@ -319,7 +318,7 @@ func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress drive
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// Check if there is already a connection available,
@@ -403,7 +402,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAdd
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// Check if there is already a connection available,
@@ -468,7 +467,7 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverM
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
// Set a default datatype, if none is specified
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go b/plc4go/internal/knxnetip/ConnectionHelper.go
index c26cc9734d..d68cdd5626 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -32,7 +32,6 @@ import (
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -55,7 +54,7 @@ func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunneli
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataIndExactly)
@@ -88,7 +87,7 @@ func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunneli
// If this is an individual address, and it is targeted at us, we need to ack that.
targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
if targetAddress == m.ClientKnxAddress {
- log.Info().Msg("Acknowleding an unhandled data message.")
+ m.log.Info().Msg("Acknowleding an unhandled data message.")
_ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
}
}
@@ -99,12 +98,12 @@ func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunneli
// If this is an individual address, and it is targeted at us, we need to ack that.
targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
if targetAddress == m.ClientKnxAddress {
- log.Info().Msg("Acknowleding an unhandled contol message.")
+ m.log.Info().Msg("Acknowleding an unhandled contol message.")
_ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
}
}
default:
- log.Info().Msg("Unknown unhandled message.")
+ m.log.Info().Msg("Unknown unhandled message.")
}
}()
}
@@ -153,7 +152,7 @@ func (m *Connection) resetTimeout() {
}
func (m *Connection) resetConnection() {
- log.Warn().Msg("Reset connection")
+ m.log.Warn().Msg("Reset connection")
}
func (m *Connection) getGroupAddressNumLevels() uint8 {
@@ -170,7 +169,7 @@ func (m *Connection) getGroupAddressNumLevels() uint8 {
func (m *Connection) addSubscriber(subscriber *Subscriber) {
for _, sub := range m.subscribers {
if sub == subscriber {
- log.Debug().Msgf("Subscriber %v already added", subscriber)
+ m.log.Debug().Msgf("Subscriber %v already added", subscriber)
return
}
}
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 45857edbe0..89b7bc0353 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -24,7 +24,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/pool"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
"net"
"net/url"
"sync"
@@ -44,13 +44,16 @@ type Discoverer struct {
transportInstanceCreationQueue pool.Executor
deviceScanningWorkItemId atomic.Int32
deviceScanningQueue pool.Executor
+
+ log zerolog.Logger
}
-func NewDiscoverer() *Discoverer {
+func NewDiscoverer(_options ...options.WithOption) *Discoverer {
return &Discoverer{
// TODO: maybe a dynamic executor would be better to not waste cycles when not in use
- transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100),
- deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100),
+ transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
+ deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -101,7 +104,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
go func(netInterface net.Interface) {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ d.log.Error().Msgf("panic-ed %v", err)
}
}()
defer func() { wg.Done() }()
@@ -132,14 +135,14 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
go func() {
wg.Wait()
- log.Trace().Msg("Closing transport instance channel")
+ d.log.Trace().Msg("Closing transport instance channel")
close(transportInstances)
}()
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ d.log.Error().Msgf("panic-ed %v", err)
}
}()
for transportInstance := range transportInstances {
@@ -158,27 +161,27 @@ func (d *Discoverer) createTransportInstanceDispatcher(ctx context.Context, wg *
udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil,
&net.UDPAddr{IP: ipv4Addr, Port: 0})
if err != nil {
- log.Error().Err(err).Msg("error creating transport instance")
+ d.log.Error().Err(err).Msg("error creating transport instance")
return
}
err = transportInstance.ConnectWithContext(ctx)
if err != nil {
- log.Debug().Err(err).Msg("Error Connecting")
+ d.log.Debug().Err(err).Msg("Error Connecting")
return
}
- log.Debug().Msgf("Adding transport instance to scan %v", transportInstance)
+ d.log.Debug().Msgf("Adding transport instance to scan %v", transportInstance)
transportInstances <- transportInstance
}
}
func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.TransportInstance, callback func(event apiModel.PlcDiscoveryItem)) pool.Runnable {
return func() {
- log.Debug().Msgf("Scanning %v", udpTransportInstance)
+ d.log.Debug().Msgf("Scanning %v", udpTransportInstance)
// Create a codec for sending and receiving messages.
codec := NewMessageCodec(udpTransportInstance, nil)
// Explicitly start the worker
if err := codec.Connect(); err != nil {
- log.Error().Err(err).Msg("Error connecting")
+ d.log.Error().Err(err).Msg("Error connecting")
return
}
@@ -191,7 +194,7 @@ func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.Transp
searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
// Send the search request.
if err := codec.Send(searchRequestMessage); err != nil {
- log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage)
+ d.log.Debug().Err(err).Msgf("Error sending message:\n%s", searchRequestMessage)
return
}
// Keep on reading responses till the timeout is done.
diff --git a/plc4go/internal/knxnetip/Driver.go b/plc4go/internal/knxnetip/Driver.go
index 426b228c00..494a9c1ed9 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Driver struct {
@@ -72,7 +71,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Create the new connection
connection := NewConnection(transportInstance, options, m.GetPlcTagHandler())
- log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
+ m.log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/knxnetip/MessageCodec.go b/plc4go/internal/knxnetip/MessageCodec.go
index 0b7fc76cdd..ef45dc2646 100644
--- a/plc4go/internal/knxnetip/MessageCodec.go
+++ b/plc4go/internal/knxnetip/MessageCodec.go
@@ -23,25 +23,29 @@ import (
"github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
)
type MessageCodec struct {
_default.DefaultCodec
sequenceCounter int32
messageInterceptor func(message spi.Message)
+
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance, messageInterceptor func(message spi.Message)) *MessageCodec {
+func NewMessageCodec(transportInstance transports.TransportInstance, messageInterceptor func(message spi.Message), _options ...options.WithOption) *MessageCodec {
codec := &MessageCodec{
messageInterceptor: messageInterceptor,
+ log: options.ExtractCustomLogger(_options...),
}
codec.DefaultCodec = _default.NewDefaultCodec(
codec,
transportInstance,
- _default.WithCustomMessageHandler(CustomMessageHandling),
+ append(_options, _default.WithCustomMessageHandler(CustomMessageHandling(options.ExtractCustomLogger(_options...))))...,
)
return codec
}
@@ -51,7 +55,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
knxMessage := message.(model.KnxNetIpMessage)
// Serialize the request
@@ -71,66 +75,68 @@ func (m *MessageCodec) Send(message spi.Message) error {
func (m *MessageCodec) Receive() (spi.Message, error) {
// We need at least 6 bytes in order to know how big the packet is in total
if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 6) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := m.GetTransportInstance().PeekReadableBytes(6)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
// Get the size of the entire packet
packetSize := (uint32(data[4]) << 8) + uint32(data[5])
if num < packetSize {
- log.Trace().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
+ m.log.Trace().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
data, err = m.GetTransportInstance().Read(packetSize)
if err != nil {
- log.Warn().Err(err).Msg("error reading")
+ m.log.Warn().Err(err).Msg("error reading")
// TODO: Possibly clean up ...
return nil, nil
}
knxMessage, err := model.KnxNetIpMessageParse(data)
if err != nil {
- log.Warn().Err(err).Msg("error parsing message")
+ m.log.Warn().Err(err).Msg("error parsing message")
// TODO: Possibly clean up ...
return nil, nil
}
return knxMessage, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
return nil, nil
}
-func CustomMessageHandling(codec _default.DefaultCodecRequirements, message spi.Message) bool {
- // If this message is a simple KNXNet/IP UDP Ack, ignore it for now
- tunnelingResponse := message.(model.TunnelingResponse)
- if tunnelingResponse != nil {
- return true
- }
+func CustomMessageHandling(localLog zerolog.Logger) _default.CustomMessageHandler {
+ return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
+ // If this message is a simple KNXNet/IP UDP Ack, ignore it for now
+ tunnelingResponse := message.(model.TunnelingResponse)
+ if tunnelingResponse != nil {
+ return true
+ }
- // If this is an incoming tunneling request, automatically send a tunneling ACK back to the gateway
- tunnelingRequest := message.(model.TunnelingRequest)
- if tunnelingRequest != nil {
- response := model.NewTunnelingResponse(
- model.NewTunnelingResponseDataBlock(
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId(),
- tunnelingRequest.GetTunnelingRequestDataBlock().GetSequenceCounter(),
- model.Status_NO_ERROR),
- )
- err := codec.Send(response)
- if err != nil {
- log.Warn().Err(err).Msg("got an error sending ACK from transport")
+ // If this is an incoming tunneling request, automatically send a tunneling ACK back to the gateway
+ tunnelingRequest := message.(model.TunnelingRequest)
+ if tunnelingRequest != nil {
+ response := model.NewTunnelingResponse(
+ model.NewTunnelingResponseDataBlock(
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId(),
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetSequenceCounter(),
+ model.Status_NO_ERROR),
+ )
+ err := codec.Send(response)
+ if err != nil {
+ localLog.Warn().Err(err).Msg("got an error sending ACK from transport")
+ }
}
- }
- localCodec := codec.(*MessageCodec)
- // Handle the packet itself
- // Give a message interceptor a chance to intercept
- if (*localCodec).messageInterceptor != nil {
- (*localCodec).messageInterceptor(message)
+ localCodec := codec.(*MessageCodec)
+ // Handle the packet itself
+ // Give a message interceptor a chance to intercept
+ if (*localCodec).messageInterceptor != nil {
+ (*localCodec).messageInterceptor(message)
+ }
+ return false
}
- return false
}
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index f9c0d7921d..4d4d2f7e60 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -21,6 +21,8 @@ package knxnetip
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"strconv"
"strings"
"time"
@@ -33,16 +35,18 @@ import (
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Reader struct {
connection *Connection
+
+ log zerolog.Logger
}
-func NewReader(connection *Connection) *Reader {
+func NewReader(connection *Connection, _options ...options.WithOption) *Reader {
return &Reader{
connection: connection,
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -175,7 +179,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
func (m Reader) readGroupAddress(ctx context.Context, tag GroupAddressTag) (apiModel.PlcResponseCode, apiValues.PlcValue) {
rawAddresses, err := m.resolveAddresses(tag)
if err != nil {
- log.Debug().Err(err).Msg("error resolving addresses")
+ m.log.Debug().Err(err).Msg("error resolving addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
@@ -188,7 +192,7 @@ func (m Reader) readGroupAddress(ctx context.Context, tag GroupAddressTag) (apiM
// Create a string representation of this numeric address depending on the type of requested address
stringAddress, err := NumericGroupAddressToString(numericAddress, tag)
if err != nil {
- log.Debug().Err(err).Msg("error mapping addresses")
+ m.log.Debug().Err(err).Msg("error mapping addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
// Try to get a value from the cache
@@ -246,7 +250,7 @@ func (m Reader) readGroupAddress(ctx context.Context, tag GroupAddressTag) (apiM
if len(rawAddresses) == 1 {
stringAddress, err := NumericGroupAddressToString(rawAddresses[0], tag)
if err != nil {
- log.Debug().Err(err).Msg("error mapping addresses")
+ m.log.Debug().Err(err).Msg("error mapping addresses")
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
return apiModel.PlcResponseCode_OK, values[stringAddress]
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index 7527563667..e6be1eb835 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -21,7 +21,9 @@ package knxnetip
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -35,12 +37,15 @@ import (
type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+
+ log zerolog.Logger
}
-func NewSubscriber(connection *Connection) *Subscriber {
+func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
return &Subscriber{
connection: connection,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -69,7 +74,12 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
- spiModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes, subscriptionValues),
+ spiModel.NewDefaultPlcSubscriptionResponse(
+ subscriptionRequest,
+ responseCodes,
+ subscriptionValues,
+ options.WithCustomLogger(m.log),
+ ),
nil,
)
}()
diff --git a/plc4go/internal/knxnetip/SubscriptionEvent.go b/plc4go/internal/knxnetip/SubscriptionEvent.go
index 974f57d159..f6c053c206 100644
--- a/plc4go/internal/knxnetip/SubscriptionEvent.go
+++ b/plc4go/internal/knxnetip/SubscriptionEvent.go
@@ -20,26 +20,34 @@
package knxnetip
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
-
- "github.com/rs/zerolog/log"
)
type SubscriptionEvent struct {
*spiModel.DefaultPlcSubscriptionEvent
addresses map[string][]byte
+
+ log zerolog.Logger
}
-func NewSubscriptionEvent(tags map[string]apiModel.PlcTag, types map[string]spiModel.SubscriptionType,
- intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
- addresses map[string][]byte, values map[string]values.PlcValue) SubscriptionEvent {
+func NewSubscriptionEvent(
+ tags map[string]apiModel.PlcTag,
+ types map[string]spiModel.SubscriptionType,
+ intervals map[string]time.Duration,
+ responseCodes map[string]apiModel.PlcResponseCode,
+ addresses map[string][]byte,
+ values map[string]values.PlcValue,
+ _options ...options.WithOption,
+) SubscriptionEvent {
subscriptionEvent := SubscriptionEvent{addresses: addresses}
- event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values)
+ event := spiModel.NewDefaultPlcSubscriptionEvent(&subscriptionEvent, tags, types, intervals, responseCodes, values, _options...)
subscriptionEvent.DefaultPlcSubscriptionEvent = event.(*spiModel.DefaultPlcSubscriptionEvent)
return subscriptionEvent
}
@@ -59,12 +67,12 @@ func (m SubscriptionEvent) GetAddress(name string) string {
groupAddress, err = driverModel.KnxGroupAddressParse(rawAddress, 1)
}
if err != nil {
- log.Debug().Err(err).Msg("error parsing")
+ m.log.Debug().Err(err).Msg("error parsing")
return ""
}
toString, err := GroupAddressToString(groupAddress)
if err != nil {
- log.Debug().Err(err).Msg("error mapping")
+ m.log.Debug().Err(err).Msg("error mapping")
return ""
}
return toString
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 55a93f5120..854a29af8f 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
+ "github.com/rs/zerolog"
"time"
"github.com/apache/plc4x/plc4go/pkg/api"
@@ -35,7 +36,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Connection struct {
@@ -47,13 +47,15 @@ type Connection struct {
connectionId string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
+func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
connection := &Connection{
unitIdentifier: unitIdentifier,
messageCodec: messageCodec,
- options: options,
+ options: connectionOptions,
requestInterceptor: interceptors.NewSingleItemRequestInterceptor(
spiModel.NewDefaultPlcReadRequest,
spiModel.NewDefaultPlcWriteRequest,
@@ -61,8 +63,9 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options
spiModel.NewDefaultPlcWriteResponse,
_options...,
),
+ log: options.ExtractCustomLogger(_options...),
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
@@ -70,7 +73,7 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options
connection.DefaultConnection = _default.NewDefaultConnection(connection,
_default.WithDefaultTtl(time.Second*5),
_default.WithPlcTagHandler(tagHandler),
- _default.WithPlcValueHandler(NewValueHandler()),
+ _default.WithPlcValueHandler(NewValueHandler(_options...)),
)
return connection
}
@@ -98,7 +101,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
// TODO: use proper context
ctx := context.TODO()
- log.Trace().Msg("Pinging")
+ m.log.Trace().Msg("Pinging")
result := make(chan plc4go.PlcConnectionPingResult, 1)
go func() {
defer func() {
@@ -117,19 +120,19 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
},
func(message spi.Message) error {
- log.Trace().Msgf("Received Message")
+ m.log.Trace().Msgf("Received Message")
if message != nil {
// If we got a valid response (even if it will probably contain an error, we know the remote is available)
- log.Trace().Msg("got valid response")
+ m.log.Trace().Msg("got valid response")
result <- _default.NewDefaultPlcConnectionPingResult(nil)
} else {
- log.Trace().Msg("got no response")
+ m.log.Trace().Msg("got no response")
result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
}
return nil
},
func(err error) error {
- log.Trace().Msgf("Received Error")
+ m.log.Trace().Msgf("Received Error")
result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
return nil
},
@@ -151,7 +154,7 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilderWithInterceptor(
m.GetPlcTagHandler(),
- NewReader(m.unitIdentifier, m.messageCodec),
+ NewReader(m.unitIdentifier, m.messageCodec, options.WithCustomLogger(m.log)),
m.requestInterceptor,
)
}
@@ -160,7 +163,7 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilderWithInterceptor(
m.GetPlcTagHandler(),
m.GetPlcValueHandler(),
- NewWriter(m.unitIdentifier, m.messageCodec),
+ NewWriter(m.unitIdentifier, m.messageCodec, options.WithCustomLogger(m.log)),
m.requestInterceptor,
)
}
diff --git a/plc4go/internal/modbus/MessageCodec.go b/plc4go/internal/modbus/MessageCodec.go
index 597bc5bbcb..6469d1c616 100644
--- a/plc4go/internal/modbus/MessageCodec.go
+++ b/plc4go/internal/modbus/MessageCodec.go
@@ -23,19 +23,23 @@ import (
"github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
)
type MessageCodec struct {
_default.DefaultCodec
expectationCounter int32
+
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
+func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
codec := &MessageCodec{
expectationCounter: 1,
+ log: options.ExtractCustomLogger(_options...),
}
codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance)
return codec
@@ -46,7 +50,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
tcpAdu := message.(model.ModbusTcpADU)
// Serialize the request
@@ -66,17 +70,17 @@ func (m *MessageCodec) Send(message spi.Message) error {
func (m *MessageCodec) Receive() (spi.Message, error) {
// We need at least 6 bytes in order to know how big the packet is in total
if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 6) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := m.GetTransportInstance().PeekReadableBytes(6)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
// Get the size of the entire packet
packetSize := (uint32(data[4]) << 8) + uint32(data[5]) + 6
if num < packetSize {
- log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
+ m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
data, err = m.GetTransportInstance().Read(packetSize)
@@ -86,13 +90,13 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
}
tcpAdu, err := model.ModbusTcpADUParse(data, model.DriverType_MODBUS_TCP, true)
if err != nil {
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
// TODO: Possibly clean up ...
return nil, nil
}
return tcpAdu, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
// TODO: maybe we should return a "not enough bytes" error here
diff --git a/plc4go/internal/modbus/ModbusAsciiDriver.go b/plc4go/internal/modbus/ModbusAsciiDriver.go
index d36672d072..73e224cd98 100644
--- a/plc4go/internal/modbus/ModbusAsciiDriver.go
+++ b/plc4go/internal/modbus/ModbusAsciiDriver.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
"net/url"
"strconv"
)
@@ -48,22 +47,22 @@ func NewModbusAsciiDriver(_options ...options.WithOption) *ModbusAsciiDriver {
return driver
}
-func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, connectionOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(connectionOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
- options["defaultTcpPort"] = []string{"502"}
+ connectionOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, connectionOptions)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", connectionOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -75,7 +74,7 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
for {
@@ -83,28 +82,28 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- log.Error().Err(err).Msg("got error serializing adu")
+ m.log.Error().Err(err).Msg("got error serializing adu")
} else {
- log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ m.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
- if value, ok := options["unit-identifier"]; ok {
+ if value, ok := connectionOptions["unit-identifier"]; ok {
var intValue uint64
intValue, err = strconv.ParseUint(value[0], 10, 8)
if err == nil {
unitIdentifier = uint8(intValue)
}
}
- log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, options, m.GetPlcTagHandler())
- log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler())
+ m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusRtuDriver.go b/plc4go/internal/modbus/ModbusRtuDriver.go
index 51bfabe649..a2c0f5b6fc 100644
--- a/plc4go/internal/modbus/ModbusRtuDriver.go
+++ b/plc4go/internal/modbus/ModbusRtuDriver.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
"net/url"
"strconv"
)
@@ -48,22 +47,22 @@ func NewModbusRtuDriver(_options ...options.WithOption) *ModbusRtuDriver {
return driver
}
-func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
- options["defaultTcpPort"] = []string{"502"}
+ driverOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -75,7 +74,7 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
for {
@@ -83,28 +82,28 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- log.Error().Err(err).Msg("got error serializing adu")
+ m.log.Error().Err(err).Msg("got error serializing adu")
} else {
- log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ m.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
- if value, ok := options["unit-identifier"]; ok {
+ if value, ok := driverOptions["unit-identifier"]; ok {
var intValue uint64
intValue, err = strconv.ParseUint(value[0], 10, 8)
if err == nil {
unitIdentifier = uint8(intValue)
}
}
- log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, options, m.GetPlcTagHandler())
- log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+ m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/ModbusTcpDriver.go b/plc4go/internal/modbus/ModbusTcpDriver.go
index 1633a6a71c..e0d6069dc8 100644
--- a/plc4go/internal/modbus/ModbusTcpDriver.go
+++ b/plc4go/internal/modbus/ModbusTcpDriver.go
@@ -29,7 +29,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
"net/url"
"strconv"
)
@@ -48,22 +47,22 @@ func NewModbusTcpDriver(_options ...options.WithOption) *ModbusTcpDriver {
return driver
}
-func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
- options["defaultTcpPort"] = []string{"502"}
+ driverOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -75,7 +74,7 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ m.log.Error().Msgf("panic-ed %v", err)
}
}()
for {
@@ -83,28 +82,28 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- log.Error().Err(err).Msg("got error serializing adu")
+ m.log.Error().Err(err).Msg("got error serializing adu")
} else {
- log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ m.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
- if value, ok := options["unit-identifier"]; ok {
+ if value, ok := driverOptions["unit-identifier"]; ok {
var intValue uint64
intValue, err = strconv.ParseUint(value[0], 10, 8)
if err == nil {
unitIdentifier = uint8(intValue)
}
}
- log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, options, m.GetPlcTagHandler())
- log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler())
+ m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index 9d94f74cce..29d2689dc1 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -21,6 +21,8 @@ package modbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"math"
"sync/atomic"
"time"
@@ -32,26 +34,28 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Reader struct {
transactionIdentifier int32
unitIdentifier uint8
messageCodec spi.MessageCodec
+
+ log zerolog.Logger
}
-func NewReader(unitIdentifier uint8, messageCodec spi.MessageCodec) *Reader {
+func NewReader(unitIdentifier uint8, messageCodec spi.MessageCodec, _options ...options.WithOption) *Reader {
return &Reader{
transactionIdentifier: 0,
unitIdentifier: unitIdentifier,
messageCodec: messageCodec,
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
- log.Trace().Msg("Reading")
+ m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go func() {
defer func() {
@@ -61,7 +65,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
}()
if len(readRequest.GetTagNames()) != 1 {
result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("modbus only supports single-item requests"))
- log.Debug().Msgf("modbus only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
+ m.log.Debug().Msgf("modbus only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
return
}
// If we are requesting only one tag, use a
@@ -74,11 +78,11 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "invalid tag item type"),
)
- log.Debug().Msgf("Invalid tag item type %T", tag)
+ m.log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
numWords := uint16(math.Ceil(float64(modbusTagVar.Quantity*uint16(modbusTagVar.Datatype.DataTypeSize())) / float64(2)))
- log.Debug().Msgf("Working with %d words", numWords)
+ m.log.Debug().Msgf("Working with %d words", numWords)
var pdu readWriteModel.ModbusPDU = nil
switch modbusTagVar.TagType {
case Coil:
@@ -102,7 +106,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Errorf("unsupported tag type %x", modbusTagVar.TagType),
)
- log.Debug().Msgf("Unsupported tag type %x", modbusTagVar.TagType)
+ m.log.Debug().Msgf("Unsupported tag type %x", modbusTagVar.TagType)
return
}
@@ -112,24 +116,24 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
transactionIdentifier = 1
atomic.StoreInt32(&m.transactionIdentifier, 1)
}
- log.Debug().Msgf("Calculated transaction identifier %x", transactionIdentifier)
+ m.log.Debug().Msgf("Calculated transaction identifier %x", transactionIdentifier)
// Assemble the finished ADU
- log.Trace().Msg("Assemble ADU")
+ m.log.Trace().Msg("Assemble ADU")
requestAdu := readWriteModel.NewModbusTcpADU(uint16(transactionIdentifier), m.unitIdentifier, pdu, false)
// Send the ADU over the wire
- log.Trace().Msg("Send ADU")
+ m.log.Trace().Msg("Send ADU")
if err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool {
responseAdu := message.(readWriteModel.ModbusTcpADU)
return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
}, func(message spi.Message) error {
// Convert the response into an ADU
- log.Trace().Msg("convert response to ADU")
+ m.log.Trace().Msg("convert response to ADU")
responseAdu := message.(readWriteModel.ModbusTcpADU)
// Convert the modbus response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
if err != nil {
@@ -186,7 +190,7 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
}
// Get the tag from the request
- log.Trace().Msg("get a tag from request")
+ m.log.Trace().Msg("get a tag from request")
tagName := readRequest.GetTagNames()[0]
tag, err := CastToModbusTagFromPlcTag(readRequest.GetTag(tagName))
if err != nil {
@@ -194,7 +198,7 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
}
// Decode the data according to the information from the request
- log.Trace().Msg("decode data")
+ m.log.Trace().Msg("decode data")
value, err := readWriteModel.DataItemParse(context.Background(), data, tag.Datatype, tag.Quantity)
if err != nil {
return nil, errors.Wrap(err, "Error parsing data item")
@@ -205,6 +209,6 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
responseCodes[tagName] = apiModel.PlcResponseCode_OK
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
diff --git a/plc4go/internal/modbus/Tag.go b/plc4go/internal/modbus/Tag.go
index e9b6b1863e..eaa027d317 100644
--- a/plc4go/internal/modbus/Tag.go
+++ b/plc4go/internal/modbus/Tag.go
@@ -23,6 +23,7 @@ import (
"context"
"encoding/binary"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"strconv"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -32,7 +33,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
const (
@@ -57,18 +57,19 @@ func NewTag(tagType TagType, address uint16, quantity uint16, datatype readWrite
}
}
-func NewModbusPlcTagFromStrings(tagType TagType, addressString string, quantityString string, datatype readWriteModel.ModbusDataType) (apiModel.PlcTag, error) {
+func NewModbusPlcTagFromStrings(tagType TagType, addressString string, quantityString string, datatype readWriteModel.ModbusDataType, _options ...options.WithOption) (apiModel.PlcTag, error) {
address, err := strconv.ParseUint(addressString, 10, 16)
if err != nil {
return nil, errors.Errorf("Couldn't parse address string '%s' into an int", addressString)
}
+ localLogger := options.ExtractCustomLogger(_options...)
if quantityString == "" {
- log.Debug().Msg("No quantity supplied, assuming 1")
+ localLogger.Debug().Msg("No quantity supplied, assuming 1")
quantityString = "1"
}
quantity, err := strconv.ParseUint(quantityString, 10, 16)
if err != nil {
- log.Warn().Err(err).Msgf("Error during parsing for %s. Falling back to 1", quantityString)
+ localLogger.Warn().Err(err).Msgf("Error during parsing for %s. Falling back to 1", quantityString)
quantity = 1
}
return NewTag(tagType, uint16(address), uint16(quantity), datatype), nil
diff --git a/plc4go/internal/modbus/TagHandler.go b/plc4go/internal/modbus/TagHandler.go
index 47987b49dc..edca39a695 100644
--- a/plc4go/internal/modbus/TagHandler.go
+++ b/plc4go/internal/modbus/TagHandler.go
@@ -21,6 +21,8 @@ package modbus
import (
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"regexp"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -56,9 +58,11 @@ type TagHandler struct {
numericHoldingRegisterPattern *regexp.Regexp
plc4xExtendedRegisterPattern *regexp.Regexp
numericExtendedRegisterPattern *regexp.Regexp
+
+ log zerolog.Logger
}
-func NewTagHandler() TagHandler {
+func NewTagHandler(_options ...options.WithOption) TagHandler {
generalAddressPattern := `(?P<address>\d+)(:(?P<datatype>[a-zA-Z_]+))?(\[(?P<quantity>\d+)])?$`
generalFixedDigitAddressPattern := `(?P<address>\d{4,5})?(:(?P<datatype>[a-zA-Z_]+))?(\[(?P<quantity>\d+)])?$`
return TagHandler{
@@ -72,6 +76,7 @@ func NewTagHandler() TagHandler {
numericHoldingRegisterPattern: regexp.MustCompile("^4[xX]?" + generalFixedDigitAddressPattern),
plc4xExtendedRegisterPattern: regexp.MustCompile("^extended-register:" + generalAddressPattern),
numericExtendedRegisterPattern: regexp.MustCompile("^6[xX]?" + generalFixedDigitAddressPattern),
+ log: options.ExtractCustomLogger(_options...),
}
}
diff --git a/plc4go/internal/modbus/ValueHandler.go b/plc4go/internal/modbus/ValueHandler.go
index 85ad6b1532..bbfa30f58e 100644
--- a/plc4go/internal/modbus/ValueHandler.go
+++ b/plc4go/internal/modbus/ValueHandler.go
@@ -20,13 +20,16 @@
package modbus
import (
- "github.com/apache/plc4x/plc4go/spi/values"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ spiValues "github.com/apache/plc4x/plc4go/spi/values"
)
type ValueHandler struct {
- values.DefaultValueHandler
+ spiValues.DefaultValueHandler
}
-func NewValueHandler() ValueHandler {
- return ValueHandler{}
+func NewValueHandler(_options ...options.WithOption) ValueHandler {
+ return ValueHandler{
+ spiValues.NewDefaultValueHandler(_options...),
+ }
}
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index 24f14dd89b..6c21dbff56 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -21,6 +21,8 @@ package modbus
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"math"
"sync/atomic"
"time"
@@ -31,20 +33,22 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Writer struct {
transactionIdentifier int32
unitIdentifier uint8
messageCodec spi.MessageCodec
+
+ log zerolog.Logger
}
-func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec) Writer {
+func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec, _options ...options.WithOption) Writer {
return Writer{
transactionIdentifier: 0,
unitIdentifier: unitIdentifier,
messageCodec: messageCodec,
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -186,13 +190,13 @@ func (m Writer) ToPlc4xWriteResponse(requestAdu readWriteModel.ModbusTcpADU, res
case readWriteModel.ModbusErrorCode_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND:
responseCodes[tagName] = apiModel.PlcResponseCode_REMOTE_ERROR
default:
- log.Debug().Msgf("Unmapped exception code %x", resp.GetExceptionCode())
+ m.log.Debug().Msgf("Unmapped exception code %x", resp.GetExceptionCode())
}
default:
return nil, errors.Errorf("unsupported response type %T", resp)
}
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
diff --git a/plc4go/internal/s7/Configuration.go b/plc4go/internal/s7/Configuration.go
index 45e0dfb985..5c6d7d4bda 100644
--- a/plc4go/internal/s7/Configuration.go
+++ b/plc4go/internal/s7/Configuration.go
@@ -20,10 +20,10 @@
package s7
import (
+ "github.com/rs/zerolog"
"strconv"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Configuration struct {
@@ -37,7 +37,7 @@ type Configuration struct {
controllerType ControllerType
}
-func ParseFromOptions(options map[string][]string) (Configuration, error) {
+func ParseFromOptions(localLog zerolog.Logger, options map[string][]string) (Configuration, error) {
configuration := Configuration{
localRack: 1,
localSlot: 1,
@@ -48,35 +48,35 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
maxAmqCallee: 8,
controllerType: ControllerType_UNKNOWN,
}
- if localRackString := getFromOptions(options, "local-rack"); localRackString != "" {
+ if localRackString := getFromOptions(localLog, options, "local-rack"); localRackString != "" {
parsedInt, err := strconv.ParseInt(localRackString, 10, 32)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing local-rack")
}
configuration.localRack = int32(parsedInt)
}
- if localSlotString := getFromOptions(options, "local-slot"); localSlotString != "" {
+ if localSlotString := getFromOptions(localLog, options, "local-slot"); localSlotString != "" {
parsedInt, err := strconv.ParseInt(localSlotString, 10, 32)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing local-slot")
}
configuration.localSlot = int32(parsedInt)
}
- if remoteRackString := getFromOptions(options, "remote-rack"); remoteRackString != "" {
+ if remoteRackString := getFromOptions(localLog, options, "remote-rack"); remoteRackString != "" {
parsedInt, err := strconv.ParseInt(remoteRackString, 10, 32)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing remote-rack")
}
configuration.remoteRack = int32(parsedInt)
}
- if remoteSlotString := getFromOptions(options, "remote-slot"); remoteSlotString != "" {
+ if remoteSlotString := getFromOptions(localLog, options, "remote-slot"); remoteSlotString != "" {
parsedInt, err := strconv.ParseInt(remoteSlotString, 10, 32)
if err != nil {
return Configuration{}, errors.Wrap(err, "Error parsing remote-slot")
}
configuration.remoteSlot = int32(parsedInt)
}
- if controllerTypeString := getFromOptions(options, "controller-type"); controllerTypeString != "" {
+ if controllerTypeString := getFromOptions(localLog, options, "controller-type"); controllerTypeString != "" {
switch controllerTypeString {
case "ANY":
configuration.controllerType = ControllerType_ANY
@@ -97,7 +97,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
}
}
- pduSizeString := getFromOptions(options, "pdu-size")
+ pduSizeString := getFromOptions(localLog, options, "pdu-size")
if pduSizeString != "" {
parsedUint, err := strconv.ParseUint(pduSizeString, 10, 16)
if err != nil {
@@ -106,7 +106,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
configuration.pduSize = uint16(parsedUint)
}
- if maxAmqCallerString := getFromOptions(options, "max-amq-caller"); maxAmqCallerString != "" {
+ if maxAmqCallerString := getFromOptions(localLog, options, "max-amq-caller"); maxAmqCallerString != "" {
parsedUint, err := strconv.ParseUint(maxAmqCallerString, 10, 16)
if err != nil {
return Configuration{}, errors.Wrapf(err, "Error parsing max-amq-caller %s", maxAmqCallerString)
@@ -114,7 +114,7 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
configuration.maxAmqCaller = uint16(parsedUint)
}
- if maxAmqCalleeString := getFromOptions(options, "max-amq-callee"); maxAmqCalleeString != "" {
+ if maxAmqCalleeString := getFromOptions(localLog, options, "max-amq-callee"); maxAmqCalleeString != "" {
parsedUint, err := strconv.ParseUint(maxAmqCalleeString, 10, 16)
if err != nil {
return Configuration{}, errors.Wrapf(err, "Error parsing max-amq-callee %s", maxAmqCalleeString)
@@ -124,13 +124,13 @@ func ParseFromOptions(options map[string][]string) (Configuration, error) {
return configuration, nil
}
-func getFromOptions(options map[string][]string, key string) string {
+func getFromOptions(localLog zerolog.Logger, options map[string][]string, key string) string {
if optionValues, ok := options[key]; ok {
if len(optionValues) <= 0 {
return ""
}
if len(optionValues) > 1 {
- log.Warn().Msgf("Options %s must be unique", key)
+ localLog.Warn().Msgf("Options %s must be unique", key)
}
return optionValues[0]
}
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 29e9f92232..867f44b6a9 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"reflect"
"strings"
"sync"
@@ -37,7 +38,6 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type TpduGenerator struct {
@@ -67,17 +67,20 @@ type Connection struct {
connectionId string
tracer *tracer.Tracer
+
+ log zerolog.Logger
}
-func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, options map[string][]string, _options ...options.WithOption) *Connection {
+func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
connection := &Connection{
tpduGenerator: TpduGenerator{currentTpduId: 10},
messageCodec: messageCodec,
configuration: configuration,
driverContext: driverContext,
tm: tm,
+ log: options.ExtractCustomLogger(_options...),
}
- if traceEnabledOption, ok := options["traceEnabled"]; ok {
+ if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
connection.tracer = tracer.NewTracer(connection.connectionId, _options...)
}
@@ -110,7 +113,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
}
func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- log.Trace().Msg("Connecting")
+ m.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -125,7 +128,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// Only on active connections we do a connection
if m.driverContext.PassiveMode {
- log.Info().Msg("S7 Driver running in PASSIVE mode.")
+ m.log.Info().Msg("S7 Driver running in PASSIVE mode.")
ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
return
}
@@ -133,7 +136,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// For testing purposes we can skip the waiting for a complete connection
if !m.driverContext.awaitSetupComplete {
go m.setupConnection(ctx, ch)
- log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ m.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
@@ -142,7 +145,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
// Only the TCP transport supports login.
- log.Info().Msg("S7 Driver running in ACTIVE mode.")
+ m.log.Info().Msg("S7 Driver running in ACTIVE mode.")
m.setupConnection(ctx, ch)
}()
@@ -150,7 +153,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
- log.Debug().Msg("Sending COTP Connection Request")
+ m.log.Debug().Msg("Sending COTP Connection Request")
// Open the session on ISO Transport Protocol first.
cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse, 1)
cotpConnectionErrorChan := make(chan error, 1)
@@ -169,7 +172,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
@@ -179,8 +182,8 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
select {
case cotpPacketConnectionResponse := <-cotpConnectionResult:
- log.Debug().Msg("Got COTP Connection Response")
- log.Debug().Msg("Sending S7 Connection Request")
+ m.log.Debug().Msg("Got COTP Connection Response")
+ m.log.Debug().Msg("Sending S7 Connection Request")
// Send an S7 login message.
s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication, 1)
@@ -210,7 +213,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
@@ -220,8 +223,8 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
select {
case setupCommunication := <-s7ConnectionResult:
- log.Debug().Msg("Got S7 Connection Response")
- log.Debug().Msg("Sending identify remote Request")
+ m.log.Debug().Msg("Got S7 Connection Response")
+ m.log.Debug().Msg("Sending identify remote Request")
// Save some data from the response.
m.driverContext.MaxAmqCaller = setupCommunication.GetMaxAmqCaller()
m.driverContext.MaxAmqCallee = setupCommunication.GetMaxAmqCallee()
@@ -243,7 +246,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
// Prepare a message to request the remote to identify itself.
- log.Debug().Msg("Sending S7 Identification Request")
+ m.log.Debug().Msg("Sending S7 Identification Request")
s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData, 1)
s7IdentificationErrorChan := make(chan error, 1)
if err := m.messageCodec.SendRequest(ctx, m.createIdentifyRemoteMessage(), func(message spi.Message) bool {
@@ -270,7 +273,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
m.Close()
}
s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
@@ -280,7 +283,7 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
select {
case payloadUserData := <-s7IdentificationResult:
- log.Debug().Msg("Got S7 Identification Response")
+ m.log.Debug().Msg("Got S7 Identification Response")
m.extractControllerTypeAndFireConnected(payloadUserData, ch)
case err := <-s7IdentificationErrorChan:
m.fireConnectionError(errors.Wrap(err, "Error receiving identify remote Request"), ch)
@@ -297,7 +300,7 @@ func (m *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnecti
if m.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
- log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ m.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
}
@@ -305,7 +308,7 @@ func (m *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult)
if m.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
} else {
- log.Info().Msg("Successfully connected")
+ m.log.Info().Msg("Successfully connected")
}
m.SetConnected(true)
}
@@ -341,7 +344,7 @@ func (m *Connection) extractControllerTypeAndFireConnected(payloadUserData readW
case "4":
controllerType = ControllerType_S7_400
default:
- log.Info().Msgf("Looking up unknown article number %s", articleNumber)
+ m.log.Info().Msgf("Looking up unknown article number %s", articleNumber)
controllerType = ControllerType_ANY
}
m.driverContext.ControllerType = controllerType
@@ -401,12 +404,12 @@ func (m *Connection) createS7ConnectionRequest(cotpPacketConnectionResponse read
case readWriteModel.COTPParameterCallingTsap:
if parameter.GetTsapId() != m.driverContext.CallingTsapId {
m.driverContext.CallingTsapId = parameter.GetTsapId()
- log.Warn().Msgf("Switching calling TSAP id to '%x'", m.driverContext.CallingTsapId)
+ m.log.Warn().Msgf("Switching calling TSAP id to '%x'", m.driverContext.CallingTsapId)
}
case readWriteModel.COTPParameterTpduSize:
m.driverContext.CotpTpduSize = parameter.GetTpduSize()
default:
- log.Warn().Msgf("Got unknown parameter type '%v'", reflect.TypeOf(parameter))
+ m.log.Warn().Msgf("Got unknown parameter type '%v'", reflect.TypeOf(parameter))
}
}
@@ -441,7 +444,7 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(&m.tpduGenerator, m.messageCodec, m.tm))
+ return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(&m.tpduGenerator, m.messageCodec, m.tm, options.WithCustomLogger(m.log)))
}
func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 30277a25c7..ae8ffc382a 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -30,7 +30,6 @@ import (
_default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Driver struct {
@@ -54,51 +53,51 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(options))
+func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default port to the transport, which is used if the user doesn't provide one in the connection string.
- options["defaultTcpPort"] = []string{"102"}
+ driverOptions["defaultTcpPort"] = []string{"102"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions)
if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
+ m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
- codec := NewMessageCodec(transportInstance)
- log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ m.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := ParseFromOptions(options)
+ configuration, err := ParseFromOptions(m.log, driverOptions)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
driverContext, err := NewDriverContext(configuration)
if err != nil {
- log.Error().Err(err).Msgf("Invalid options")
+ m.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
driverContext.awaitSetupComplete = m.awaitSetupComplete
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, options)
- log.Debug().Msg("created connection, connecting now")
+ connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions)
+ m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/s7/MessageCodec.go b/plc4go/internal/s7/MessageCodec.go
index 325ed21bbf..59f4edd4ed 100644
--- a/plc4go/internal/s7/MessageCodec.go
+++ b/plc4go/internal/s7/MessageCodec.go
@@ -23,18 +23,23 @@ import (
"github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
+ "github.com/rs/zerolog"
)
type MessageCodec struct {
_default.DefaultCodec
+
+ log zerolog.Logger
}
-func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
- codec := &MessageCodec{}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance)
+func NewMessageCodec(transportInstance transports.TransportInstance, _options ...options.WithOption) *MessageCodec {
+ codec := &MessageCodec{
+ log: options.ExtractCustomLogger(_options...),
+ }
+ codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _options...)
return codec
}
@@ -43,7 +48,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
}
func (m *MessageCodec) Send(message spi.Message) error {
- log.Trace().Msg("Sending message")
+ m.log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
tpktPacket := message.(model.TPKTPacketExactly)
// Serialize the request
@@ -63,34 +68,34 @@ func (m *MessageCodec) Send(message spi.Message) error {
func (m *MessageCodec) Receive() (spi.Message, error) {
// We need at least 4 bytes in order to know how big the packet is in total
if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
- log.Debug().Msgf("we got %d readable bytes", num)
+ m.log.Debug().Msgf("we got %d readable bytes", num)
data, err := m.GetTransportInstance().PeekReadableBytes(4)
if err != nil {
- log.Warn().Err(err).Msg("error peeking")
+ m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
return nil, nil
}
// Get the size of the entire packet
packetSize := (uint32(data[2]) << 8) + uint32(data[3])
if num < packetSize {
- log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
+ m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
data, err = m.GetTransportInstance().Read(packetSize)
if err != nil {
- log.Debug().Err(err).Msg("Error reading")
+ m.log.Debug().Err(err).Msg("Error reading")
// TODO: Possibly clean up ...
return nil, nil
}
tpktPacket, err := model.TPKTPacketParse(data)
if err != nil {
- log.Warn().Err(err).Msg("error parsing")
+ m.log.Warn().Err(err).Msg("error parsing")
// TODO: Possibly clean up ...
return nil, nil
}
return tpktPacket, nil
} else if err != nil {
- log.Warn().Err(err).Msg("Got error reading")
+ m.log.Warn().Err(err).Msg("Got error reading")
return nil, nil
}
// TODO: maybe we return here a not enough error error
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 4a99674f62..7b34d87748 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -21,7 +21,9 @@ package s7
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -32,26 +34,28 @@ import (
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Reader struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
tm transactions.RequestTransactionManager
+
+ log zerolog.Logger
}
-func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
+func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
return &Reader{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
tm: tm,
+ log: options.ExtractCustomLogger(_options...),
}
}
func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
- log.Trace().Msg("Reading")
+ m.log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult, 1)
go func() {
defer func() {
@@ -90,7 +94,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
s7MessageRequest = readWriteModel.NewS7MessageRequest(tpduId, request.Parameter, request.Payload)
// Assemble the finished paket
- log.Trace().Msg("Assemble paket")
+ m.log.Trace().Msg("Assemble paket")
// TODO: why do we use a uint16 above and the cotp a uint8?
tpktPacket := readWriteModel.NewTPKTPacket(
readWriteModel.NewCOTPPacketData(true,
@@ -105,7 +109,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
- log.Trace().Msg("Send ")
+ m.log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
if !ok {
@@ -122,12 +126,12 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
return payload.GetTpduReference() == tpduId
}, func(message spi.Message) error {
// Convert the response into an
- log.Trace().Msg("convert response to ")
+ m.log.Trace().Msg("convert response to ")
tpktPacket := message.(readWriteModel.TPKTPacket)
cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
payload := cotpPacketData.GetPayload()
// Convert the s7 response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
if err != nil {
@@ -184,16 +188,16 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
if (errorClass != 0) || (errorCode != 0) {
// This is usually the case if PUT/GET wasn't enabled on the PLC
if (errorClass == 129) && (errorCode == 4) {
- log.Warn().Msg("Got an error response from the PLC. This particular response code usually indicates " +
+ m.log.Warn().Msg("Got an error response from the PLC. This particular response code usually indicates " +
"that PUT/GET is not enabled on the PLC.")
for _, tagName := range readRequest.GetTagNames() {
responseCodes[tagName] = apiModel.PlcResponseCode_ACCESS_DENIED
plcValues[tagName] = spiValues.NewPlcNULL()
}
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
} else {
- log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
+ m.log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
"We probably need to implement explicit handling for this, so please file a bug-report "+
"on https://issues.apache.org/jira/projects/PLC4X and ideally attach a WireShark dump "+
"containing a capture of the communication.",
@@ -223,7 +227,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
responseCode := decodeResponseCode(payloadItem.GetReturnCode())
// Decode the data according to the information from the request
- log.Trace().Msg("decode data")
+ m.log.Trace().Msg("decode data")
responseCodes[tagName] = responseCode
if responseCode == apiModel.PlcResponseCode_OK {
plcValue, err := readWriteModel.DataItemParse(context.Background(), payloadItem.GetData(), tag.GetDataType().DataProtocolId(), int32(tag.GetNumElements()))
@@ -235,7 +239,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
}
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
diff --git a/plc4go/internal/s7/ValueHandler.go b/plc4go/internal/s7/ValueHandler.go
index 73fd0238e4..5fc89402c4 100644
--- a/plc4go/internal/s7/ValueHandler.go
+++ b/plc4go/internal/s7/ValueHandler.go
@@ -20,13 +20,16 @@
package s7
import (
- "github.com/apache/plc4x/plc4go/spi/values"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ spiValues "github.com/apache/plc4x/plc4go/spi/values"
)
type ValueHandler struct {
- values.DefaultValueHandler
+ spiValues.DefaultValueHandler
}
-func NewValueHandler() ValueHandler {
- return ValueHandler{}
+func NewValueHandler(_options ...options.WithOption) ValueHandler {
+ return ValueHandler{
+ spiValues.NewDefaultValueHandler(_options...),
+ }
}
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 2bf9a8f3b5..bb67795494 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -21,7 +21,9 @@ package s7
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -31,20 +33,22 @@ import (
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
- "github.com/rs/zerolog/log"
)
type Writer struct {
tpduGenerator *TpduGenerator
messageCodec spi.MessageCodec
tm transactions.RequestTransactionManager
+
+ log zerolog.Logger
}
-func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) Writer {
+func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) Writer {
return Writer{
tpduGenerator: tpduGenerator,
messageCodec: messageCodec,
tm: tm,
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -85,7 +89,7 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
)
// Assemble the finished paket
- log.Trace().Msg("Assemble paket")
+ m.log.Trace().Msg("Assemble paket")
// TODO: why do we use a uint16 above and the cotp a uint8?
tpktPacket := readWriteModel.NewTPKTPacket(
readWriteModel.NewCOTPPacketData(
@@ -117,12 +121,12 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return payload.GetTpduReference() == tpduId
}, func(message spi.Message) error {
// Convert the response into an
- log.Trace().Msg("convert response to ")
+ m.log.Trace().Msg("convert response to ")
tpktPacket := message.(readWriteModel.TPKTPacket)
cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
payload := cotpPacketData.GetPayload()
// Convert the s7 response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
+ m.log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
if err != nil {
@@ -171,15 +175,15 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.S7Message, writeReq
if (errorClass != 0) || (errorCode != 0) {
// This is usually the case if PUT/GET wasn't enabled on the PLC
if (errorClass == 129) && (errorCode == 4) {
- log.Warn().Msg("Got an error response from the PLC. This particular response code usually indicates " +
+ m.log.Warn().Msg("Got an error response from the PLC. This particular response code usually indicates " +
"that PUT/GET is not enabled on the PLC.")
for _, tagName := range writeRequest.GetTagNames() {
responseCodes[tagName] = apiModel.PlcResponseCode_ACCESS_DENIED
}
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
} else {
- log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
+ m.log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
"We probably need to implement explicit handling for this, so please file a bug-report "+
"on https://issues.apache.org/jira/projects/PLC4X and ideally attach a WireShark dump "+
"containing a capture of the communication.",
@@ -207,12 +211,12 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.S7Message, writeReq
responseCode := decodeResponseCode(payloadItem.GetReturnCode())
// Decode the data according to the information from the request
- log.Trace().Msg("decode data")
+ m.log.Trace().Msg("decode data")
responseCodes[tagName] = responseCode
}
// Return the response
- log.Trace().Msg("Returning the response")
+ m.log.Trace().Msg("Returning the response")
return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
diff --git a/plc4go/internal/simulated/Device.go b/plc4go/internal/simulated/Device.go
index 76d48bfa79..89fcb47ea7 100644
--- a/plc4go/internal/simulated/Device.go
+++ b/plc4go/internal/simulated/Device.go
@@ -21,50 +21,54 @@ package simulated
import (
"context"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"math/rand"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
- "github.com/rs/zerolog/log"
)
type Device struct {
Name string
State map[simulatedTag]*values.PlcValue
+
+ log zerolog.Logger
}
-func NewDevice(name string) *Device {
+func NewDevice(name string, _options ...options.WithOption) *Device {
return &Device{
Name: name,
State: make(map[simulatedTag]*values.PlcValue),
+ log: options.ExtractCustomLogger(_options...),
}
}
-func (t *Device) Get(tag simulatedTag) *values.PlcValue {
+func (d *Device) Get(tag simulatedTag) *values.PlcValue {
switch tag.TagType {
case TagState:
- return t.State[tag]
+ return d.State[tag]
case TagRandom:
- return t.getRandomValue(tag)
+ return d.getRandomValue(tag)
}
return nil
}
-func (t *Device) Set(tag simulatedTag, value *values.PlcValue) {
+func (d *Device) Set(tag simulatedTag, value *values.PlcValue) {
switch tag.TagType {
case TagState:
- t.State[tag] = value
+ d.State[tag] = value
break
case TagRandom:
- // TODO: Doesn't really make any sense to write a random
+ // TODO: Doesn't really make any sense to write a random
break
case TagStdOut:
- log.Debug().Msgf("TEST PLC STDOUT [%s]: %s", tag.Name, (*value).GetString())
+ d.log.Debug().Msgf("TEST PLC STDOUT [%s]: %s", tag.Name, (*value).GetString())
break
}
}
-func (t *Device) getRandomValue(tag simulatedTag) *values.PlcValue {
+func (d *Device) getRandomValue(tag simulatedTag) *values.PlcValue {
size := tag.GetDataTypeSize().DataTypeSize()
data := make([]byte, uint16(size)*tag.Quantity)
rand.Read(data)
diff --git a/plc4go/internal/simulated/Device_test.go b/plc4go/internal/simulated/Device_test.go
index e1ddec33e5..729a694185 100644
--- a/plc4go/internal/simulated/Device_test.go
+++ b/plc4go/internal/simulated/Device_test.go
@@ -26,8 +26,6 @@ import (
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
-
- "github.com/rs/zerolog/log"
)
func TestDevice_Get(t1 *testing.T) {
@@ -85,19 +83,19 @@ func TestDevice_Get(t1 *testing.T) {
},
}
for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &Device{
+ t1.Run(tt.name, func(t *testing.T) {
+ d := &Device{
Name: tt.fields.Name,
State: tt.fields.State,
}
- got := t.Get(tt.args.field)
+ got := d.Get(tt.args.field)
if got != nil {
- log.Debug().Msgf("Result: %v", *got)
+ t.Logf("Result: %v", *got)
} else {
- log.Debug().Msg("Result: nil")
+ t.Logf("Result: nil")
}
- if tt.args.verifyOutput && !assert.Equal(t1, tt.want, got) {
- t1.Errorf("Get() = %v, want %v", got, tt.want)
+ if tt.args.verifyOutput && !assert.Equal(t, tt.want, got) {
+ t.Errorf("Get() = %v, want %v", got, tt.want)
}
})
}
@@ -107,7 +105,7 @@ func TestDevice_Get(t1 *testing.T) {
* When first playing around with random apiValues I only got "false" apiValues.
* So I added this test in order to verify I'm actually getting random apiValues.
*/
-func TestDevice_Random(t1 *testing.T) {
+func TestDevice_Random(t *testing.T) {
type fields struct {
Name string
State map[simulatedTag]*apiValues.PlcValue
@@ -136,15 +134,15 @@ func TestDevice_Random(t1 *testing.T) {
},
}
for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &Device{
+ t.Run(tt.name, func(t *testing.T) {
+ d := &Device{
Name: tt.fields.Name,
State: tt.fields.State,
}
numTrue := 0
numFalse := 0
for i := 0; i < tt.args.numRuns; i++ {
- got := t.Get(tt.args.field)
+ got := d.Get(tt.args.field)
boolValue := (*got).GetBool()
if boolValue {
numTrue++
@@ -153,9 +151,9 @@ func TestDevice_Random(t1 *testing.T) {
}
}
if numTrue == 0 || numFalse == 0 {
- t1.Errorf("Random doesn't seem to work. In %d runs I got %d true and %d false apiValues", tt.args.numRuns, numTrue, numFalse)
+ t.Errorf("Random doesn't seem to work. In %d runs I got %d true and %d false apiValues", tt.args.numRuns, numTrue, numFalse)
} else {
- log.Info().Msgf("In %d runs I got %d true and %d false apiValues", tt.args.numRuns, numTrue, numFalse)
+ t.Logf("In %d runs I got %d true and %d false apiValues", tt.args.numRuns, numTrue, numFalse)
}
})
}
diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go
index 2e6110f4b8..847b8cb396 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -47,8 +47,8 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
return driver
}
-func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _ map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- connection := NewConnection(NewDevice("test"), d.GetPlcTagHandler(), d.valueHandler, options)
+func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _ map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions)
d.log.Debug().Msgf("Connecting and returning connection %v", connection)
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/pkg/api/logging/init.go b/plc4go/pkg/api/logging/init.go
index c7fb0bba6b..5d1c1bf20d 100644
--- a/plc4go/pkg/api/logging/init.go
+++ b/plc4go/pkg/api/logging/init.go
@@ -17,54 +17,55 @@
* under the License.
*/
+// Deprecated: use options to configure logging
package logging
import (
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
+ globalLog "github.com/rs/zerolog/log"
)
var oldLogger zerolog.Logger
// Deprecated: use config.WithCustomLogger
-// init is used for _ imports for easy log config
+// init is used for _ imports for easy log config
func init() {
- oldLogger = log.Logger
- log.Logger = log.Logger.Level(zerolog.ErrorLevel)
+ oldLogger = globalLog.Logger
+ globalLog.Logger = globalLog.Logger.Level(zerolog.ErrorLevel)
}
// Deprecated: use config.WithCustomLogger
// ErrorLevel configures zerolog to WarnLevel
func ErrorLevel() {
- log.Logger = log.Logger.Level(zerolog.ErrorLevel)
+ globalLog.Logger = globalLog.Logger.Level(zerolog.ErrorLevel)
}
// Deprecated: use config.WithCustomLogger
// WarnLevel configures zerolog to WarnLevel
func WarnLevel() {
- log.Logger = log.Logger.Level(zerolog.WarnLevel)
+ globalLog.Logger = globalLog.Logger.Level(zerolog.WarnLevel)
}
// Deprecated: use config.WithCustomLogger
// InfoLevel configures zerolog to InfoLevel
func InfoLevel() {
- log.Logger = log.Logger.Level(zerolog.InfoLevel)
+ globalLog.Logger = globalLog.Logger.Level(zerolog.InfoLevel)
}
// Deprecated: use config.WithCustomLogger
// DebugLevel configures zerolog to DebugLevel
func DebugLevel() {
- log.Logger = log.Logger.Level(zerolog.DebugLevel)
+ globalLog.Logger = globalLog.Logger.Level(zerolog.DebugLevel)
}
// Deprecated: use config.WithCustomLogger
// TraceLevel configures zerolog to TraceLevel
func TraceLevel() {
- log.Logger = log.Logger.Level(zerolog.TraceLevel)
+ globalLog.Logger = globalLog.Logger.Level(zerolog.TraceLevel)
}
// Deprecated: use config.WithCustomLogger
-// ResetLogging can be used to reset to the old log settings
+// ResetLogging can be used to reset to the old log settings
func ResetLogging() {
- log.Logger = oldLogger
+ globalLog.Logger = oldLogger
}
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 53ec5c6bae..2eb58690ed 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -61,7 +61,9 @@ type DefaultExpectation struct {
HandleError spi.HandleError
}
-func WithCustomMessageHandler(customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool) options.WithOption {
+type CustomMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
+
+func WithCustomMessageHandler(customMessageHandler CustomMessageHandler) options.WithOption {
return withCustomMessageHandler{customMessageHandler: customMessageHandler}
}
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
index b4f91f4cb0..3241757fe7 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
@@ -20,20 +20,22 @@
package model
import (
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
-
- "github.com/rs/zerolog/log"
)
//go:generate go run ../../tools/plc4xgenerator/gen.go -type=DefaultPlcSubscriptionEvent
type DefaultPlcSubscriptionEvent struct {
DefaultPlcSubscriptionEventRequirements `ignore:"true"` // Avoid recursion
values map[string]*DefaultPlcSubscriptionEventItem
+
+ log zerolog.Logger `ignore:"true"`
}
type DefaultPlcSubscriptionEventRequirements interface {
@@ -48,6 +50,7 @@ func NewDefaultPlcSubscriptionEvent(
intervals map[string]time.Duration,
responseCodes map[string]apiModel.PlcResponseCode,
values map[string]apiValues.PlcValue,
+ _options ...options.WithOption,
) apiModel.PlcSubscriptionEvent {
valueMap := map[string]*DefaultPlcSubscriptionEventItem{}
@@ -62,6 +65,7 @@ func NewDefaultPlcSubscriptionEvent(
return &DefaultPlcSubscriptionEvent{
DefaultPlcSubscriptionEventRequirements: defaultPlcSubscriptionEventRequirements,
values: valueMap,
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -88,7 +92,7 @@ func (d *DefaultPlcSubscriptionEvent) GetResponseCode(name string) apiModel.PlcR
func (d *DefaultPlcSubscriptionEvent) GetTag(name string) apiModel.PlcTag {
item := d.values[name]
if item == nil {
- log.Warn().Msgf("field for %s not found", name)
+ d.log.Warn().Msgf("field for %s not found", name)
return nil
}
return item.GetTag()
@@ -97,7 +101,7 @@ func (d *DefaultPlcSubscriptionEvent) GetTag(name string) apiModel.PlcTag {
func (d *DefaultPlcSubscriptionEvent) GetType(name string) SubscriptionType {
item := d.values[name]
if item == nil {
- log.Warn().Msgf("field for %s not found", name)
+ d.log.Warn().Msgf("field for %s not found", name)
return 0
}
return item.GetSubscriptionType()
@@ -106,7 +110,7 @@ func (d *DefaultPlcSubscriptionEvent) GetType(name string) SubscriptionType {
func (d *DefaultPlcSubscriptionEvent) GetInterval(name string) time.Duration {
item := d.values[name]
if item == nil {
- log.Warn().Msgf("field for %s not found", name)
+ d.log.Warn().Msgf("field for %s not found", name)
return -1
}
return item.GetInterval()
@@ -115,7 +119,7 @@ func (d *DefaultPlcSubscriptionEvent) GetInterval(name string) time.Duration {
func (d *DefaultPlcSubscriptionEvent) GetValue(name string) apiValues.PlcValue {
item := d.values[name]
if item == nil {
- log.Warn().Msgf("field for %s not found", name)
+ d.log.Warn().Msgf("field for %s not found", name)
return spiValues.PlcNull{}
}
return item.GetValue()
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionResponse.go b/plc4go/spi/model/DefaultPlcSubscriptionResponse.go
index 72c9dd4102..f36b3dc03e 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionResponse.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionResponse.go
@@ -20,10 +20,9 @@
package model
import (
- "github.com/pkg/errors"
- "github.com/rs/zerolog/log"
-
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/pkg/errors"
)
//go:generate go run ../../tools/plc4xgenerator/gen.go -type=DefaultPlcSubscriptionResponse
@@ -32,7 +31,12 @@ type DefaultPlcSubscriptionResponse struct {
values map[string]*DefaultPlcSubscriptionResponseItem
}
-func NewDefaultPlcSubscriptionResponse(request apiModel.PlcSubscriptionRequest, responseCodes map[string]apiModel.PlcResponseCode, values map[string]apiModel.PlcSubscriptionHandle) apiModel.PlcSubscriptionResponse {
+func NewDefaultPlcSubscriptionResponse(
+ request apiModel.PlcSubscriptionRequest,
+ responseCodes map[string]apiModel.PlcResponseCode,
+ values map[string]apiModel.PlcSubscriptionHandle,
+ _options ...options.WithOption,
+) apiModel.PlcSubscriptionResponse {
valueMap := map[string]*DefaultPlcSubscriptionResponseItem{}
for name, code := range responseCodes {
value := values[name]
@@ -42,10 +46,11 @@ func NewDefaultPlcSubscriptionResponse(request apiModel.PlcSubscriptionRequest,
request: request,
values: valueMap,
}
+ localLog := options.ExtractCustomLogger(_options...)
for subscriptionTagName, consumers := range request.(*DefaultPlcSubscriptionRequest).preRegisteredConsumers {
subscriptionHandle, err := plcSubscriptionResponse.GetSubscriptionHandle(subscriptionTagName)
if subscriptionHandle == nil || err != nil {
- log.Error().Msgf("PlcSubscriptionHandle for %s not found", subscriptionTagName)
+ localLog.Error().Msgf("PlcSubscriptionHandle for %s not found", subscriptionTagName)
continue
}
for _, consumer := range consumers {
diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index 91904cac6c..af5ad52446 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -20,6 +20,7 @@
package testutils
import (
+ "github.com/rs/zerolog/log"
"os"
"runtime/debug"
"strings"
@@ -31,7 +32,6 @@ import (
"github.com/ajankovic/xdiff/parser"
"github.com/pkg/errors"
"github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
)
diff --git a/plc4go/spi/utils/IdGenerator.go b/plc4go/spi/utils/IdGenerator.go
index a44e2f97df..777a33ccb7 100644
--- a/plc4go/spi/utils/IdGenerator.go
+++ b/plc4go/spi/utils/IdGenerator.go
@@ -27,9 +27,9 @@ import (
var randomByteFiller = rand.Read
-func GenerateId(log zerolog.Logger, numBytes int) string {
+func GenerateId(localLog zerolog.Logger, numBytes int) string {
transactionIdBytes := make([]byte, numBytes)
n, err := randomByteFiller(transactionIdBytes)
- log.Trace().Err(err).Msgf("Read %d bytes", n)
+ localLog.Trace().Err(err).Msgf("Read %d bytes", n)
return hex.EncodeToString(transactionIdBytes)
}
diff --git a/plc4go/spi/values/PlcValueHandler.go b/plc4go/spi/values/PlcValueHandler.go
index 127f01bd1d..1156ec6608 100644
--- a/plc4go/spi/values/PlcValueHandler.go
+++ b/plc4go/spi/values/PlcValueHandler.go
@@ -22,6 +22,8 @@ package values
import (
"errors"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/rs/zerolog"
"reflect"
"strconv"
"strings"
@@ -29,10 +31,16 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
- "github.com/rs/zerolog/log"
)
type DefaultValueHandler struct {
+ log zerolog.Logger
+}
+
+func NewDefaultValueHandler(_options ...options.WithOption) DefaultValueHandler {
+ return DefaultValueHandler{
+ log: options.ExtractCustomLogger(_options...),
+ }
}
func (m DefaultValueHandler) NewPlcValue(tag apiModel.PlcTag, value any) (apiValues.PlcValue, error) {
@@ -96,7 +104,7 @@ func (m DefaultValueHandler) ParseSimpleType(tag apiModel.PlcTag, value any) (ap
stringValue := fmt.Sprintf("%v", value)
plcValue, err = m.NewPlcValueFromType(tag.GetValueType(), stringValue)
if err == nil {
- log.Debug().Msgf("had to convert %v into %v by using string conversion", value, plcValue)
+ m.log.Debug().Msgf("had to convert %v into %v by using string conversion", value, plcValue)
}
}
return plcValue, err