You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/13 12:17:25 UTC

[plc4x] 04/04: feat(plc4go/cbus): added more Stringer implementations

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 54221760969bf088eb07df4eb8a46f933dff6ca0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 14:17:04 2023 +0200

    feat(plc4go/cbus): added more Stringer implementations
---
 plc4go/internal/cbus/Configuration.go      |  5 ++
 plc4go/internal/cbus/Configuration_test.go | 60 +++++++++++++++++++
 plc4go/internal/cbus/Connection.go         | 46 ++++++++++++--
 plc4go/internal/cbus/Connection_test.go    | 11 +++-
 plc4go/internal/cbus/DriverContext.go      |  6 ++
 plc4go/internal/cbus/MessageCodec.go       | 21 +++++++
 plc4go/internal/cbus/MessageCodec_test.go  | 49 +++++++++++++++
 plc4go/internal/cbus/Subscriber.go         | 76 ++++++++++++-----------
 plc4go/internal/cbus/Subscriber_test.go    | 96 ++++++++++++------------------
 9 files changed, 270 insertions(+), 100 deletions(-)

diff --git a/plc4go/internal/cbus/Configuration.go b/plc4go/internal/cbus/Configuration.go
index 5655c87473..bd02271184 100644
--- a/plc4go/internal/cbus/Configuration.go
+++ b/plc4go/internal/cbus/Configuration.go
@@ -20,6 +20,7 @@
 package cbus
 
 import (
+	"fmt"
 	"github.com/rs/zerolog"
 	"reflect"
 	"strconv"
@@ -98,3 +99,7 @@ func getFromOptions(localLog zerolog.Logger, options map[string][]string, key st
 	}
 	return ""
 }
+
+func (c Configuration) String() string {
+	return fmt.Sprintf("%#v", c)
+}
diff --git a/plc4go/internal/cbus/Configuration_test.go b/plc4go/internal/cbus/Configuration_test.go
index 733d56ce27..17779239a8 100644
--- a/plc4go/internal/cbus/Configuration_test.go
+++ b/plc4go/internal/cbus/Configuration_test.go
@@ -201,3 +201,63 @@ func Test_getFromOptions(t *testing.T) {
 		})
 	}
 }
+
+func TestConfiguration_String(t *testing.T) {
+	type fields struct {
+		Srchk                 bool
+		Exstat                bool
+		Pun                   bool
+		LocalSal              bool
+		Pcn                   bool
+		Idmon                 bool
+		Monitor               bool
+		Smart                 bool
+		XonXoff               bool
+		Connect               bool
+		MonitoredApplication1 byte
+		MonitoredApplication2 byte
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			fields: fields{
+				Srchk:                 true,
+				Exstat:                true,
+				Pun:                   true,
+				LocalSal:              true,
+				Pcn:                   true,
+				Idmon:                 true,
+				Monitor:               true,
+				Smart:                 true,
+				XonXoff:               true,
+				Connect:               true,
+				MonitoredApplication1: 2,
+				MonitoredApplication2: 3,
+			},
+			want: "cbus.Configuration{Srchk:true, Exstat:true, Pun:true, LocalSal:true, Pcn:true, Idmon:true, Monitor:true, Smart:true, XonXoff:true, Connect:true, MonitoredApplication1:0x2, MonitoredApplication2:0x3}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := Configuration{
+				Srchk:                 tt.fields.Srchk,
+				Exstat:                tt.fields.Exstat,
+				Pun:                   tt.fields.Pun,
+				LocalSal:              tt.fields.LocalSal,
+				Pcn:                   tt.fields.Pcn,
+				Idmon:                 tt.fields.Idmon,
+				Monitor:               tt.fields.Monitor,
+				Smart:                 tt.fields.Smart,
+				XonXoff:               tt.fields.XonXoff,
+				Connect:               tt.fields.Connect,
+				MonitoredApplication1: tt.fields.MonitoredApplication1,
+				MonitoredApplication2: tt.fields.MonitoredApplication2,
+			}
+			assert.Equalf(t, tt.want, c.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 9d65646a7a..393007d21c 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"runtime/debug"
 	"sync"
 	"time"
@@ -57,6 +58,10 @@ func (t *AlphaGenerator) getAndIncrement() byte {
 	return result
 }
 
+func (t *AlphaGenerator) String() string {
+	return fmt.Sprintf("AlphaGenerator(currentAlpha: %c)", t.currentAlpha)
+}
+
 type Connection struct {
 	_default.DefaultConnection
 	alphaGenerator AlphaGenerator
@@ -166,7 +171,11 @@ func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
 }
 
 func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
-	return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
+	return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
+		c.GetPlcTagHandler(),
+		c.GetPlcValueHandler(),
+		NewSubscriber(c.addSubscriber, options.WithCustomLogger(c.log)),
+	)
 }
 
 func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
