You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/22 09:58:46 UTC
[plc4x] branch develop updated: fix(plc4go): ensure options are passed downstream
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 840ca2a2d2 fix(plc4go): ensure options are passed downstream
840ca2a2d2 is described below
commit 840ca2a2d22cb33544050e73373f890d66c6c998
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 22 11:58:39 2023 +0200
fix(plc4go): ensure options are passed downstream
---
plc4go/internal/ads/Connection.go | 2 +
plc4go/internal/ads/Driver.go | 21 +-
plc4go/internal/ads/Subscriber.go | 11 +-
plc4go/internal/ads/model/AdsSubscriptionHandle.go | 6 +-
plc4go/internal/bacnetip/Connection.go | 23 ++-
plc4go/internal/bacnetip/Subscriber.go | 8 +-
plc4go/internal/cbus/Connection.go | 29 ++-
plc4go/internal/cbus/Discoverer.go | 11 +-
plc4go/internal/cbus/Driver.go | 28 ++-
plc4go/internal/cbus/Subscriber.go | 8 +-
plc4go/internal/eip/Connection.go | 211 +++++++++++----------
plc4go/internal/eip/Driver.go | 61 +++---
plc4go/internal/knxnetip/Connection.go | 10 +-
plc4go/internal/knxnetip/Discoverer.go | 10 +-
plc4go/internal/knxnetip/Driver.go | 31 ++-
plc4go/internal/knxnetip/Subscriber.go | 40 ++--
plc4go/internal/modbus/AsciiDriver.go | 45 +++--
plc4go/internal/modbus/Connection.go | 76 ++++----
plc4go/internal/modbus/RtuDriver.go | 44 +++--
plc4go/internal/modbus/TcpDriver.go | 45 +++--
plc4go/internal/s7/Connection.go | 192 ++++++++++---------
plc4go/internal/s7/Driver.go | 61 +++---
plc4go/internal/simulated/Driver.go | 17 +-
plc4go/pkg/api/cache/PlcConnectionCache.go | 11 +-
24 files changed, 626 insertions(+), 375 deletions(-)
diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go
index ffe40ada06..5f531bd12a 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -58,6 +58,7 @@ type Connection struct {
passLogToModel bool
log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewConnection(messageCodec spi.MessageCodec, configuration model.Configuration, connectionOptions map[string][]string, _options ...options.WithOption) (*Connection, error) {
@@ -74,6 +75,7 @@ func NewConnection(messageCodec spi.MessageCodec, configuration model.Configurat
subscriptions: map[uint32]apiModel.PlcSubscriptionHandle{},
passLogToModel: passLoggerToModel,
log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index c6a515c8f4..8cc3e876b5 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -38,13 +38,15 @@ import (
type Driver struct {
_default.DefaultDriver
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &Driver{
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "ads", "Beckhoff TwinCat ADS", "tcp", NewTagHandler())
return driver
@@ -63,7 +65,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{strconv.Itoa(int(adsModel.AdsConstants_ADSTCPDEFAULTPORT))}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
if err != nil {
m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
@@ -72,7 +78,10 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
}
// Create a new codec for taking care of encoding/decoding of messages
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ codec := NewMessageCodec(
+ transportInstance,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
m.log.Debug().Msgf("working with codec %#v", codec)
configuration, err := model.ParseFromOptions(m.log, driverOptions)
@@ -99,5 +108,7 @@ func (m *Driver) SupportsDiscovery() bool {
}
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
- return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
+ return NewDiscoverer(
+ append(m._options, options.WithCustomLogger(m.log))...,
+ ).Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 6a0c874859..743752b8b3 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -150,14 +150,19 @@ func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel
)
}
// Create a new subscription handle.
- subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(m, tagName, directTag, options.WithCustomLogger(m.log))
+ subscriptionHandle := dirverModel.NewAdsSubscriptionHandle(
+ m,
+ tagName,
+ directTag,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
responseChan <- spiModel.NewDefaultPlcSubscriptionRequestResult(
subscriptionRequest,
spiModel.NewDefaultPlcSubscriptionResponse(
subscriptionRequest,
map[string]apiModel.PlcResponseCode{tagName: apiModel.PlcResponseCode_OK},
map[string]apiModel.PlcSubscriptionHandle{tagName: subscriptionHandle},
- options.WithCustomLogger(m.log),
+ append(m._options, options.WithCustomLogger(m.log))...,
),
nil,
)
@@ -210,7 +215,7 @@ func (m *Connection) processSubscriptionResponses(_ context.Context, subscriptio
subscriptionRequest,
responseCodes,
subscriptionHandles,
- options.WithCustomLogger(m.log),
+ append(m._options, options.WithCustomLogger(m.log))...,
),
err,
)
diff --git a/plc4go/internal/ads/model/AdsSubscriptionHandle.go b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
index 59ba802ba6..b805da0958 100644
--- a/plc4go/internal/ads/model/AdsSubscriptionHandle.go
+++ b/plc4go/internal/ads/model/AdsSubscriptionHandle.go
@@ -36,7 +36,8 @@ type AdsSubscriptionHandle struct {
directTag DirectPlcTag
consumers []apiModel.PlcSubscriptionEventConsumer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, directTag DirectPlcTag, _options ...options.WithOption) *AdsSubscriptionHandle {
@@ -47,6 +48,7 @@ func NewAdsSubscriptionHandle(subscriber spi.PlcSubscriber, tagName string, dire
directTag: directTag,
consumers: []apiModel.PlcSubscriptionEventConsumer{},
log: customLogger,
+ _options: _options,
}
}
@@ -70,7 +72,7 @@ func (t *AdsSubscriptionHandle) PublishPlcValue(value apiValues.PlcValue) {
map[string]time.Duration{t.tagName: time.Second},
map[string]apiModel.PlcResponseCode{t.tagName: apiModel.PlcResponseCode_OK},
map[string]apiValues.PlcValue{t.tagName: value},
- options.WithCustomLogger(t.log),
+ append(t._options, options.WithCustomLogger(t.log))...,
)
for _, consumer := range t.consumers {
consumer(&event)
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 0192bfb1b6..30a28a5ba2 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -50,7 +50,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
@@ -60,6 +61,7 @@ func NewConnection(messageCodec spi.MessageCodec, tagHandler spi.PlcTagHandler,
messageCodec: messageCodec,
tm: tm,
log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
@@ -134,11 +136,26 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
+ return spiModel.NewDefaultPlcReadRequestBuilder(
+ c.GetPlcTagHandler(),
+ NewReader(
+ &c.invokeIdGenerator,
+ c.messageCodec,
+ c.tm,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
- return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
+ c.GetPlcTagHandler(),
+ c.GetPlcValueHandler(),
+ NewSubscriber(
+ c,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
func (c *Connection) addSubscriber(subscriber *Subscriber) {
diff --git a/plc4go/internal/bacnetip/Subscriber.go b/plc4go/internal/bacnetip/Subscriber.go
index 856e98cca0..9d828b7b76 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -33,7 +33,8 @@ type Subscriber struct {
connection *Connection
consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
@@ -42,7 +43,8 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs
connection: connection,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
- log: logger,
+ log: logger,
+ _options: _options,
}
}
@@ -72,7 +74,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
subscriptionRequest,
responseCodes,
subscriptionValues,
- options.WithCustomLogger(m.log),
+ append(m._options, options.WithCustomLogger(m.log))...,
),
nil,
)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 8b2322322b..2f9331ce94 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -74,7 +74,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer
- log zerolog.Logger `ignore:"true"`
+ log zerolog.Logger `ignore:"true"`
+ _options []options.WithOption `ignore:"true"` // Used to pass them downstream
}
func NewConnection(messageCodec *MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
@@ -86,7 +87,8 @@ func NewConnection(messageCodec *MessageCodec, configuration Configuration, driv
driverContext: driverContext,
tm: tm,
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
@@ -175,7 +177,15 @@ func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
+ return spiModel.NewDefaultPlcReadRequestBuilder(
+ c.GetPlcTagHandler(),
+ NewReader(
+ &c.alphaGenerator,
+ c.messageCodec,
+ c.tm,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
@@ -186,7 +196,10 @@ func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionReques
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
c.GetPlcTagHandler(),
c.GetPlcValueHandler(),
- NewSubscriber(c.addSubscriber, options.WithCustomLogger(c.log)),
+ NewSubscriber(
+ c.addSubscriber,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
)
}
@@ -196,7 +209,13 @@ func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRe
}
func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
- return spiModel.NewDefaultPlcBrowseRequestBuilder(c.GetPlcTagHandler(), NewBrowser(c, options.WithCustomLogger(c.log)))
+ return spiModel.NewDefaultPlcBrowseRequestBuilder(
+ c.GetPlcTagHandler(),
+ NewBrowser(
+ c,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
func (c *Connection) addSubscriber(subscriber *Subscriber) {
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 6deeda3c77..0aaea236c9 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -47,7 +47,8 @@ type Discoverer struct {
deviceScanningWorkItemId atomic.Int32
deviceScanningQueue pool.Executor
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDiscoverer(_options ...options.WithOption) *Discoverer {
@@ -57,7 +58,8 @@ func NewDiscoverer(_options ...options.WithOption) *Discoverer {
transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
}
@@ -222,7 +224,10 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
transportInstanceLogger := d.log.With().Stringer("transportInstance", tcpTransportInstance).Logger()
transportInstanceLogger.Debug().Msgf("Scanning %v", tcpTransportInstance)
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(tcpTransportInstance, options.WithCustomLogger(d.log))
+ codec := NewMessageCodec(
+ tcpTransportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
// Explicitly start the worker
if err := codec.ConnectWithContext(context.TODO()); err != nil {
transportInstanceLogger.Debug().Err(err).Msg("Error connecting")
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index 7068551722..686ff0d92f 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -41,7 +41,8 @@ type Driver struct {
awaitSetupComplete bool
awaitDisconnectComplete bool
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
@@ -51,6 +52,7 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
awaitSetupComplete: true,
awaitDisconnectComplete: true,
log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "c-bus", "Clipsal Bus", "tcp", NewTagHandler())
return driver
@@ -67,7 +69,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{strconv.FormatUint(uint64(readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT), 10)}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
if err != nil {
m.log.Error().Err(err).Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
return m.reportError(errors.Wrapf(err, "couldn't initialize transport configuration for given transport url %s", transportUrl.String()))
@@ -78,8 +84,10 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
m.log.Error().Err(err).Msgf("Invalid options")
return m.reportError(errors.Wrap(err, "Invalid options"))
}
- // TODO: we might need to remember the original options
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
+ codec := NewMessageCodec(
+ transportInstance,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
m.log.Debug().Msgf("working with codec:\n%s", codec)
driverContext := NewDriverContext(configuration)
@@ -87,7 +95,13 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
+ connection := NewConnection(
+ codec, configuration,
+ driverContext,
+ m.GetPlcTagHandler(),
+ m.tm, driverOptions,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ )
m.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
@@ -111,7 +125,9 @@ func (m *Driver) SupportsDiscovery() bool {
}
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
- return NewDiscoverer(options.WithCustomLogger(m.log)).Discover(ctx, callback, discoveryOptions...)
+ return NewDiscoverer(
+ append(m._options, options.WithCustomLogger(m.log))...,
+ ).Discover(ctx, callback, discoveryOptions...)
}
func (m *Driver) Close() error {
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 68d4d40d7b..992219efd3 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -44,7 +44,8 @@ type Subscriber struct {
consumersMutex sync.RWMutex
- log zerolog.Logger `ignore:"true"`
+ log zerolog.Logger `ignore:"true"`
+ _options []options.WithOption `ignore:"true"` // Used to pass them downstream
}
func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
@@ -53,7 +54,8 @@ func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...optio
addSubscriber: addSubscriber,
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
}
@@ -95,7 +97,7 @@ func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
subscriptionRequest,
responseCodes,
subscriptionValues,
- options.WithCustomLogger(s.log),
+ append(s._options, options.WithCustomLogger(s.log))...,
),
nil,
)
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 11cf065eea..597f1f0715 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -62,7 +62,8 @@ type Connection struct {
routingAddress []readWriteModel.PathSegment
tracer tracer.Tracer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewConnection(
@@ -81,6 +82,7 @@ func NewConnection(
driverContext: driverContext,
tm: tm,
log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
@@ -100,29 +102,29 @@ func NewConnection(
return connection
}
-func (m *Connection) GetConnectionId() string {
+func (c *Connection) GetConnectionId() string {
// TODO: Fix this
- return "" //m.connectionId
+ return "" //c.connectionId
}
-func (m *Connection) IsTraceEnabled() bool {
- return m.tracer != nil
+func (c *Connection) IsTraceEnabled() bool {
+ return c.tracer != nil
}
-func (m *Connection) GetTracer() tracer.Tracer {
- return m.tracer
+func (c *Connection) GetTracer() tracer.Tracer {
+ return c.tracer
}
-func (m *Connection) GetConnection() plc4go.PlcConnection {
- return m
+func (c *Connection) GetConnection() plc4go.PlcConnection {
+ return c
}
-func (m *Connection) GetMessageCodec() spi.MessageCodec {
- return m.messageCodec
+func (c *Connection) GetMessageCodec() spi.MessageCodec {
+ return c.messageCodec
}
-func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Trace().Msg("Connecting")
+func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
+ c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -130,28 +132,28 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
- err := m.messageCodec.ConnectWithContext(ctx)
+ err := c.messageCodec.ConnectWithContext(ctx)
if err != nil {
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
}
// For testing purposes we can skip the waiting for a complete connection
- if !m.driverContext.awaitSetupComplete {
- go m.setupConnection(ctx, ch)
- m.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ if !c.driverContext.awaitSetupComplete {
+ go c.setupConnection(ctx, ch)
+ c.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
- // Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
- m.SetConnected(true)
+ // Note: we can't use fireConnected here as it's guarded against c.driverContext.awaitSetupComplete
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+ c.SetConnected(true)
return
}
- m.setupConnection(ctx, ch)
+ c.setupConnection(ctx, ch)
}()
return ch
}
-func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
+func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
// TODO: use proper context
ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult, 1)
@@ -161,10 +163,10 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
result <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
- m.log.Debug().Msg("Sending UnregisterSession EIP Packet")
- _ = m.messageCodec.SendRequest(
+ c.log.Debug().Msg("Sending UnregisterSession EIP Packet")
+ _ = c.messageCodec.SendRequest(
ctx,
- readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
+ readWriteModel.NewEipDisconnectRequest(c.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
return true
},
func(message spi.Message) error {
@@ -173,42 +175,42 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
func(err error) error {
return nil
},
- m.GetTtl(),
+ c.GetTtl(),
) //Unregister gets no response
- m.log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
+ c.log.Debug().Msgf("Unregistred Session %d", c.sessionHandle)
}()
return result
}
-func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
- if err := m.listServiceRequest(ctx, ch); err != nil {
- m.fireConnectionError(errors.Wrap(err, "error listing service request"), ch)
+func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
+ if err := c.listServiceRequest(ctx, ch); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "error listing service request"), ch)
return
}
- if err := m.connectRegisterSession(ctx, ch); err != nil {
- m.fireConnectionError(errors.Wrap(err, "error connect register session"), ch)
+ if err := c.connectRegisterSession(ctx, ch); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "error connect register session"), ch)
return
}
- if err := m.listAllAttributes(ctx, ch); err != nil {
- m.fireConnectionError(errors.Wrap(err, "error list all attributes"), ch)
+ if err := c.listAllAttributes(ctx, ch); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "error list all attributes"), ch)
return
}
- if m.useConnectionManager {
+ if c.useConnectionManager {
// TODO: Continue here ....
} else {
// Send an event that connection setup is complete.
- m.fireConnected(ch)
+ c.fireConnected(ch)
}
}
-func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
- m.log.Debug().Msg("Sending ListServices Request")
+func (c *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+ c.log.Debug().Msg("Sending ListServices Request")
listServicesResultChan := make(chan readWriteModel.ListServicesResponse, 1)
listServicesResultErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(
+ if err := c.messageCodec.SendRequest(
ctx,
readWriteModel.NewListServicesRequest(
EmptySessionHandle,
@@ -228,23 +230,23 @@ func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcC
listServicesResponse := message.(readWriteModel.ListServicesResponse)
serviceResponse := listServicesResponse.GetTypeIds()[0].(readWriteModel.ServicesResponse)
if serviceResponse.GetSupportsCIPEncapsulation() {
- m.log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
+ c.log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
}
- m.cipEncapsulationAvailable = serviceResponse.GetSupportsCIPEncapsulation()
+ c.cipEncapsulationAvailable = serviceResponse.GetSupportsCIPEncapsulation()
listServicesResultChan <- listServicesResponse
return nil
},
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
listServicesResultErrorChan <- errors.Wrap(err, "got error processing request")
return nil
},
- m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ c.GetTtl()); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
timeout := time.NewTimer(1 * time.Second)
@@ -259,11 +261,11 @@ func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcC
}
}
-func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
- m.log.Debug().Msg("Sending EipConnectionRequest")
+func (c *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+ c.log.Debug().Msg("Sending EipConnectionRequest")
connectionResponseChan := make(chan readWriteModel.EipConnectionResponse, 1)
connectionResponseErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(
+ if err := c.messageCodec.SendRequest(
ctx,
readWriteModel.NewEipConnectionRequest(
EmptySessionHandle,
@@ -280,12 +282,12 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
if connectionResponse != nil {
if connectionResponse.GetStatus() == 0 {
- m.sessionHandle = connectionResponse.GetSessionHandle()
- m.senderContext = connectionResponse.GetSenderContext()
- m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+ c.sessionHandle = connectionResponse.GetSessionHandle()
+ c.senderContext = connectionResponse.GetSenderContext()
+ c.log.Debug().Msgf("Got assigned with Session %d", c.sessionHandle)
connectionResponseChan <- connectionResponse
} else {
- m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
+ c.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
}
} else {
@@ -294,17 +296,17 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
exchange := readWriteModel.NewUnConnectedDataItem(
readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
- 14, 536870914, 33944, m.connectionSerialNumber,
+ 14, 536870914, 33944, c.connectionSerialNumber,
4919, 42, 3, 2101812,
readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
2113537,
readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
readWriteModel.NewTransportType(true, 2, 3),
- m.connectionPathSize, m.routingAddress, 1))
+ c.connectionPathSize, c.routingAddress, 1))
typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
- eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
- m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
- if err := m.messageCodec.SendRequest(
+ eipWrapper := readWriteModel.NewCipRRData(c.sessionHandle, 0, typeIds,
+ c.sessionHandle, uint32(readWriteModel.CIPStatus_Success), c.senderContext, 0)
+ if err := c.messageCodec.SendRequest(
ctx,
eipWrapper,
func(message spi.Message) bool {
@@ -320,8 +322,8 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
if cipRRData.GetStatus() == 0 {
unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
- m.connectionId = connectionManagerResponse.GetOtConnectionId()
- m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
+ c.connectionId = connectionManagerResponse.GetOtConnectionId()
+ c.log.Debug().Msgf("Got assigned with connection if %d", c.connectionId)
connectionResponseChan <- connectionResponse
} else {
connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
@@ -331,15 +333,15 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
return nil
},
- m.GetTtl(),
+ c.GetTtl(),
); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
}
return nil
@@ -347,15 +349,15 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
return nil
},
- m.GetTtl(),
+ c.GetTtl(),
); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
timeout := time.NewTimer(1 * time.Second)
defer utils.CleanupTimer(timeout)
@@ -369,13 +371,13 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
}
}
-func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
- m.log.Debug().Msg("Sending ListAllAttributes Request")
+func (c *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+ c.log.Debug().Msg("Sending ListAllAttributes Request")
listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse, 1)
listAllAttributesErrorChan := make(chan error, 1)
classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
- if err := m.messageCodec.SendRequest(
+ if err := c.messageCodec.SendRequest(
ctx,
readWriteModel.NewCipRRData(
EmptyInterfaceHandle,
@@ -386,9 +388,9 @@ func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcCo
readWriteModel.NewGetAttributeAllRequest(
classSegment, instanceSegment, uint16(0))),
},
- m.sessionHandle,
+ c.sessionHandle,
uint32(readWriteModel.CIPStatus_Success),
- m.senderContext,
+ c.senderContext,
0,
),
func(message spi.Message) bool {
@@ -407,14 +409,14 @@ func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcCo
if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
switch curCipClassId {
case readWriteModel.CIPClassID_MessageRouter:
- m.useMessageRouter = true
+ c.useMessageRouter = true
case readWriteModel.CIPClassID_ConnectionManager:
- m.useConnectionManager = true
+ c.useConnectionManager = true
}
}
}
}
- m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
+ c.log.Debug().Msgf("Connection using message router %t, using connection manager %t", c.useMessageRouter, c.useConnectionManager)
listAllAttributesResponseChan <- response
}
return nil
@@ -422,15 +424,15 @@ func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcCo
func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
- m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
return nil
},
- m.GetTtl(),
+ c.GetTtl(),
); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
timeout := time.NewTimer(1 * time.Second)
@@ -445,39 +447,58 @@ func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcCo
}
}
-func (m *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
- if m.driverContext.awaitSetupComplete {
+func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
- m.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ c.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
}
-func (m *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
- if m.driverContext.awaitSetupComplete {
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
+func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
} else {
- m.log.Info().Msg("Successfully connected")
+ c.log.Info().Msg("Successfully connected")
}
- m.SetConnected(true)
+ c.SetConnected(true)
}
-func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
+func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return _default.DefaultConnectionMetadata{
ProvidesReading: true,
ProvidesWriting: true,
}
}
-func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(m.messageCodec, m.tm, m.configuration, &m.sessionHandle, options.WithCustomLogger(m.log)))
+func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
+ return spiModel.NewDefaultPlcReadRequestBuilder(
+ c.GetPlcTagHandler(),
+ NewReader(
+ c.messageCodec,
+ c.tm,
+ c.configuration,
+ &c.sessionHandle,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
-func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
+func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilder(
- m.GetPlcTagHandler(), m.GetPlcValueHandler(), NewWriter(m.messageCodec, m.tm, m.configuration, &m.sessionHandle, &m.senderContext, options.WithCustomLogger(m.log)))
+ c.GetPlcTagHandler(),
+ c.GetPlcValueHandler(),
+ NewWriter(
+ c.messageCodec,
+ c.tm,
+ c.configuration,
+ &c.sessionHandle,
+ &c.senderContext,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
-func (m *Connection) String() string {
+func (c *Connection) String() string {
return fmt.Sprintf("eip.Connection")
}
diff --git a/plc4go/internal/eip/Driver.go b/plc4go/internal/eip/Driver.go
index 38a6f1234b..116c31a606 100644
--- a/plc4go/internal/eip/Driver.go
+++ b/plc4go/internal/eip/Driver.go
@@ -38,7 +38,8 @@ type Driver struct {
awaitSetupComplete bool
awaitDisconnectComplete bool
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
@@ -48,18 +49,19 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
awaitSetupComplete: true,
awaitDisconnectComplete: true,
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "eip", "EthernetIP", "tcp", NewTagHandler())
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
+func (d *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ d.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -67,20 +69,27 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{"44818"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
- m.log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(
+ transportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := ParseFromOptions(m.log, driverOptions)
+ configuration, err := ParseFromOptions(d.log, driverOptions)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid driverOptions")
+ d.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
@@ -88,28 +97,36 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
driverContext, err := NewDriverContext(configuration)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid driverOptions")
+ d.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
- driverContext.awaitSetupComplete = m.awaitSetupComplete
- driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
+ driverContext.awaitSetupComplete = d.awaitSetupComplete
+ driverContext.awaitDisconnectComplete = d.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
- m.log.Debug().Msg("created connection, connecting now")
+ connection := NewConnection(
+ codec,
+ configuration,
+ driverContext,
+ d.GetPlcTagHandler(),
+ d.tm,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
-func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
- m.awaitSetupComplete = awaitComplete
+func (d *Driver) SetAwaitSetupComplete(awaitComplete bool) {
+ d.awaitSetupComplete = awaitComplete
}
-func (m *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
- m.awaitDisconnectComplete = awaitComplete
+func (d *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
+ d.awaitDisconnectComplete = awaitComplete
}
-func (m *Driver) Close() error {
- return m.tm.Close()
+func (d *Driver) Close() error {
+ return d.tm.Close()
}
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index aaa4a8632a..7970f7e43f 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -140,6 +140,7 @@ type Connection struct {
passLogToModel bool
log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func (m *Connection) String() string {
@@ -194,6 +195,7 @@ func NewConnection(transportInstance transports.TransportInstance, connectionOpt
handleTunnelingRequests: true,
passLogToModel: passLoggerToModel,
log: customLogger,
+ _options: _options,
}
connection.connectionTtl = connection.defaultTtl * 2
@@ -505,7 +507,13 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
- m.tagHandler, m.valueHandler, NewSubscriber(m, options.WithCustomLogger(m.log)))
+ m.tagHandler,
+ m.valueHandler,
+ NewSubscriber(
+ m,
+ append(m._options, options.WithCustomLogger(m.log))...,
+ ),
+ )
}
func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index a67350ad6a..4e73bbaab5 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -46,7 +46,8 @@ type Discoverer struct {
deviceScanningWorkItemId atomic.Int32
deviceScanningQueue pool.Executor
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDiscoverer(_options ...options.WithOption) *Discoverer {
@@ -56,6 +57,7 @@ func NewDiscoverer(_options ...options.WithOption) *Discoverer {
transportInstanceCreationQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
deviceScanningQueue: pool.NewFixedSizeExecutor(50, 100, _options...),
log: customLogger,
+ _options: _options,
}
}
@@ -180,7 +182,11 @@ func (d *Discoverer) createDeviceScanDispatcher(udpTransportInstance *udp.Transp
return func() {
d.log.Debug().Msgf("Scanning %v", udpTransportInstance)
// Create a codec for sending and receiving messages.
- codec := NewMessageCodec(udpTransportInstance, nil, options.WithCustomLogger(d.log))
+ codec := NewMessageCodec(
+ udpTransportInstance,
+ nil,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
// Explicitly start the worker
if err := codec.ConnectWithContext(context.TODO()); err != nil {
d.log.Error().Err(err).Msg("Error connecting")
diff --git a/plc4go/internal/knxnetip/Driver.go b/plc4go/internal/knxnetip/Driver.go
index 6d04554fea..a86ac393d0 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -35,24 +35,26 @@ import (
type Driver struct {
_default.DefaultDriver
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) *Driver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &Driver{
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "knxnet-ip", "KNXNet/IP", "udp", NewTagHandler())
return driver
}
-func (m *Driver) CheckQuery(query string) error {
- _, err := m.GetPlcTagHandler().ParseQuery(query)
+func (d *Driver) CheckQuery(query string) error {
+ _, err := d.GetPlcTagHandler().ParseQuery(query)
return err
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+func (d *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
@@ -63,7 +65,11 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultUdpPort"] = []string{"3671"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %#v", transportUrl))
@@ -71,15 +77,20 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
}
// Create the new connection
- connection := NewConnection(transportInstance, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
- m.log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
+ connection := NewConnection(
+ transportInstance,
+ driverOptions,
+ d.GetPlcTagHandler(),
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Trace().Str("transport", transportUrl.String()).Stringer("connection", connection).Msg("created new connection instance, trying to connect now")
return connection.ConnectWithContext(ctx)
}
-func (m *Driver) SupportsDiscovery() bool {
+func (d *Driver) SupportsDiscovery() bool {
return true
}
-func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
+func (d *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index 9b1cc36b3a..b71305f763 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -41,6 +41,7 @@ type Subscriber struct {
passLogToModel bool
log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
@@ -51,10 +52,11 @@ func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subs
consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
passLogToModel: passLoggerToModel,
log: customLogger,
+ _options: _options,
}
}
-func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (s *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
go func() {
@@ -66,7 +68,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Add this subscriber to the connection.
- m.connection.addSubscriber(m)
+ s.connection.addSubscriber(s)
// Just populate all requests with an OK
responseCodes := map[string]apiModel.PlcResponseCode{}
@@ -74,7 +76,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
responseCodes[tagName] = apiModel.PlcResponseCode_OK
tagType := internalPlcSubscriptionRequest.GetType(tagName)
- subscriptionValues[tagName] = NewSubscriptionHandle(m, tagName, internalPlcSubscriptionRequest.GetTag(tagName), tagType, internalPlcSubscriptionRequest.GetInterval(tagName))
+ subscriptionValues[tagName] = NewSubscriptionHandle(s, tagName, internalPlcSubscriptionRequest.GetTag(tagName), tagType, internalPlcSubscriptionRequest.GetInterval(tagName))
}
result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
@@ -83,7 +85,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
subscriptionRequest,
responseCodes,
subscriptionValues,
- options.WithCustomLogger(m.log),
+ append(s._options, options.WithCustomLogger(s.log))...,
),
nil,
)
@@ -91,7 +93,7 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
return result
}
-func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (s *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
@@ -104,17 +106,17 @@ func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiM
/*
* Callback for incoming value change events from the KNX bus
*/
-func (m *Subscriber) handleValueChange(ctx context.Context, destinationAddress []byte, payload []byte, changed bool) {
+func (s *Subscriber) handleValueChange(ctx context.Context, destinationAddress []byte, payload []byte, changed bool) {
// Decode the group-address according to the settings in the driver
// Group addresses can be 1, 2 or 3 levels (3 being the default)
- ctxForModel := options.GetLoggerContextForModel(ctx, m.log, options.WithPassLoggerToModel(m.passLogToModel))
- groupAddress, err := driverModel.KnxGroupAddressParse(ctxForModel, destinationAddress, m.connection.getGroupAddressNumLevels())
+ ctxForModel := options.GetLoggerContextForModel(ctx, s.log, options.WithPassLoggerToModel(s.passLogToModel))
+ groupAddress, err := driverModel.KnxGroupAddressParse(ctxForModel, destinationAddress, s.connection.getGroupAddressNumLevels())
if err != nil {
return
}
// TODO: aggregate tags and send it to a consumer which want's all of them
- for registration, consumer := range m.consumers {
+ for registration, consumer := range s.consumers {
for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
subscriptionHandle := subscriptionHandle.(*SubscriptionHandle)
groupAddressTag, ok := subscriptionHandle.tag.(GroupAddressTag)
@@ -183,18 +185,26 @@ func (m *Subscriber) handleValueChange(ctx context.Context, destinationAddress [
plcValues[tagName] = spiValues.NewPlcList(plcValueList)
}
}
- event := NewSubscriptionEvent(tags, types, intervals, responseCodes, addresses, plcValues, options.WithCustomLogger(m.log))
+ event := NewSubscriptionEvent(
+ tags,
+ types,
+ intervals,
+ responseCodes,
+ addresses,
+ plcValues,
+ append(s._options, options.WithCustomLogger(s.log))...,
+ )
consumer(&event)
}
}
}
-func (m *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
- consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(m, consumer, handles...)
- m.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
+func (s *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
+ consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(s, consumer, handles...)
+ s.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
return consumerRegistration
}
-func (m *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
- delete(m.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+func (s *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
+ delete(s.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
}
diff --git a/plc4go/internal/modbus/AsciiDriver.go b/plc4go/internal/modbus/AsciiDriver.go
index 7f20d623a0..b2b8e03860 100644
--- a/plc4go/internal/modbus/AsciiDriver.go
+++ b/plc4go/internal/modbus/AsciiDriver.go
@@ -37,24 +37,26 @@ import (
type AsciiDriver struct {
_default.DefaultDriver
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewModbusAsciiDriver(_options ...options.WithOption) *AsciiDriver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &AsciiDriver{
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "modbus-ascii", "Modbus ASCII", "serial", NewTagHandler())
return driver
}
-func (m AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, connectionOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(connectionOptions))
+func (d AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, connectionOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ d.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(connectionOptions))
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -62,9 +64,13 @@ func (m AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
connectionOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, connectionOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ connectionOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", connectionOptions["defaultTcpPort"])
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", connectionOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -76,7 +82,7 @@ func (m AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
go func() {
defer func() {
if err := recover(); err != nil {
- m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+ d.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
}
}()
for {
@@ -84,14 +90,17 @@ func (m AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- m.log.Error().Err(err).Msg("got error serializing adu")
+ d.log.Error().Err(err).Msg("got error serializing adu")
} else {
- m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ d.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
- m.log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(
+ transportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
@@ -102,10 +111,16 @@ func (m AsciiDriver) GetConnectionWithContext(ctx context.Context, transportUrl
unitIdentifier = uint8(intValue)
}
}
- m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ d.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, connectionOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
- m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(
+ unitIdentifier,
+ codec,
+ connectionOptions,
+ d.GetPlcTagHandler(),
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 5a57a5005c..e4cf7e3be0 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -49,7 +49,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connectionOptions map[string][]string, tagHandler spi.PlcTagHandler, _options ...options.WithOption) *Connection {
@@ -65,7 +66,8 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connecti
spiModel.NewDefaultPlcWriteResponse,
_options...,
),
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
@@ -80,30 +82,30 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, connecti
return connection
}
-func (m *Connection) GetConnectionId() string {
- return m.connectionId
+func (c *Connection) GetConnectionId() string {
+ return c.connectionId
}
-func (m *Connection) IsTraceEnabled() bool {
- return m.tracer != nil
+func (c *Connection) IsTraceEnabled() bool {
+ return c.tracer != nil
}
-func (m *Connection) GetTracer() tracer.Tracer {
- return m.tracer
+func (c *Connection) GetTracer() tracer.Tracer {
+ return c.tracer
}
-func (m *Connection) GetConnection() plc4go.PlcConnection {
- return m
+func (c *Connection) GetConnection() plc4go.PlcConnection {
+ return c
}
-func (m *Connection) GetMessageCodec() spi.MessageCodec {
- return m.messageCodec
+func (c *Connection) GetMessageCodec() spi.MessageCodec {
+ return c.messageCodec
}
-func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
+func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
// TODO: use proper context
ctx := context.TODO()
- m.log.Trace().Msg("Pinging")
+ c.log.Trace().Msg("Pinging")
result := make(chan plc4go.PlcConnectionPingResult, 1)
go func() {
defer func() {
@@ -112,29 +114,29 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
}
}()
diagnosticRequestPdu := readWriteModel.NewModbusPDUDiagnosticRequest(0, 0x42)
- pingRequest := readWriteModel.NewModbusTcpADU(1, m.unitIdentifier, diagnosticRequestPdu, false)
- if err := m.messageCodec.SendRequest(ctx, pingRequest,
+ pingRequest := readWriteModel.NewModbusTcpADU(1, c.unitIdentifier, diagnosticRequestPdu, false)
+ if err := c.messageCodec.SendRequest(ctx, pingRequest,
func(message spi.Message) bool {
responseAdu, ok := message.(readWriteModel.ModbusTcpADUExactly)
if !ok {
return false
}
- return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
+ return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == c.unitIdentifier
},
func(message spi.Message) error {
- m.log.Trace().Msgf("Received Message")
+ c.log.Trace().Msgf("Received Message")
if message != nil {
// If we got a valid response (even if it will probably contain an error, we know the remote is available)
- m.log.Trace().Msg("got valid response")
+ c.log.Trace().Msg("got valid response")
result <- _default.NewDefaultPlcConnectionPingResult(nil)
} else {
- m.log.Trace().Msg("got no response")
+ c.log.Trace().Msg("got no response")
result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
}
return nil
},
func(err error) error {
- m.log.Trace().Msgf("Received Error")
+ c.log.Trace().Msgf("Received Error")
result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
return nil
},
@@ -146,30 +148,38 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
return result
}
-func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
+func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return _default.DefaultConnectionMetadata{
ProvidesReading: true,
ProvidesWriting: true,
}
}
-func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
+func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return spiModel.NewDefaultPlcReadRequestBuilderWithInterceptor(
- m.GetPlcTagHandler(),
- NewReader(m.unitIdentifier, m.messageCodec, options.WithCustomLogger(m.log)),
- m.requestInterceptor,
+ c.GetPlcTagHandler(),
+ NewReader(
+ c.unitIdentifier,
+ c.messageCodec,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ c.requestInterceptor,
)
}
-func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
+func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilderWithInterceptor(
- m.GetPlcTagHandler(),
- m.GetPlcValueHandler(),
- NewWriter(m.unitIdentifier, m.messageCodec, options.WithCustomLogger(m.log)),
- m.requestInterceptor,
+ c.GetPlcTagHandler(),
+ c.GetPlcValueHandler(),
+ NewWriter(
+ c.unitIdentifier,
+ c.messageCodec,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ c.requestInterceptor,
)
}
-func (m *Connection) String() string {
- return fmt.Sprintf("modbus.Connection{unitIdentifier: %d}", m.unitIdentifier)
+func (c *Connection) String() string {
+ return fmt.Sprintf("modbus.Connection{unitIdentifier: %d}", c.unitIdentifier)
}
diff --git a/plc4go/internal/modbus/RtuDriver.go b/plc4go/internal/modbus/RtuDriver.go
index 5b6b75c50b..cfce07ad47 100644
--- a/plc4go/internal/modbus/RtuDriver.go
+++ b/plc4go/internal/modbus/RtuDriver.go
@@ -37,24 +37,26 @@ import (
type RtuDriver struct {
_default.DefaultDriver
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewModbusRtuDriver(_options ...options.WithOption) *RtuDriver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &RtuDriver{
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "modbus-rtu", "Modbus RTU", "serial", NewTagHandler())
return driver
}
-func (m RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
+func (d RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ d.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -62,9 +64,13 @@ func (m RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -76,7 +82,7 @@ func (m RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
go func() {
defer func() {
if err := recover(); err != nil {
- m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+ d.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
}
}()
for {
@@ -84,14 +90,17 @@ func (m RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- m.log.Error().Err(err).Msg("got error serializing adu")
+ d.log.Error().Err(err).Msg("got error serializing adu")
} else {
- m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ d.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
- m.log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(
+ transportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
@@ -102,10 +111,15 @@ func (m RtuDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
unitIdentifier = uint8(intValue)
}
}
- m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ d.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
- m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(
+ unitIdentifier,
+ codec, driverOptions,
+ d.GetPlcTagHandler(),
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/modbus/TcpDriver.go b/plc4go/internal/modbus/TcpDriver.go
index 217ae4f778..0af6a2e783 100644
--- a/plc4go/internal/modbus/TcpDriver.go
+++ b/plc4go/internal/modbus/TcpDriver.go
@@ -37,24 +37,26 @@ import (
type TcpDriver struct {
_default.DefaultDriver
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewModbusTcpDriver(_options ...options.WithOption) *TcpDriver {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
driver := &TcpDriver{
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "modbus-tcp", "Modbus TCP", "tcp", NewTagHandler())
return driver
}
-func (m TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
+func (d TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ d.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get an the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -62,9 +64,13 @@ func (m TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{"502"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
@@ -76,7 +82,7 @@ func (m TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
go func() {
defer func() {
if err := recover(); err != nil {
- m.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+ d.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
}
}()
for {
@@ -84,14 +90,17 @@ func (m TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
adu := msg.(model.ModbusTcpADU)
serialized, err := json.Marshal(adu)
if err != nil {
- m.log.Error().Err(err).Msg("got error serializing adu")
+ d.log.Error().Err(err).Msg("got error serializing adu")
} else {
- m.log.Debug().Msgf("got message in the default handler %s\n", serialized)
+ d.log.Debug().Msgf("got message in the default handler %s\n", serialized)
}
}
}()
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
- m.log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(
+ transportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msgf("working with codec %#v", codec)
// If a unit-identifier was provided in the connection string use this, otherwise use the default of 1
unitIdentifier := uint8(1)
@@ -102,10 +111,16 @@ func (m TcpDriver) GetConnectionWithContext(ctx context.Context, transportUrl ur
unitIdentifier = uint8(intValue)
}
}
- m.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
+ d.log.Debug().Uint8("unitIdentifier", unitIdentifier).Msgf("using unit identifier %d", unitIdentifier)
// Create the new connection
- connection := NewConnection(unitIdentifier, codec, driverOptions, m.GetPlcTagHandler(), options.WithCustomLogger(m.log))
- m.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
+ connection := NewConnection(
+ unitIdentifier,
+ codec,
+ driverOptions,
+ d.GetPlcTagHandler(),
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 4da1c0beff..043944f5d2 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -69,7 +69,8 @@ type Connection struct {
connectionId string
tracer tracer.Tracer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, driverContext DriverContext, tagHandler spi.PlcTagHandler, tm transactions.RequestTransactionManager, connectionOptions map[string][]string, _options ...options.WithOption) *Connection {
@@ -81,6 +82,7 @@ func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, d
driverContext: driverContext,
tm: tm,
log: customLogger,
+ _options: _options,
}
if traceEnabledOption, ok := connectionOptions["traceEnabled"]; ok {
if len(traceEnabledOption) == 1 {
@@ -94,28 +96,28 @@ func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, d
return connection
}
-func (m *Connection) GetConnectionId() string {
- return m.connectionId
+func (c *Connection) GetConnectionId() string {
+ return c.connectionId
}
-func (m *Connection) IsTraceEnabled() bool {
- return m.tracer != nil
+func (c *Connection) IsTraceEnabled() bool {
+ return c.tracer != nil
}
-func (m *Connection) GetTracer() tracer.Tracer {
- return m.tracer
+func (c *Connection) GetTracer() tracer.Tracer {
+ return c.tracer
}
-func (m *Connection) GetConnection() plc4go.PlcConnection {
- return m
+func (c *Connection) GetConnection() plc4go.PlcConnection {
+ return c
}
-func (m *Connection) GetMessageCodec() spi.MessageCodec {
- return m.messageCodec
+func (c *Connection) GetMessageCodec() spi.MessageCodec {
+ return c.messageCodec
}
-func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Trace().Msg("Connecting")
+func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
+ c.log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
go func() {
defer func() {
@@ -123,43 +125,43 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v. Stack: %s", err, debug.Stack()))
}
}()
- err := m.messageCodec.ConnectWithContext(ctx)
+ err := c.messageCodec.ConnectWithContext(ctx)
if err != nil {
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
}
// Only on active connections we do a connection
- if m.driverContext.PassiveMode {
- m.log.Info().Msg("S7 Driver running in PASSIVE mode.")
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
+ if c.driverContext.PassiveMode {
+ c.log.Info().Msg("S7 Driver running in PASSIVE mode.")
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
return
}
// For testing purposes we can skip the waiting for a complete connection
- if !m.driverContext.awaitSetupComplete {
- go m.setupConnection(ctx, ch)
- m.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ if !c.driverContext.awaitSetupComplete {
+ go c.setupConnection(ctx, ch)
+ c.log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
- // Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
- m.SetConnected(true)
+ // Note: we can't use fireConnected here as it's guarded against c.driverContext.awaitSetupComplete
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+ c.SetConnected(true)
return
}
// Only the TCP transport supports login.
- m.log.Info().Msg("S7 Driver running in ACTIVE mode.")
+ c.log.Info().Msg("S7 Driver running in ACTIVE mode.")
- m.setupConnection(ctx, ch)
+ c.setupConnection(ctx, ch)
}()
return ch
}
-func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
- m.log.Debug().Msg("Sending COTP Connection Request")
+func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
+ c.log.Debug().Msg("Sending COTP Connection Request")
// Open the session on ISO Transport Protocol first.
cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse, 1)
cotpConnectionErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()), func(message spi.Message) bool {
+ if err := c.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(c.createCOTPConnectionRequest()), func(message spi.Message) bool {
tpktPacket := message.(readWriteModel.TPKTPacket)
if tpktPacket == nil {
return false
@@ -174,23 +176,23 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of COTP Connection Request"), ch)
+ }, c.GetTtl()); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of COTP Connection Request"), ch)
}
select {
case cotpPacketConnectionResponse := <-cotpConnectionResult:
- m.log.Debug().Msg("Got COTP Connection Response")
- m.log.Debug().Msg("Sending S7 Connection Request")
+ c.log.Debug().Msg("Got COTP Connection Response")
+ c.log.Debug().Msg("Sending S7 Connection Request")
// Send an S7 login message.
s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication, 1)
s7ConnectionErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(ctx, m.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool {
+ if err := c.messageCodec.SendRequest(ctx, c.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool {
tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
if !ok {
return false
@@ -215,43 +217,43 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of S7 Connection Request"), ch)
+ }, c.GetTtl()); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of S7 Connection Request"), ch)
}
select {
case setupCommunication := <-s7ConnectionResult:
- m.log.Debug().Msg("Got S7 Connection Response")
- m.log.Debug().Msg("Sending identify remote Request")
+ c.log.Debug().Msg("Got S7 Connection Response")
+ c.log.Debug().Msg("Sending identify remote Request")
// Save some data from the response.
- m.driverContext.MaxAmqCaller = setupCommunication.GetMaxAmqCaller()
- m.driverContext.MaxAmqCallee = setupCommunication.GetMaxAmqCallee()
- m.driverContext.PduSize = setupCommunication.GetPduLength()
+ c.driverContext.MaxAmqCaller = setupCommunication.GetMaxAmqCaller()
+ c.driverContext.MaxAmqCallee = setupCommunication.GetMaxAmqCallee()
+ c.driverContext.PduSize = setupCommunication.GetPduLength()
// Update the number of concurrent requests to the negotiated number.
// I have never seen anything else than equal values for caller and
// callee, but if they were different, we're only limiting the outgoing
// requests.
- m.tm.SetNumberOfConcurrentRequests(int(m.driverContext.MaxAmqCallee))
+ c.tm.SetNumberOfConcurrentRequests(int(c.driverContext.MaxAmqCallee))
// If the controller type is explicitly set, were finished with the login
// process. If it's set to ANY, we have to query the serial number information
// in order to detect the type of PLC.
- if m.driverContext.ControllerType != ControllerType_ANY {
+ if c.driverContext.ControllerType != ControllerType_ANY {
// Send an event that connection setup is complete.
- m.fireConnected(ch)
+ c.fireConnected(ch)
return
}
// Prepare a message to request the remote to identify itself.
- m.log.Debug().Msg("Sending S7 Identification Request")
+ c.log.Debug().Msg("Sending S7 Identification Request")
s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData, 1)
s7IdentificationErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(ctx, m.createIdentifyRemoteMessage(), func(message spi.Message) bool {
+ if err := c.messageCodec.SendRequest(ctx, c.createIdentifyRemoteMessage(), func(message spi.Message) bool {
tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
if !ok {
return false
@@ -275,47 +277,47 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ c.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
}
s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of identify remote Request"), ch)
+ }, c.GetTtl()); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of identify remote Request"), ch)
}
select {
case payloadUserData := <-s7IdentificationResult:
- m.log.Debug().Msg("Got S7 Identification Response")
- m.extractControllerTypeAndFireConnected(payloadUserData, ch)
+ c.log.Debug().Msg("Got S7 Identification Response")
+ c.extractControllerTypeAndFireConnected(payloadUserData, ch)
case err := <-s7IdentificationErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving identify remote Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error receiving identify remote Request"), ch)
}
case err := <-s7ConnectionErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving S7 Connection Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error receiving S7 Connection Request"), ch)
}
case err := <-cotpConnectionErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving of COTP Connection Request"), ch)
+ c.fireConnectionError(errors.Wrap(err, "Error receiving of COTP Connection Request"), ch)
}
}
-func (m *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
- if m.driverContext.awaitSetupComplete {
+func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
} else {
- m.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ c.log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
}
}
-func (m *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
- if m.driverContext.awaitSetupComplete {
- ch <- _default.NewDefaultPlcConnectionConnectResult(m, nil)
+func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
} else {
- m.log.Info().Msg("Successfully connected")
+ c.log.Info().Msg("Successfully connected")
}
- m.SetConnected(true)
+ c.SetConnected(true)
}
-func (m *Connection) extractControllerTypeAndFireConnected(payloadUserData readWriteModel.S7PayloadUserData, ch chan<- plc4go.PlcConnectionConnectResult) {
+func (c *Connection) extractControllerTypeAndFireConnected(payloadUserData readWriteModel.S7PayloadUserData, ch chan<- plc4go.PlcConnectionConnectResult) {
// TODO: how do we handle the case if there no items at all? Should we assume it a successful or failure...
// TODO ... opposed to the java implementation we treat it as a failure
for _, item := range payloadUserData.GetItems() {
@@ -346,21 +348,21 @@ func (m *Connection) extractControllerTypeAndFireConnected(payloadUserData readW
case "4":
controllerType = ControllerType_S7_400
default:
- m.log.Info().Msgf("Looking up unknown article number %s", articleNumber)
+ c.log.Info().Msgf("Looking up unknown article number %s", articleNumber)
controllerType = ControllerType_ANY
}
- m.driverContext.ControllerType = controllerType
+ c.driverContext.ControllerType = controllerType
// Send an event that connection setup is complete.
- m.fireConnected(ch)
+ c.fireConnected(ch)
return
}
}
}
- m.fireConnectionError(errors.New("Coudln't find the required information"), ch)
+ c.fireConnectionError(errors.New("Coudln't find the required information"), ch)
}
-func (m *Connection) createIdentifyRemoteMessage() readWriteModel.TPKTPacket {
+func (c *Connection) createIdentifyRemoteMessage() readWriteModel.TPKTPacket {
identifyRemoteMessage := readWriteModel.NewS7MessageUserData(
1,
readWriteModel.NewS7ParameterUserData(
@@ -398,62 +400,70 @@ func (m *Connection) createIdentifyRemoteMessage() readWriteModel.TPKTPacket {
return readWriteModel.NewTPKTPacket(cotpPacketData)
}
-func (m *Connection) createS7ConnectionRequest(cotpPacketConnectionResponse readWriteModel.COTPPacketConnectionResponse) readWriteModel.TPKTPacket {
+func (c *Connection) createS7ConnectionRequest(cotpPacketConnectionResponse readWriteModel.COTPPacketConnectionResponse) readWriteModel.TPKTPacket {
for _, parameter := range cotpPacketConnectionResponse.GetParameters() {
switch parameter := parameter.(type) {
case readWriteModel.COTPParameterCalledTsap:
- m.driverContext.CalledTsapId = parameter.GetTsapId()
+ c.driverContext.CalledTsapId = parameter.GetTsapId()
case readWriteModel.COTPParameterCallingTsap:
- if parameter.GetTsapId() != m.driverContext.CallingTsapId {
- m.driverContext.CallingTsapId = parameter.GetTsapId()
- m.log.Warn().Msgf("Switching calling TSAP id to '%x'", m.driverContext.CallingTsapId)
+ if parameter.GetTsapId() != c.driverContext.CallingTsapId {
+ c.driverContext.CallingTsapId = parameter.GetTsapId()
+ c.log.Warn().Msgf("Switching calling TSAP id to '%x'", c.driverContext.CallingTsapId)
}
case readWriteModel.COTPParameterTpduSize:
- m.driverContext.CotpTpduSize = parameter.GetTpduSize()
+ c.driverContext.CotpTpduSize = parameter.GetTpduSize()
default:
- m.log.Warn().Msgf("Got unknown parameter type '%v'", reflect.TypeOf(parameter))
+ c.log.Warn().Msgf("Got unknown parameter type '%v'", reflect.TypeOf(parameter))
}
}
s7ParameterSetupCommunication := readWriteModel.NewS7ParameterSetupCommunication(
- m.driverContext.MaxAmqCaller, m.driverContext.MaxAmqCallee, m.driverContext.PduSize,
+ c.driverContext.MaxAmqCaller, c.driverContext.MaxAmqCallee, c.driverContext.PduSize,
)
s7Message := readWriteModel.NewS7MessageRequest(0, s7ParameterSetupCommunication, nil)
cotpPacketData := readWriteModel.NewCOTPPacketData(true, 1, nil, s7Message, 0)
return readWriteModel.NewTPKTPacket(cotpPacketData)
}
-func (m *Connection) createCOTPConnectionRequest() readWriteModel.COTPPacket {
+func (c *Connection) createCOTPConnectionRequest() readWriteModel.COTPPacket {
return readWriteModel.NewCOTPPacketConnectionRequest(
0x0000,
0x000F,
readWriteModel.COTPProtocolClass_CLASS_0,
[]readWriteModel.COTPParameter{
- readWriteModel.NewCOTPParameterCallingTsap(m.driverContext.CallingTsapId, 0),
- readWriteModel.NewCOTPParameterCalledTsap(m.driverContext.CalledTsapId, 0),
- readWriteModel.NewCOTPParameterTpduSize(m.driverContext.CotpTpduSize, 0),
+ readWriteModel.NewCOTPParameterCallingTsap(c.driverContext.CallingTsapId, 0),
+ readWriteModel.NewCOTPParameterCalledTsap(c.driverContext.CalledTsapId, 0),
+ readWriteModel.NewCOTPParameterTpduSize(c.driverContext.CotpTpduSize, 0),
},
nil,
0,
)
}
-func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
+func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return _default.DefaultConnectionMetadata{
ProvidesReading: true,
ProvidesWriting: true,
}
}
-func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(m.GetPlcTagHandler(), NewReader(&m.tpduGenerator, m.messageCodec, m.tm, options.WithCustomLogger(m.log)))
+func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
+ return spiModel.NewDefaultPlcReadRequestBuilder(
+ c.GetPlcTagHandler(),
+ NewReader(
+ &c.tpduGenerator,
+ c.messageCodec,
+ c.tm,
+ append(c._options, options.WithCustomLogger(c.log))...,
+ ),
+ )
}
-func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
+func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
return spiModel.NewDefaultPlcWriteRequestBuilder(
- m.GetPlcTagHandler(), m.GetPlcValueHandler(), NewWriter(&m.tpduGenerator, m.messageCodec, m.tm))
+ c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewWriter(&c.tpduGenerator, c.messageCodec, c.tm))
}
-func (m *Connection) String() string {
+func (c *Connection) String() string {
return fmt.Sprintf("s7.Connection")
}
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 80bf8f9f49..e5e34b978f 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -38,7 +38,8 @@ type Driver struct {
awaitSetupComplete bool
awaitDisconnectComplete bool
- log zerolog.Logger // TODO: use it
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
@@ -48,18 +49,19 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
awaitSetupComplete: true,
awaitDisconnectComplete: true,
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "s7", "Siemens S7 (Basic)", "tcp", NewTagHandler(_options...))
return driver
}
-func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
+func (d *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
+ d.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
@@ -67,20 +69,27 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
driverOptions["defaultTcpPort"] = []string{"102"}
// Have the transport create a new transport-instance.
- transportInstance, err := transport.CreateTransportInstance(transportUrl, driverOptions, options.WithCustomLogger(m.log))
+ transportInstance, err := transport.CreateTransportInstance(
+ transportUrl,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
if err != nil {
- m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
+ d.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
- codec := NewMessageCodec(transportInstance, options.WithCustomLogger(m.log))
- m.log.Debug().Msgf("working with codec %#v", codec)
+ codec := NewMessageCodec(
+ transportInstance,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msgf("working with codec %#v", codec)
- configuration, err := ParseFromOptions(m.log, driverOptions)
+ configuration, err := ParseFromOptions(d.log, driverOptions)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid driverOptions")
+ d.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
@@ -88,28 +97,36 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
driverContext, err := NewDriverContext(configuration)
if err != nil {
- m.log.Error().Err(err).Msgf("Invalid driverOptions")
+ d.log.Error().Err(err).Msgf("Invalid driverOptions")
ch := make(chan plc4go.PlcConnectionConnectResult, 1)
ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid driverOptions"))
return ch
}
- driverContext.awaitSetupComplete = m.awaitSetupComplete
- driverContext.awaitDisconnectComplete = m.awaitDisconnectComplete
+ driverContext.awaitSetupComplete = d.awaitSetupComplete
+ driverContext.awaitDisconnectComplete = d.awaitDisconnectComplete
// Create the new connection
- connection := NewConnection(codec, configuration, driverContext, m.GetPlcTagHandler(), m.tm, driverOptions, options.WithCustomLogger(m.log))
- m.log.Debug().Msg("created connection, connecting now")
+ connection := NewConnection(
+ codec,
+ configuration,
+ driverContext,
+ d.GetPlcTagHandler(),
+ d.tm,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
+ d.log.Debug().Msg("created connection, connecting now")
return connection.ConnectWithContext(ctx)
}
-func (m *Driver) SetAwaitSetupComplete(awaitComplete bool) {
- m.awaitSetupComplete = awaitComplete
+func (d *Driver) SetAwaitSetupComplete(awaitComplete bool) {
+ d.awaitSetupComplete = awaitComplete
}
-func (m *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
- m.awaitDisconnectComplete = awaitComplete
+func (d *Driver) SetAwaitDisconnectComplete(awaitComplete bool) {
+ d.awaitDisconnectComplete = awaitComplete
}
-func (m *Driver) Close() error {
- return m.tm.Close()
+func (d *Driver) Close() error {
+ return d.tm.Close()
}
diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go
index aac5c17c6d..1f6f334967 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -34,7 +34,8 @@ type Driver struct {
_default.DefaultDriver
valueHandler ValueHandler
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
@@ -42,14 +43,24 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
driver := &Driver{
valueHandler: NewValueHandler(),
- log: customLogger,
+ log: customLogger,
+ _options: _options,
}
driver.DefaultDriver = _default.NewDefaultDriver(driver, "simulated", "Simulated PLC4X Datasource", "none", NewTagHandler())
return driver
}
func (d *Driver) GetConnectionWithContext(ctx context.Context, _ url.URL, _ map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- connection := NewConnection(NewDevice("test", options.WithCustomLogger(d.log)), d.GetPlcTagHandler(), d.valueHandler, driverOptions, options.WithCustomLogger(d.log))
+ connection := NewConnection(
+ NewDevice(
+ "test",
+ append(d._options, options.WithCustomLogger(d.log))...,
+ ),
+ d.GetPlcTagHandler(),
+ d.valueHandler,
+ driverOptions,
+ append(d._options, options.WithCustomLogger(d.log))...,
+ )
d.log.Debug().Msgf("Connecting and returning connection %v", connection)
return connection.ConnectWithContext(ctx)
}
diff --git a/plc4go/pkg/api/cache/PlcConnectionCache.go b/plc4go/pkg/api/cache/PlcConnectionCache.go
index c6e73ecaa9..62e0dbfbdd 100644
--- a/plc4go/pkg/api/cache/PlcConnectionCache.go
+++ b/plc4go/pkg/api/cache/PlcConnectionCache.go
@@ -45,13 +45,14 @@ func NewPlcConnectionCache(driverManager plc4go.PlcDriverManager, withConnection
}
maxLeaseTime := 5 * time.Second
cc := &plcConnectionCache{
- log: log,
driverManager: driverManager,
maxLeaseTime: maxLeaseTime,
maxWaitTime: maxLeaseTime * 5,
cacheLock: lock.NewCASMutex(),
connections: make(map[string]*connectionContainer),
tracer: nil,
+ log: log,
+ // _options: _options, // TODO: we might want to migrate the connection cache options to proper options
}
for _, option := range withConnectionCacheOptions {
option(cc)
@@ -108,7 +109,8 @@ type plcConnectionCache struct {
connections map[string]*connectionContainer
tracer tracer.Tracer
- log zerolog.Logger
+ log zerolog.Logger
+ _options []options.WithOption // Used to pass them downstream
}
func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
@@ -128,7 +130,10 @@ func (t *plcConnectionCache) onConnectionEvent(event connectionEvent) {
///////////////////////////////////////
func (t *plcConnectionCache) EnableTracer() {
- t.tracer = tracer.NewTracer("cache", options.WithCustomLogger(t.log))
+ t.tracer = tracer.NewTracer(
+ "cache",
+ append(t._options, options.WithCustomLogger(t.log))...,
+ )
}
func (t *plcConnectionCache) GetTracer() tracer.Tracer {