@@ -189,7 +198,28 @@ func (c *Connection) addSubscriber(subscriber *Subscriber) {
 }
 
 func (c *Connection) String() string {
-	return fmt.Sprintf("cbus.Connection")
+	return fmt.Sprintf(
+		"cbus.Connection{\n"+
+			"\tDefaultConnection: %s,\n"+
+			"\tAlphaGenerator: %s\n"+
+			"\tMessageCodec: %s\n"+
+			"\tsubscribers: %s\n"+
+			"\ttm: %s\n"+
+			"\tconfiguration: %s\n"+
+			"\tdriverContext: %s\n"+
+			"\tconnectionId: %s\n"+
+			"\ttracer: %s\n"+
+			"}",
+		c.DefaultConnection,
+		&c.alphaGenerator,
+		c.messageCodec,
+		c.subscribers,
+		c.tm,
+		c.configuration,
+		c.driverContext,
+		c.connectionId,
+		c.tracer,
+	)
 }
 
 func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
@@ -235,12 +265,16 @@ func (c *Connection) startSubscriptionHandler() {
 		c.log.Debug().Msg("SAL handler stated")
 		for c.IsConnected() {
 			for monitoredSal := range c.messageCodec.monitoredSALs {
+				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
 						c.log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
-						continue
+						handled = true
 					}
 				}
+				if !handled {
+					log.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
+				}
 			}
 		}
 		c.log.Info().Msg("Ending SAL handler")
@@ -255,12 +289,16 @@ func (c *Connection) startSubscriptionHandler() {
 		c.log.Debug().Msg("default MMI started")
 		for c.IsConnected() {
 			for calReply := range c.messageCodec.monitoredMMIs {
+				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredMMI(calReply); ok {
 						c.log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
-						continue
+						handled = true
 					}
 				}
+				if !handled {
+					log.Debug().Msgf("MMI was not handled:\n%s", calReply)
+				}
 			}
 		}
 		c.log.Info().Msg("Ending MMI handler")
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 8feae638e5..b96d950f00 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -539,7 +539,16 @@ func TestConnection_String(t *testing.T) {
 	}{
 		{
 			name: "a string",
-			want: "cbus.Connection",
+			want: "cbus.Connection{\n" +
+				"\tDefaultConnection: %!s(<nil>),\n" +
+				"\tAlphaGenerator: AlphaGenerator(currentAlpha: \x00)\n" +
+				"\tMessageCodec: <nil>\n" +
+				"\tsubscribers: []\n" +
+				"\ttm: %!s(<nil>)\n" +
+				"\tconfiguration: cbus.Configuration{Srchk:false, Exstat:false, Pun:false, LocalSal:false, Pcn:false, Idmon:false, Monitor:false, Smart:false, XonXoff:false, Connect:false, MonitoredApplication1:0x0, MonitoredApplication2:0x0}\n\tdriverContext: cbus.DriverContext{awaitSetupComplete:false, awaitDisconnectComplete:false}\n" +
+				"\tconnectionId: \n" +
+				"\ttracer: <nil>\n" +
+				"}",
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/internal/cbus/DriverContext.go b/plc4go/internal/cbus/DriverContext.go
index f0ed07b1f9..7c4307c960 100644
--- a/plc4go/internal/cbus/DriverContext.go
+++ b/plc4go/internal/cbus/DriverContext.go
@@ -19,6 +19,8 @@
 
 package cbus
 
+import "fmt"
+
 type DriverContext struct {
 	awaitSetupComplete      bool
 	awaitDisconnectComplete bool
@@ -27,3 +29,7 @@ type DriverContext struct {
 func NewDriverContext(_ Configuration) DriverContext {
 	return DriverContext{}
 }
+
+func (d DriverContext) String() string {
+	return fmt.Sprintf("%#v", d)
+}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 1c4c3e3743..b8119c7d8f 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,6 +21,7 @@ package cbus
 
 import (
 	"bufio"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/rs/zerolog"
 	"hash/crc32"
@@ -322,6 +323,26 @@ lookingForTheEnd:
 	return cBusMessage, nil
 }
 
+func (m *MessageCodec) String() string {
+	return fmt.Sprintf("MessageCodec{\n"+
+		"\tDefaultCodec: %s,\n"+
+		"\trequestContext: %s,\n"+
+		"\tcbusOptions: %s,\n"+
+		"\tmonitoredMMIs: %d elements,\n"+
+		"\tmonitoredSALs: %d elements,\n"+
+		"\tlastPackageHash: %d,\n"+
+		"\thashEncountered: %d,\n"+
+		"}",
+		m.DefaultCodec,
+		m.requestContext,
+		m.cbusOptions,
+		len(m.monitoredSALs),
+		len(m.monitoredSALs),
+		m.lastPackageHash,
+		m.hashEncountered,
+	)
+}
+
 func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
 	return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
 		log.Trace().Msgf("Custom handling message:\n%s", message)
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 2c11659dfb..68abdb02e0 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"testing"
 )
@@ -878,3 +879,51 @@ func Test_extractMMIAndSAL(t *testing.T) {
 		})
 	}
 }
+
+func TestMessageCodec_String(t *testing.T) {
+	type fields struct {
+		DefaultCodec                  _default.DefaultCodec
+		requestContext                readWriteModel.RequestContext
+		cbusOptions                   readWriteModel.CBusOptions
+		monitoredMMIs                 chan readWriteModel.CALReply
+		monitoredSALs                 chan readWriteModel.MonitoredSAL
+		lastPackageHash               uint32
+		hashEncountered               uint
+		currentlyReportedServerErrors uint
+		log                           zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			want: "MessageCodec{\n" +
+				"\tDefaultCodec: %!s(<nil>),\n" +
+				"\trequestContext: %!s(<nil>),\n" +
+				"\tcbusOptions: %!s(<nil>),\n" +
+				"\tmonitoredMMIs: 0 elements,\n" +
+				"\tmonitoredSALs: 0 elements,\n" +
+				"\tlastPackageHash: 0,\n" +
+				"\thashEncountered: 0,\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &MessageCodec{
+				DefaultCodec:                  tt.fields.DefaultCodec,
+				requestContext:                tt.fields.requestContext,
+				cbusOptions:                   tt.fields.cbusOptions,
+				monitoredMMIs:                 tt.fields.monitoredMMIs,
+				monitoredSALs:                 tt.fields.monitoredSALs,
+				lastPackageHash:               tt.fields.lastPackageHash,
+				hashEncountered:               tt.fields.hashEncountered,
+				currentlyReportedServerErrors: tt.fields.currentlyReportedServerErrors,
+				log:                           tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, m.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index e0fef87a17..0454ac9ebb 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -37,22 +37,22 @@ import (
 )
 
 type Subscriber struct {
-	connection *Connection
-	consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+	consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+	addSubscriber func(subscriber *Subscriber)
 
 	log zerolog.Logger
 }
 
-func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
+func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
 	return &Subscriber{
-		connection: connection,
-		consumers:  make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+		addSubscriber: addSubscriber,
+		consumers:     make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
 
 		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
-func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
 	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		defer func() {
@@ -63,14 +63,14 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 		internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
 
 		// Add this subscriber to the connection.
-		m.connection.addSubscriber(m)
+		s.addSubscriber(s)
 
 		// Just populate all requests with an OK
 		responseCodes := map[string]apiModel.PlcResponseCode{}
 		subscriptionValues := make(map[string]apiModel.PlcSubscriptionHandle)
 		for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
 			responseCodes[tagName] = apiModel.PlcResponseCode_OK
-			subscriptionValues[tagName] = NewSubscriptionHandle(m, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
+			subscriptionValues[tagName] = NewSubscriptionHandle(s, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
 		}
 
 		result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
@@ -79,7 +79,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 				subscriptionRequest,
 				responseCodes,
 				subscriptionValues,
-				options.WithCustomLogger(m.log),
+				options.WithCustomLogger(s.log),
 			),
 			nil,
 		)
@@ -87,7 +87,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 	return result
 }
 
-func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (s *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
 	// TODO: handle context
 	result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
 	result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
@@ -100,7 +100,7 @@ func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiM
 	return result
 }
 
-func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
+func (s *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 	var unitAddressString string
 	switch calReply := calReply.(type) {
 	case readWriteModel.CALReplyLongExactly:
@@ -119,18 +119,18 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 	}
 	calData := calReply.GetCalData()
 	handled := false
-	for registration, consumer := range m.consumers {
+	for registration, consumer := range s.consumers {
 		for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
-			handled = handled || m.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
+			handled = handled || s.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
 		}
 	}
 	return handled
 }
 
-func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
 	tag, ok := subscriptionHandle.tag.(*mmiMonitorTag)
 	if !ok {
-		m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+		s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
 		return false
 	}
 
@@ -146,7 +146,7 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
 		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
 		if !strings.HasSuffix(unitAddressString, unitSuffix) {
-			m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+			s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
 			return false
 		}
 	}
@@ -228,12 +228,12 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 			plcValues[tagName] = spiValues.NewPlcList(plcListValues)
 		}
 	default:
-		m.log.Error().Msgf("Unmapped type %T", calData)
+		s.log.Error().Msgf("Unmapped type %T", calData)
 		return false
 	}
 	if application := tag.GetApplication(); application != nil {
 		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
-			m.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+			s.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
 			return false
 		}
 	}
@@ -249,20 +249,20 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 	return true
 }
 
-func (m *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
+func (s *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
 	handled := false
-	for registration, consumer := range m.consumers {
+	for registration, consumer := range s.consumers {
 		for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
-			handled = handled || m.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
+			handled = handled || s.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
 		}
 	}
 	return handled
 }
 
-func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
 	tag, ok := subscriptionHandle.tag.(*salMonitorTag)
 	if !ok {
-		m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+		s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
 		return false
 	}
 	tags := map[string]apiModel.PlcTag{}
@@ -306,7 +306,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
 		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
 		if !strings.HasSuffix(unitAddressString, unitSuffix) {
-			m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+			s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
 			return false
 		}
 	}
@@ -314,7 +314,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 
 	if application := tag.GetApplication(); application != nil {
 		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
-			m.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+			s.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
 			return false
 		}
 	}
@@ -336,7 +336,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataErrorReportingExactly:
 		commandTypeGetter = salData.GetErrorReportingData().GetCommandType()
 	case readWriteModel.SALDataFreeUsageExactly:
-		m.log.Info().Msg("Unknown command type")
+		s.log.Info().Msg("Unknown command type")
 	case readWriteModel.SALDataHeatingExactly:
 		commandTypeGetter = salData.GetHeatingData().GetCommandType()
 	case readWriteModel.SALDataHvacActuatorExactly:
@@ -354,9 +354,9 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataPoolsSpasPondsFountainsControlExactly:
 		commandTypeGetter = salData.GetPoolsSpaPondsFountainsData().GetCommandType()
 	case readWriteModel.SALDataReservedExactly:
-		m.log.Info().Msg("Unknown command type")
+		s.log.Info().Msg("Unknown command type")
 	case readWriteModel.SALDataRoomControlSystemExactly:
-		m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
 	case readWriteModel.SALDataSecurityExactly:
 		commandTypeGetter = salData.GetSecurityData().GetCommandType()
 	case readWriteModel.SALDataTelephonyStatusAndControlExactly:
@@ -364,13 +364,13 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataTemperatureBroadcastExactly:
 		commandTypeGetter = salData.GetTemperatureBroadcastData().GetCommandType()
 	case readWriteModel.SALDataTestingExactly:
-		m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
 	case readWriteModel.SALDataTriggerControlExactly:
 		commandTypeGetter = salData.GetTriggerControlData().GetCommandType()
 	case readWriteModel.SALDataVentilationExactly:
 		commandTypeGetter = salData.GetVentilationData().GetCommandType()
 	default:
-		m.log.Error().Msgf("Unmapped type %T", salData)
+		s.log.Error().Msgf("Unmapped type %T", salData)
 	}
 	commandType := "Unknown"
 	if commandTypeGetter != nil {
@@ -383,7 +383,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	rbvb := spiValues.NewWriteBufferPlcValueBased()
 	err := salData.SerializeWithWriteBuffer(context.Background(), rbvb)
 	if err != nil {
-		m.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
+		s.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
 		plcValues[tagName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
 	} else {
 		plcValues[tagName] = rbvb.GetPlcValue()
@@ -397,12 +397,16 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	return true
 }
 
-func (m *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
-	consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(m, consumer, handles...)
-	m.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
+func (s *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
+	consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(s, consumer, handles...)
+	s.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
 	return consumerRegistration
 }
 
-func (m *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
-	delete(m.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+func (s *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
+	delete(s.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+}
+
+func (s *Subscriber) String() string {
+	return fmt.Sprintf("cbus.Subcriber{\n\tconsumers: %d elements\n}", len(s.consumers))
 }
diff --git a/plc4go/internal/cbus/Subscriber_test.go b/plc4go/internal/cbus/Subscriber_test.go
index d5d794c950..b36f7ae481 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,9 +21,6 @@ package cbus
 
 import (
 	"context"
-	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/testutils"
-	"github.com/apache/plc4x/plc4go/spi/utils"
 	"testing"
 	"time"
 
@@ -36,7 +33,7 @@ import (
 
 func TestNewSubscriber(t *testing.T) {
 	type args struct {
-		connection *Connection
+		addSubscriber func(subscriber *Subscriber)
 	}
 	tests := []struct {
 		name string
@@ -52,15 +49,15 @@ func TestNewSubscriber(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			assert.Equalf(t, tt.want, NewSubscriber(tt.args.connection), "NewSubscriber(%v)", tt.args.connection)
+			assert.Equalf(t, tt.want, NewSubscriber(tt.args.addSubscriber), "NewSubscriber(%t)", tt.args.addSubscriber != nil)
 		})
 	}
 }
 
 func TestSubscriber_Subscribe(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		in0                 context.Context
@@ -80,28 +77,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
 				subscriptionRequest: spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, map[string]apiModel.PlcTag{"blub": NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
 			},
 			setup: func(t *testing.T, fields *fields, args *args) {
-				// Setup logger
-				logger := testutils.ProduceTestingLogger(t)
-
-				loggerOption := options.WithCustomLogger(logger)
-
-				// Set the model logger to the logger above
-				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
-
-				codec := NewMessageCodec(nil, loggerOption)
-				connection := NewConnection(codec, Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
-				t.Cleanup(func() {
-					timer := time.NewTimer(1 * time.Second)
-					t.Cleanup(func() {
-						utils.CleanupTimer(timer)
-					})
-					select {
-					case <-connection.Close():
-					case <-timer.C:
-						t.Error("timeout")
-					}
-				})
-				fields.connection = connection
+				fields.addSubscriber = func(subscriber *Subscriber) {
+					assert.NotNil(t, subscriber)
+				}
 			},
 			wantAsserter: func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
@@ -123,8 +101,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
 				tt.setup(t, &tt.fields, &tt.args)
 			}
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Truef(t, tt.wantAsserter(t, m.Subscribe(tt.args.in0, tt.args.subscriptionRequest)), "Subscribe(%v, %v)", tt.args.in0, tt.args.subscriptionRequest)
 		})
@@ -133,8 +111,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
 
 func TestSubscriber_Unsubscribe(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		ctx                   context.Context
@@ -158,8 +136,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Truef(t, tt.wantAsserter(t, m.Unsubscribe(tt.args.ctx, tt.args.unsubscriptionRequest)), "Unsubscribe(%v, %v)", tt.args.ctx, tt.args.unsubscriptionRequest)
 		})
@@ -168,8 +146,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
 
 func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		calReply model.CALReply
@@ -249,8 +227,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.handleMonitoredMMI(tt.args.calReply), "handleMonitoredMMI(%v)", tt.args.calReply)
 		})
@@ -259,8 +237,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 
 func TestSubscriber_offerMMI(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		unitAddressString  string
@@ -501,8 +479,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.offerMMI(tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerMMI(%v,\n%v\n, \n%v\n, func())", tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle)
 		})
@@ -511,8 +489,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
 
 func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		sal model.MonitoredSAL
@@ -546,8 +524,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.handleMonitoredSAL(tt.args.sal), "handleMonitoredSAL(%v)", tt.args.sal)
 		})
@@ -556,8 +534,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 
 func TestSubscriber_offerSAL(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		sal                model.MonitoredSAL
@@ -1368,8 +1346,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.offerSAL(tt.args.sal, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerSAL(\n%v\n, \n%v\n)", tt.args.sal, tt.args.subscriptionHandle)
 		})
@@ -1378,8 +1356,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
 
 func TestSubscriber_Register(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		consumer apiModel.PlcSubscriptionEventConsumer
@@ -1400,8 +1378,8 @@ func TestSubscriber_Register(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.NotNilf(t, m.Register(tt.args.consumer, tt.args.handles), "Register(func(), %v)", tt.args.handles)
 		})
@@ -1410,8 +1388,8 @@ func TestSubscriber_Register(t *testing.T) {
 
 func TestSubscriber_Unregister(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		registration apiModel.PlcConsumerRegistration
@@ -1431,8 +1409,8 @@ func TestSubscriber_Unregister(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			m.Unregister(tt.args.registration)
 		})