You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/07/29 11:40:35 UTC

[plc4x] branch develop updated (2bd067a5e -> d516baae6)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 2bd067a5e feat(plc4go/cbus): properly map binary and level status responses to plcValue
     new 8f8633a71 feat(plc4go/bacnet): update vendors
     new a82e7006a fix(cbus): fixed detection of monitored sal
     new 7341b6122 fix(plc4go): fixed some issues regarding custom message handlers
     new d516baae6 feat(plc4go/cbus): first implementation of subscription for monitored SAL messages

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/MessageCodec.go           |   2 +-
 plc4go/internal/cbus/Connection.go                 |  81 ++++++---
 plc4go/internal/cbus/Field.go                      |  61 +++++++
 plc4go/internal/cbus/FieldHandler.go               | 190 +++++++++++++--------
 plc4go/internal/cbus/MessageCodec.go               |  25 ++-
 plc4go/internal/cbus/Subscriber.go                 | 146 ++++++++++++++++
 .../{knxnetip => cbus}/SubscriptionEvent.go        |  29 +---
 plc4go/internal/cbus/fieldtype_string.go           |   5 +-
 plc4go/internal/knxnetip/MessageCodec.go           |   6 +-
 plc4go/internal/knxnetip/SubscriptionEvent.go      |   2 +-
 plc4go/internal/spi/default/DefaultCodec.go        |  15 +-
 plc4go/internal/spi/model/DefaultPlcReadRequest.go |   8 +
 .../internal/spi/model/DefaultPlcReadResponse.go   |   8 +
 .../spi/model/DefaultPlcSubscriptionEvent.go       |   8 +
 .../spi/model/DefaultPlcSubscriptionRequest.go     |   8 +
 .../spi/model/DefaultPlcSubscriptionResponse.go    |   8 +
 .../internal/spi/model/DefaultPlcWriteRequest.go   |   8 +
 .../internal/spi/model/DefaultPlcWriteResponse.go  |   8 +
 plc4go/internal/spi/testutils/ManualTestRunner.go  |  13 +-
 .../bacnetip/readwrite/model/BACnetVendorId.go     |  64 +++++++
 .../cbus/readwrite/model/ApplicationIdContainer.go |  28 +--
 .../protocols/cbus/readwrite/model/EncodedReply.go |   4 +-
 .../tests/drivers/tests/manual_cbus_driver_test.go |  39 ++++-
 .../apache/plc4x/java/cbus/RandomPackagesTest.java |  12 ++
 .../src/main/resources/protocols/cbus/c-bus.mspec  |  10 +-
 .../watertank/WaterTankSimulationModule.java       |  10 +-
 26 files changed, 630 insertions(+), 168 deletions(-)
 create mode 100644 plc4go/internal/cbus/Subscriber.go
 copy plc4go/internal/{knxnetip => cbus}/SubscriptionEvent.go (60%)


[plc4x] 04/04: feat(plc4go/cbus): first implementation of subscription for monitored SAL messages

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit d516baae6790156d6aa865d77a56e154ca59d595
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jul 29 13:40:27 2022 +0200

    feat(plc4go/cbus): first implementation of subscription for monitored SAL messages
---
 plc4go/internal/cbus/Connection.go                 |  81 ++++++---
 plc4go/internal/cbus/Field.go                      |  61 +++++++
 plc4go/internal/cbus/FieldHandler.go               | 190 +++++++++++++--------
 plc4go/internal/cbus/MessageCodec.go               |  25 ++-
 plc4go/internal/cbus/Subscriber.go                 | 146 ++++++++++++++++
 .../{knxnetip => cbus}/SubscriptionEvent.go        |  29 +---
 plc4go/internal/cbus/fieldtype_string.go           |   5 +-
 plc4go/internal/knxnetip/SubscriptionEvent.go      |   2 +-
 plc4go/internal/spi/model/DefaultPlcReadRequest.go |   8 +
 .../internal/spi/model/DefaultPlcReadResponse.go   |   8 +
 .../spi/model/DefaultPlcSubscriptionEvent.go       |   8 +
 .../spi/model/DefaultPlcSubscriptionRequest.go     |   8 +
 .../spi/model/DefaultPlcSubscriptionResponse.go    |   8 +
 .../internal/spi/model/DefaultPlcWriteRequest.go   |   8 +
 .../internal/spi/model/DefaultPlcWriteResponse.go  |   8 +
 plc4go/internal/spi/testutils/ManualTestRunner.go  |  13 +-
 .../cbus/readwrite/model/ApplicationIdContainer.go |  28 +--
 .../tests/drivers/tests/manual_cbus_driver_test.go |  39 ++++-
 .../src/main/resources/protocols/cbus/c-bus.mspec  |   8 +-
 .../watertank/WaterTankSimulationModule.java       |  10 +-
 20 files changed, 538 insertions(+), 155 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index c3e079622..fe88e0a87 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,6 +26,7 @@ import (
 	internalModel "github.com/apache/plc4x/plc4go/internal/spi/model"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	"github.com/rs/zerolog/log"
 	"sync"
 )
 
@@ -50,10 +51,12 @@ type Connection struct {
 	_default.DefaultConnection
 	alphaGenerator AlphaGenerator
 	messageCodec   spi.MessageCodec
-	configuration  Configuration
-	driverContext  DriverContext
+	subscribers    []*Subscriber
 	tm             *spi.RequestTransactionManager
 
+	configuration Configuration
+	driverContext DriverContext
+
 	connectionId string
 	tracer       *spi.Tracer
 }
@@ -78,27 +81,27 @@ func NewConnection(messageCodec spi.MessageCodec, configuration Configuration, d
 	return connection
 }
 
-func (m *Connection) GetConnectionId() string {
-	return m.connectionId
+func (c *Connection) GetConnectionId() string {
+	return c.connectionId
 }
 
-func (m *Connection) IsTraceEnabled() bool {
-	return m.tracer != nil
+func (c *Connection) IsTraceEnabled() bool {
+	return c.tracer != nil
 }
 
-func (m *Connection) GetTracer() *spi.Tracer {
-	return m.tracer
+func (c *Connection) GetTracer() *spi.Tracer {
+	return c.tracer
 }
 
-func (m *Connection) GetConnection() plc4go.PlcConnection {
-	return m
+func (c *Connection) GetConnection() plc4go.PlcConnection {
+	return c
 }
 
-func (m *Connection) GetMessageCodec() spi.MessageCodec {
-	return m.messageCodec
+func (c *Connection) GetMessageCodec() spi.MessageCodec {
+	return c.messageCodec
 }
 
-func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
+func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
 	return _default.DefaultConnectionMetadata{
 		ProvidesReading:     true,
 		ProvidesWriting:     true,
@@ -107,29 +110,61 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
 	}
 }
 
-func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
-	return internalModel.NewDefaultPlcReadRequestBuilder(m.GetPlcFieldHandler(), NewReader(&m.alphaGenerator, m.messageCodec, m.tm))
+func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	connectionConnectResult := c.DefaultConnection.Connect()
+	ch := make(chan plc4go.PlcConnectionConnectResult)
+	go func() {
+		connectResult := <-connectionConnectResult
+		if connectResult.GetErr() == nil {
+			log.Debug().Msg("Starting subscription handler")
+			go func() {
+				for c.IsConnected() {
+					log.Debug().Msg("Handling incoming message")
+					for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+						for _, subscriber := range c.subscribers {
+							subscriber.handleMonitoredSal(monitoredSal)
+						}
+					}
+				}
+			}()
+		}
+		ch <- connectResult
+	}()
+	return ch
+}
+
+func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
+	return internalModel.NewDefaultPlcReadRequestBuilder(c.GetPlcFieldHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm))
 }
 
-func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
-	return internalModel.NewDefaultPlcWriteRequestBuilder(m.GetPlcFieldHandler(), m.GetPlcValueHandler(), NewWriter(&m.alphaGenerator, m.messageCodec, m.tm))
+func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
+	return internalModel.NewDefaultPlcWriteRequestBuilder(c.GetPlcFieldHandler(), c.GetPlcValueHandler(), NewWriter(&c.alphaGenerator, c.messageCodec, c.tm))
 }
 
-func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
-	// TODO: where do we get the subscriber from
-	return internalModel.NewDefaultPlcSubscriptionRequestBuilder(m.GetPlcFieldHandler(), m.GetPlcValueHandler(), nil)
+func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
+	return internalModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcFieldHandler(), c.GetPlcValueHandler(), NewSubscriber(c))
 }
 
-func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
+func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
 	// TODO: where do we get the unsubscriber from
 	return nil
 }
 
-func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
+func (c *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
 	// TODO: where do we get the browser from
 	return internalModel.NewDefaultPlcBrowseRequestBuilder(nil)
 }
 
-func (m *Connection) String() string {
+func (c *Connection) addSubscriber(subscriber *Subscriber) {
+	for _, sub := range c.subscribers {
+		if sub == subscriber {
+			log.Debug().Msgf("Subscriber %v already added", subscriber)
+			return
+		}
+	}
+	c.subscribers = append(c.subscribers, subscriber)
+}
+
+func (c *Connection) String() string {
 	return fmt.Sprintf("cbus.Connection")
 }
diff --git a/plc4go/internal/cbus/Field.go b/plc4go/internal/cbus/Field.go
index 448a43c14..69858bab2 100644
--- a/plc4go/internal/cbus/Field.go
+++ b/plc4go/internal/cbus/Field.go
@@ -100,6 +100,22 @@ func NewCALGetstatusField(unitAddress readWriteModel.UnitAddress, parameter read
 	}
 }
 
+// SALMonitorField can be used to monitor fields
+type SALMonitorField interface {
+	model.PlcField
+	GetUnitAddress() readWriteModel.UnitAddress
+	GetApplication() readWriteModel.ApplicationIdContainer
+}
+
+func NewSALMonitorField(unitAddress readWriteModel.UnitAddress, application readWriteModel.ApplicationIdContainer, numElements uint16) SALMonitorField {
+	return &salMonitorField{
+		fieldType:   SAL_MONITOR,
+		unitAddress: unitAddress,
+		application: application,
+		numElements: numElements,
+	}
+}
+
 ///////////////////////////////////////
 ///////////////////////////////////////
 //
@@ -141,6 +157,13 @@ type calGetstatusField struct {
 	numElements uint16
 }
 
+type salMonitorField struct {
+	fieldType   FieldType
+	unitAddress readWriteModel.UnitAddress
+	application readWriteModel.ApplicationIdContainer
+	numElements uint16
+}
+
 //
 // Internal section
 //
@@ -317,3 +340,41 @@ func (m calGetstatusField) Serialize(writeBuffer utils.WriteBuffer) error {
 	}
 	return nil
 }
+
+func (s salMonitorField) GetAddressString() string {
+	return fmt.Sprintf("%d/%s%s[%d]", s.fieldType, s.unitAddress, s.application, s.numElements)
+}
+
+func (s salMonitorField) GetTypeName() string {
+	return s.fieldType.GetName()
+}
+
+func (s salMonitorField) GetQuantity() uint16 {
+	return s.numElements
+}
+
+func (s salMonitorField) GetUnitAddress() readWriteModel.UnitAddress {
+	return s.unitAddress
+}
+
+func (s salMonitorField) GetApplication() readWriteModel.ApplicationIdContainer {
+	return s.application
+}
+
+func (s salMonitorField) Serialize(writeBuffer utils.WriteBuffer) error {
+	if err := writeBuffer.PushContext(s.fieldType.GetName()); err != nil {
+		return err
+	}
+
+	if err := s.unitAddress.Serialize(writeBuffer); err != nil {
+		return err
+	}
+	if err := s.application.Serialize(writeBuffer); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.PopContext(s.fieldType.GetName()); err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/plc4go/internal/cbus/FieldHandler.go b/plc4go/internal/cbus/FieldHandler.go
index f210d4afd..4c9c742ff 100644
--- a/plc4go/internal/cbus/FieldHandler.go
+++ b/plc4go/internal/cbus/FieldHandler.go
@@ -38,6 +38,7 @@ const (
 	CAL_RECALL
 	CAL_IDENTIFY
 	CAL_GETSTATUS
+	SAL_MONITOR
 )
 
 func (i FieldType) GetName() string {
@@ -47,12 +48,14 @@ func (i FieldType) GetName() string {
 type FieldHandler struct {
 	statusRequestPattern *regexp.Regexp
 	calPattern           *regexp.Regexp
+	monitorPattern       *regexp.Regexp
 }
 
 func NewFieldHandler() FieldHandler {
 	return FieldHandler{
 		statusRequestPattern: regexp.MustCompile(`^status/(?P<statusRequestType>(?P<binary>binary)|level=0x(?P<startingGroupAddressLabel>00|20|40|60|80|A0|C0|E0))/(?P<application>.*)`),
 		calPattern:           regexp.MustCompile(`^cal/(?P<unitAddress>.*)/(?P<calType>recall=\[(?P<recallParamNo>[\w\d]+), ?(?P<recallCount>\d+)]|identify=\[(?P<identifyAttribute>[\w\d]+)]|getstatus=\[(?P<getstatusParamNo>[\w\d]+), ?(?P<getstatusCount>\d+)])`),
+		monitorPattern:       regexp.MustCompile(`^monitor/(?P<unitAddress>.*)/(?P<application>.*)`),
 	}
 }
 
@@ -76,78 +79,9 @@ func (m FieldHandler) ParseQuery(query string) (model.PlcField, error) {
 				return nil, errors.Errorf("Unknown statusRequestType%s", statusRequestArgument)
 			}
 		}
-		var application readWriteModel.ApplicationIdContainer
-		applicationIdArgument := match["application"]
-		if strings.HasPrefix(applicationIdArgument, "0x") {
-			decodedHex, err := hex.DecodeString(applicationIdArgument[2:])
-			if err != nil {
-				return nil, errors.Wrap(err, "Not a valid hex")
-			}
-			if len(decodedHex) != 1 {
-				return nil, errors.Errorf("Hex must be exatly one byte")
-			}
-			application = readWriteModel.ApplicationIdContainer(decodedHex[0])
-		} else {
-			atoi, err := strconv.ParseUint(applicationIdArgument, 10, 8)
-			if err != nil {
-				application = readWriteModel.ApplicationIdContainer(atoi)
-			} else {
-				// We try first the application id
-				applicationId, ok := readWriteModel.ApplicationIdByName(applicationIdArgument)
-				if ok {
-					switch applicationId {
-					case readWriteModel.ApplicationId_TEMPERATURE_BROADCAST:
-						application = readWriteModel.ApplicationIdContainer_TEMPERATURE_BROADCAST_19
-					case readWriteModel.ApplicationId_ROOM_CONTROL_SYSTEM:
-						application = readWriteModel.ApplicationIdContainer_ROOM_CONTROL_SYSTEM_26
-					case readWriteModel.ApplicationId_LIGHTING:
-						application = readWriteModel.ApplicationIdContainer_LIGHTING_38
-					case readWriteModel.ApplicationId_VENTILATION:
-						application = readWriteModel.ApplicationIdContainer_VENTILATION_70
-					case readWriteModel.ApplicationId_IRRIGATION_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_IRRIGATION_CONTROL_71
-					case readWriteModel.ApplicationId_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72
-					case readWriteModel.ApplicationId_HEATING:
-						application = readWriteModel.ApplicationIdContainer_HEATING_88
-					case readWriteModel.ApplicationId_AIR_CONDITIONING:
-						application = readWriteModel.ApplicationIdContainer_AIR_CONDITIONING_AC
-					case readWriteModel.ApplicationId_TRIGGER_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_TRIGGER_CONTROL_CA
-					case readWriteModel.ApplicationId_ENABLE_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_ENABLE_CONTROL_CB
-					case readWriteModel.ApplicationId_AUDIO_AND_VIDEO:
-						application = readWriteModel.ApplicationIdContainer_AUDIO_AND_VIDEO_CD
-					case readWriteModel.ApplicationId_SECURITY:
-						application = readWriteModel.ApplicationIdContainer_SECURITY_D0
-					case readWriteModel.ApplicationId_METERING:
-						application = readWriteModel.ApplicationIdContainer_METERING_D1
-					case readWriteModel.ApplicationId_ACCESS_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5
-					case readWriteModel.ApplicationId_CLOCK_AND_TIMEKEEPING:
-						application = readWriteModel.ApplicationIdContainer_CLOCK_AND_TIMEKEEPING_DF
-					case readWriteModel.ApplicationId_TELEPHONY_STATUS_AND_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_TELEPHONY_STATUS_AND_CONTROL_E0
-					case readWriteModel.ApplicationId_MEASUREMENT:
-						application = readWriteModel.ApplicationIdContainer_MEASUREMENT_E4
-					case readWriteModel.ApplicationId_TESTING:
-						application = readWriteModel.ApplicationIdContainer_TESTING_FA
-					case readWriteModel.ApplicationId_MEDIA_TRANSPORT_CONTROL:
-						application = readWriteModel.ApplicationIdContainer_MEDIA_TRANSPORT_CONTROL_C0
-					case readWriteModel.ApplicationId_ERROR_REPORTING:
-						application = readWriteModel.ApplicationIdContainer_ERROR_REPORTING_CE
-					case readWriteModel.ApplicationId_HVAC_ACTUATOR:
-					default:
-						return nil, errors.Errorf("%s can't be used directly... select proper application id container", applicationId)
-					}
-				} else {
-					applicationIdByName, ok := readWriteModel.ApplicationIdContainerByName(applicationIdArgument)
-					if !ok {
-						return nil, errors.Errorf("Unknown applicationId%s", applicationIdArgument)
-					}
-					application = applicationIdByName
-				}
-			}
+		application, err := applicationIdFromArgument(match["application"])
+		if err != nil {
+			return nil, errors.Wrap(err, "Error getting application id from argument")
 		}
 		return NewStatusField(statusRequestType, startingGroupAddressLabel, application, 1), nil
 	} else if match := utils.GetSubgroupMatches(m.calPattern, query); match != nil {
@@ -262,7 +196,119 @@ func (m FieldHandler) ParseQuery(query string) (model.PlcField, error) {
 		default:
 			return nil, errors.Errorf("Invalid cal type %s", calTypeArgument)
 		}
+	} else if match := utils.GetSubgroupMatches(m.monitorPattern, query); match != nil {
+		var unitAddress readWriteModel.UnitAddress
+		{
+			unitAddressArgument := match["unitAddress"]
+			if unitAddressArgument == "*" {
+				unitAddress = nil
+			} else if strings.HasPrefix(unitAddressArgument, "0x") {
+				decodedHex, err := hex.DecodeString(unitAddressArgument[2:])
+				if err != nil {
+					return nil, errors.Wrap(err, "Not a valid hex")
+				}
+				if len(decodedHex) != 1 {
+					return nil, errors.Errorf("Hex must be exatly one byte")
+				}
+				unitAddress = readWriteModel.NewUnitAddress(decodedHex[0])
+			} else {
+				atoi, err := strconv.ParseUint(unitAddressArgument, 10, 8)
+				if err != nil {
+					return nil, errors.Errorf("Unknown unit address %s", unitAddressArgument)
+				}
+				unitAddress = readWriteModel.NewUnitAddress(byte(atoi))
+			}
+		}
+
+		var application readWriteModel.ApplicationIdContainer
+		{
+			applicationIdArgument := match["application"]
+			if applicationIdArgument == "*" {
+				application = readWriteModel.ApplicationIdContainer_RESERVED_FF
+			} else {
+				var err error
+				application, err = applicationIdFromArgument(applicationIdArgument)
+				if err != nil {
+					return nil, errors.Wrap(err, "Error getting application id from argument")
+				}
+			}
+		}
+
+		return NewSALMonitorField(unitAddress, application, 1), nil
 	} else {
 		return nil, errors.Errorf("Unable to parse %s", query)
 	}
 }
+
+func applicationIdFromArgument(applicationIdArgument string) (readWriteModel.ApplicationIdContainer, error) {
+	if strings.HasPrefix(applicationIdArgument, "0x") {
+		decodedHex, err := hex.DecodeString(applicationIdArgument[2:])
+		if err != nil {
+			return 0, errors.Wrap(err, "Not a valid hex")
+		}
+		if len(decodedHex) != 1 {
+			return 0, errors.Errorf("Hex must be exatly one byte")
+		}
+		return readWriteModel.ApplicationIdContainer(decodedHex[0]), nil
+	}
+	atoi, err := strconv.ParseUint(applicationIdArgument, 10, 8)
+	if err != nil {
+		return readWriteModel.ApplicationIdContainer(atoi), nil
+	}
+	// We try first the application id
+	applicationId, ok := readWriteModel.ApplicationIdByName(applicationIdArgument)
+	if ok {
+		switch applicationId {
+		case readWriteModel.ApplicationId_TEMPERATURE_BROADCAST:
+			return readWriteModel.ApplicationIdContainer_TEMPERATURE_BROADCAST_19, nil
+		case readWriteModel.ApplicationId_ROOM_CONTROL_SYSTEM:
+			return readWriteModel.ApplicationIdContainer_ROOM_CONTROL_SYSTEM_26, nil
+		case readWriteModel.ApplicationId_LIGHTING:
+			return readWriteModel.ApplicationIdContainer_LIGHTING_38, nil
+		case readWriteModel.ApplicationId_VENTILATION:
+			return readWriteModel.ApplicationIdContainer_VENTILATION_70, nil
+		case readWriteModel.ApplicationId_IRRIGATION_CONTROL:
+			return readWriteModel.ApplicationIdContainer_IRRIGATION_CONTROL_71, nil
+		case readWriteModel.ApplicationId_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL:
+			return readWriteModel.ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72, nil
+		case readWriteModel.ApplicationId_HEATING:
+			return readWriteModel.ApplicationIdContainer_HEATING_88, nil
+		case readWriteModel.ApplicationId_AIR_CONDITIONING:
+			return readWriteModel.ApplicationIdContainer_AIR_CONDITIONING_AC, nil
+		case readWriteModel.ApplicationId_TRIGGER_CONTROL:
+			return readWriteModel.ApplicationIdContainer_TRIGGER_CONTROL_CA, nil
+		case readWriteModel.ApplicationId_ENABLE_CONTROL:
+			return readWriteModel.ApplicationIdContainer_ENABLE_CONTROL_CB, nil
+		case readWriteModel.ApplicationId_AUDIO_AND_VIDEO:
+			return readWriteModel.ApplicationIdContainer_AUDIO_AND_VIDEO_CD, nil
+		case readWriteModel.ApplicationId_SECURITY:
+			return readWriteModel.ApplicationIdContainer_SECURITY_D0, nil
+		case readWriteModel.ApplicationId_METERING:
+			return readWriteModel.ApplicationIdContainer_METERING_D1, nil
+		case readWriteModel.ApplicationId_ACCESS_CONTROL:
+			return readWriteModel.ApplicationIdContainer_ACCESS_CONTROL_D5, nil
+		case readWriteModel.ApplicationId_CLOCK_AND_TIMEKEEPING:
+			return readWriteModel.ApplicationIdContainer_CLOCK_AND_TIMEKEEPING_DF, nil
+		case readWriteModel.ApplicationId_TELEPHONY_STATUS_AND_CONTROL:
+			return readWriteModel.ApplicationIdContainer_TELEPHONY_STATUS_AND_CONTROL_E0, nil
+		case readWriteModel.ApplicationId_MEASUREMENT:
+			return readWriteModel.ApplicationIdContainer_MEASUREMENT_E4, nil
+		case readWriteModel.ApplicationId_TESTING:
+			return readWriteModel.ApplicationIdContainer_TESTING_FA, nil
+		case readWriteModel.ApplicationId_MEDIA_TRANSPORT_CONTROL:
+			return readWriteModel.ApplicationIdContainer_MEDIA_TRANSPORT_CONTROL_C0, nil
+		case readWriteModel.ApplicationId_ERROR_REPORTING:
+			return readWriteModel.ApplicationIdContainer_ERROR_REPORTING_CE, nil
+		case readWriteModel.ApplicationId_HVAC_ACTUATOR:
+			return readWriteModel.ApplicationIdContainer_HVAC_ACTUATOR_73, nil
+		default:
+			return 0, errors.Errorf("%s can't be used directly... select proper application id container", applicationId)
+		}
+	} else {
+		applicationIdByName, ok := readWriteModel.ApplicationIdContainerByName(applicationIdArgument)
+		if !ok {
+			return 0, errors.Errorf("Unknown applicationId%s", applicationIdArgument)
+		}
+		return applicationIdByName, nil
+	}
+}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 2a19aa28e..f88c92053 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -33,8 +33,10 @@ import (
 type MessageCodec struct {
 	_default.DefaultCodec
 
-	requestContext  readwriteModel.RequestContext
-	cbusOptions     readwriteModel.CBusOptions
+	requestContext readwriteModel.RequestContext
+	cbusOptions    readwriteModel.CBusOptions
+
+	monitoredSALs   chan readwriteModel.MonitoredSAL
 	lastPackageHash uint32
 	hashEncountered uint
 }
@@ -43,8 +45,25 @@ func NewMessageCodec(transportInstance transports.TransportInstance, srchk bool)
 	codec := &MessageCodec{
 		requestContext: readwriteModel.NewRequestContext(false, false, false),
 		cbusOptions:    readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, srchk),
+		monitoredSALs:  make(chan readwriteModel.MonitoredSAL, 100),
 	}
-	codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance)
+	codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
+		switch message := message.(type) {
+		case readwriteModel.CBusMessageToClientExactly:
+			switch reply := message.GetReply().(type) {
+			case readwriteModel.ReplyOrConfirmationReplyExactly:
+				switch reply := reply.GetReply().(type) {
+				case readwriteModel.ReplyEncodedReplyExactly:
+					switch encodedReply := reply.GetEncodedReply().(type) {
+					case readwriteModel.MonitoredSALReplyExactly:
+						codec.(*MessageCodec).monitoredSALs <- encodedReply.GetMonitoredSAL()
+						return true
+					}
+				}
+			}
+		}
+		return false
+	}))
 	return codec
 }
 
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
new file mode 100644
index 000000000..972ccba1b
--- /dev/null
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cbus
+
+import (
+	"fmt"
+	spiModel "github.com/apache/plc4x/plc4go/internal/spi/model"
+	spiValues "github.com/apache/plc4x/plc4go/internal/spi/values"
+	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	"github.com/apache/plc4x/plc4go/pkg/api/values"
+	"github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+	"github.com/rs/zerolog/log"
+	"time"
+)
+
+type Subscriber struct {
+	connection           *Connection
+	subscriptionRequests []spiModel.DefaultPlcSubscriptionRequest
+}
+
+func NewSubscriber(connection *Connection) *Subscriber {
+	return &Subscriber{
+		connection:           connection,
+		subscriptionRequests: []spiModel.DefaultPlcSubscriptionRequest{},
+	}
+}
+
+func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+	result := make(chan apiModel.PlcSubscriptionRequestResult)
+	go func() {
+		// Add this subscriber to the connection.
+		m.connection.addSubscriber(m)
+
+		// Save the subscription request
+		m.subscriptionRequests = append(m.subscriptionRequests, subscriptionRequest.(spiModel.DefaultPlcSubscriptionRequest))
+
+		// Just populate all requests with an OK
+		responseCodes := map[string]apiModel.PlcResponseCode{}
+		for _, fieldName := range subscriptionRequest.GetFieldNames() {
+			responseCodes[fieldName] = apiModel.PlcResponseCode_OK
+		}
+
+		result <- &spiModel.DefaultPlcSubscriptionRequestResult{
+			Request:  subscriptionRequest,
+			Response: spiModel.NewDefaultPlcSubscriptionResponse(subscriptionRequest, responseCodes),
+			Err:      nil,
+		}
+	}()
+	return result
+}
+
+func (m *Subscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+	result := make(chan apiModel.PlcUnsubscriptionRequestResult)
+
+	// TODO: As soon as we establish a connection, we start getting data...
+	// subscriptions are more an internal handling of which values to pass where.
+
+	return result
+}
+
+func (m *Subscriber) handleMonitoredSal(sal model.MonitoredSAL) {
+	for _, subscriptionRequest := range m.subscriptionRequests {
+		fields := map[string]apiModel.PlcField{}
+		types := map[string]spiModel.SubscriptionType{}
+		intervals := map[string]time.Duration{}
+		responseCodes := map[string]apiModel.PlcResponseCode{}
+		address := map[string]string{}
+		plcValues := map[string]values.PlcValue{}
+
+		for _, fieldName := range subscriptionRequest.GetFieldNames() {
+			field, ok := subscriptionRequest.GetField(fieldName).(SALMonitorField)
+			if !ok {
+				log.Warn().Msgf("Unusable field for subscription %s", field)
+				responseCodes[fieldName] = apiModel.PlcResponseCode_INVALID_ADDRESS
+				plcValues[fieldName] = nil
+				continue
+			}
+			if unitAddress := field.GetUnitAddress(); unitAddress != nil {
+				// TODO: filter in unit address
+			}
+			application := field.GetApplication()
+			// TODO: filter in unit address
+			_ = application
+
+			subscriptionType := subscriptionRequest.GetType(fieldName)
+			// TODO: handle subscriptionType
+			_ = subscriptionType
+
+			fields[fieldName] = field
+			types[fieldName] = subscriptionRequest.GetType(fieldName)
+			intervals[fieldName] = subscriptionRequest.GetInterval(fieldName)
+
+			var salData model.SALData
+			var unitAddressString, applicationString string
+			switch sal := sal.(type) {
+			case model.MonitoredSALLongFormSmartModeExactly:
+				if sal.GetIsUnitAddress() {
+					unitAddressString = fmt.Sprintf("u%d", sal.GetUnitAddress().GetAddress())
+				} else {
+					unitAddressString = fmt.Sprintf("b%d", sal.GetBridgeAddress().GetAddress())
+					replyNetwork := sal.GetReplyNetwork()
+					for _, bridgeAddress := range replyNetwork.GetNetworkRoute().GetAdditionalBridgeAddresses() {
+						unitAddressString += fmt.Sprintf("-b%d", bridgeAddress.GetAddress())
+					}
+					unitAddressString += fmt.Sprintf("-u%d", replyNetwork.GetUnitAddress().GetAddress())
+				}
+				applicationString = sal.GetApplication().ApplicationId().String()
+				salData = sal.GetSalData()
+			case model.MonitoredSALShortFormBasicModeExactly:
+				unitAddressString = "u0" // On short form it should be always unit 0 TODO: double check that
+				applicationString = sal.GetApplication().ApplicationId().String()
+				salData = sal.GetSalData()
+			}
+			// TODO: we might need to encode more data into the address from sal data
+			address[fieldName] = fmt.Sprintf("/%s/%s", unitAddressString, applicationString)
+
+			// TODO: map values properly
+			plcValues[fieldName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
+			responseCodes[fieldName] = apiModel.PlcResponseCode_OK
+
+			// Assemble a PlcSubscription event
+			if len(plcValues) > 0 {
+				event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
+				eventHandler := subscriptionRequest.GetEventHandler()
+				eventHandler(event)
+			}
+		}
+	}
+}
diff --git a/plc4go/internal/knxnetip/SubscriptionEvent.go b/plc4go/internal/cbus/SubscriptionEvent.go
similarity index 60%
copy from plc4go/internal/knxnetip/SubscriptionEvent.go
copy to plc4go/internal/cbus/SubscriptionEvent.go
index 7a6194f3b..8783bedaf 100644
--- a/plc4go/internal/knxnetip/SubscriptionEvent.go
+++ b/plc4go/internal/cbus/SubscriptionEvent.go
@@ -17,27 +17,25 @@
  * under the License.
  */
 
-package knxnetip
+package cbus
 
 import (
 	internalMode "github.com/apache/plc4x/plc4go/internal/spi/model"
-	"github.com/apache/plc4x/plc4go/internal/spi/utils"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/pkg/api/values"
-	driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
 	"time"
 )
 
 type SubscriptionEvent struct {
-	addresses map[string][]byte
 	internalMode.DefaultPlcSubscriptionEvent
+	address map[string]string
 }
 
 func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
 	intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
-	addresses map[string][]byte, values map[string]values.PlcValue) SubscriptionEvent {
+	address map[string]string, values map[string]values.PlcValue) SubscriptionEvent {
 	return SubscriptionEvent{
-		addresses:                   addresses,
+		address:                     address,
 		DefaultPlcSubscriptionEvent: internalMode.NewDefaultPlcSubscriptionEvent(fields, types, intervals, responseCodes, values),
 	}
 }
@@ -46,23 +44,6 @@ func (m SubscriptionEvent) GetRequest() apiModel.PlcSubscriptionRequest {
 	panic("implement me")
 }
 
-// GetAddress Decode the binary data in the address according to the field requested
 func (m SubscriptionEvent) GetAddress(name string) string {
-	rawAddress := m.addresses[name]
-	rawAddressReadBuffer := utils.NewReadBufferByteBased(rawAddress)
-	field := m.DefaultPlcSubscriptionEvent.GetField(name)
-	var groupAddress driverModel.KnxGroupAddress
-	var err error
-	switch field.(type) {
-	case GroupAddress3LevelPlcField:
-		groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 3)
-	case GroupAddress2LevelPlcField:
-		groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 2)
-	case GroupAddress1LevelPlcField:
-		groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 1)
-	}
-	if err != nil {
-		return ""
-	}
-	return GroupAddressToString(groupAddress)
+	return m.address[name]
 }
diff --git a/plc4go/internal/cbus/fieldtype_string.go b/plc4go/internal/cbus/fieldtype_string.go
index ec833d764..f32a0dd95 100644
--- a/plc4go/internal/cbus/fieldtype_string.go
+++ b/plc4go/internal/cbus/fieldtype_string.go
@@ -29,11 +29,12 @@ func _() {
 	_ = x[CAL_RECALL-1]
 	_ = x[CAL_IDENTIFY-2]
 	_ = x[CAL_GETSTATUS-3]
+	_ = x[SAL_MONITOR-4]
 }
 
-const _FieldType_name = "STATUSCAL_RECALLCAL_IDENTIFYCAL_GETSTATUS"
+const _FieldType_name = "STATUSCAL_RECALLCAL_IDENTIFYCAL_GETSTATUSSAL_MONITOR"
 
-var _FieldType_index = [...]uint8{0, 6, 16, 28, 41}
+var _FieldType_index = [...]uint8{0, 6, 16, 28, 41, 52}
 
 func (i FieldType) String() string {
 	if i >= FieldType(len(_FieldType_index)-1) {
diff --git a/plc4go/internal/knxnetip/SubscriptionEvent.go b/plc4go/internal/knxnetip/SubscriptionEvent.go
index 7a6194f3b..6824ed73b 100644
--- a/plc4go/internal/knxnetip/SubscriptionEvent.go
+++ b/plc4go/internal/knxnetip/SubscriptionEvent.go
@@ -29,8 +29,8 @@ import (
 )
 
 type SubscriptionEvent struct {
-	addresses map[string][]byte
 	internalMode.DefaultPlcSubscriptionEvent
+	addresses map[string][]byte
 }
 
 func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
diff --git a/plc4go/internal/spi/model/DefaultPlcReadRequest.go b/plc4go/internal/spi/model/DefaultPlcReadRequest.go
index d78cc4df5..5d1714fc1 100644
--- a/plc4go/internal/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/internal/spi/model/DefaultPlcReadRequest.go
@@ -172,3 +172,11 @@ func (m DefaultPlcReadRequest) Serialize(writeBuffer utils.WriteBuffer) error {
 	}
 	return nil
 }
+
+func (m DefaultPlcReadRequest) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcReadResponse.go b/plc4go/internal/spi/model/DefaultPlcReadResponse.go
index 5b7e5ac04..8d92b66ea 100644
--- a/plc4go/internal/spi/model/DefaultPlcReadResponse.go
+++ b/plc4go/internal/spi/model/DefaultPlcReadResponse.go
@@ -113,3 +113,11 @@ func (m DefaultPlcReadResponse) Serialize(writeBuffer utils.WriteBuffer) error {
 	}
 	return nil
 }
+
+func (m DefaultPlcReadResponse) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcSubscriptionEvent.go b/plc4go/internal/spi/model/DefaultPlcSubscriptionEvent.go
index 8f8dc0ace..2e9119052 100644
--- a/plc4go/internal/spi/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/internal/spi/model/DefaultPlcSubscriptionEvent.go
@@ -102,3 +102,11 @@ func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) er
 	}
 	return nil
 }
+
+func (m DefaultPlcSubscriptionEvent) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcSubscriptionRequest.go b/plc4go/internal/spi/model/DefaultPlcSubscriptionRequest.go
index 4f805a597..6ad832991 100644
--- a/plc4go/internal/spi/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/internal/spi/model/DefaultPlcSubscriptionRequest.go
@@ -183,3 +183,11 @@ func (m DefaultPlcSubscriptionRequest) Serialize(writeBuffer utils.WriteBuffer)
 	}
 	return nil
 }
+
+func (m DefaultPlcSubscriptionRequest) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcSubscriptionResponse.go b/plc4go/internal/spi/model/DefaultPlcSubscriptionResponse.go
index 3ae453f58..d30d3da77 100644
--- a/plc4go/internal/spi/model/DefaultPlcSubscriptionResponse.go
+++ b/plc4go/internal/spi/model/DefaultPlcSubscriptionResponse.go
@@ -89,3 +89,11 @@ func (m DefaultPlcSubscriptionResponse) Serialize(writeBuffer utils.WriteBuffer)
 	}
 	return nil
 }
+
+func (m DefaultPlcSubscriptionResponse) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcWriteRequest.go b/plc4go/internal/spi/model/DefaultPlcWriteRequest.go
index 1627f0528..c17df7847 100644
--- a/plc4go/internal/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/internal/spi/model/DefaultPlcWriteRequest.go
@@ -230,3 +230,11 @@ func (m DefaultPlcWriteRequest) Serialize(writeBuffer utils.WriteBuffer) error {
 	}
 	return nil
 }
+
+func (m DefaultPlcWriteRequest) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/model/DefaultPlcWriteResponse.go b/plc4go/internal/spi/model/DefaultPlcWriteResponse.go
index 4470feafa..aaa423153 100644
--- a/plc4go/internal/spi/model/DefaultPlcWriteResponse.go
+++ b/plc4go/internal/spi/model/DefaultPlcWriteResponse.go
@@ -80,3 +80,11 @@ func (m DefaultPlcWriteResponse) Serialize(writeBuffer utils.WriteBuffer) error
 	}
 	return nil
 }
+
+func (m DefaultPlcWriteResponse) String() string {
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true)
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error()
+	}
+	return writeBuffer.GetBox().String()
+}
diff --git a/plc4go/internal/spi/testutils/ManualTestRunner.go b/plc4go/internal/spi/testutils/ManualTestRunner.go
index bcdcb5e73..dbf22becc 100644
--- a/plc4go/internal/spi/testutils/ManualTestRunner.go
+++ b/plc4go/internal/spi/testutils/ManualTestRunner.go
@@ -57,12 +57,12 @@ func (m *ManualTestSuite) AddTestCase(address string, expectedReadValue interfac
 	m.TestCases = append(m.TestCases, ManualTestCase{address, expectedReadValue, nil})
 }
 
-func (m *ManualTestSuite) Run() {
+func (m *ManualTestSuite) Run() plc4go.PlcConnection {
 	connectionResult := <-m.DriverManager.GetConnection(m.ConnectionString)
 	if connectionResult.GetErr() != nil {
 		panic(connectionResult.GetErr())
 	}
-	connection := connectionResult
+	connection := connectionResult.GetConnection()
 	log.Info().Msg("Reading all types in separate requests")
 	// Run all entries separately:
 	for _, testCase := range m.TestCases {
@@ -74,11 +74,12 @@ func (m *ManualTestSuite) Run() {
 	m.t.Run("combinedTest", func(t *testing.T) {
 		m.runBurstTest(t, connection)
 	})
+	return connection
 }
 
-func (m *ManualTestSuite) runSingleTest(t *testing.T, connection plc4go.PlcConnectionConnectResult, fieldName string, testCase ManualTestCase) {
+func (m *ManualTestSuite) runSingleTest(t *testing.T, connection plc4go.PlcConnection, fieldName string, testCase ManualTestCase) {
 	// Prepare the read-request
-	readRequestBuilder := connection.GetConnection().ReadRequestBuilder()
+	readRequestBuilder := connection.ReadRequestBuilder()
 	readRequestBuilder.AddQuery(fieldName, testCase.Address)
 	readRequest, err := readRequestBuilder.Build()
 	if err != nil {
@@ -110,7 +111,7 @@ func (m *ManualTestSuite) runSingleTest(t *testing.T, connection plc4go.PlcConne
 	}
 }
 
-func (m *ManualTestSuite) runBurstTest(t *testing.T, connection plc4go.PlcConnectionConnectResult) {
+func (m *ManualTestSuite) runBurstTest(t *testing.T, connection plc4go.PlcConnection) {
 	// Read all items in one big request.
 	// Shuffle the list of test cases and run the test 10 times.
 	log.Info().Msg("Reading all items together in random order")
@@ -129,7 +130,7 @@ func (m *ManualTestSuite) runBurstTest(t *testing.T, connection plc4go.PlcConnec
 		}
 		log.Info().Msgf("       using order: %s", sb.String())
 
-		builder := connection.GetConnection().ReadRequestBuilder()
+		builder := connection.ReadRequestBuilder()
 		for _, testCase := range shuffledTestcases {
 			fieldName := testCase.Address
 			builder.AddQuery(fieldName, testCase.Address)
diff --git a/plc4go/protocols/cbus/readwrite/model/ApplicationIdContainer.go b/plc4go/protocols/cbus/readwrite/model/ApplicationIdContainer.go
index 1b98b3a5f..78bf43dcf 100644
--- a/plc4go/protocols/cbus/readwrite/model/ApplicationIdContainer.go
+++ b/plc4go/protocols/cbus/readwrite/model/ApplicationIdContainer.go
@@ -152,8 +152,8 @@ const (
 	ApplicationIdContainer_VENTILATION_70                        ApplicationIdContainer = 0x70
 	ApplicationIdContainer_IRRIGATION_CONTROL_71                 ApplicationIdContainer = 0x71
 	ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72 ApplicationIdContainer = 0x72
-	ApplicationIdContainer_RESERVED_73                           ApplicationIdContainer = 0x73
-	ApplicationIdContainer_RESERVED_74                           ApplicationIdContainer = 0x74
+	ApplicationIdContainer_HVAC_ACTUATOR_73                      ApplicationIdContainer = 0x73
+	ApplicationIdContainer_HVAC_ACTUATOR_74                      ApplicationIdContainer = 0x74
 	ApplicationIdContainer_RESERVED_75                           ApplicationIdContainer = 0x75
 	ApplicationIdContainer_RESERVED_76                           ApplicationIdContainer = 0x76
 	ApplicationIdContainer_RESERVED_77                           ApplicationIdContainer = 0x77
@@ -415,8 +415,8 @@ func init() {
 		ApplicationIdContainer_VENTILATION_70,
 		ApplicationIdContainer_IRRIGATION_CONTROL_71,
 		ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72,
-		ApplicationIdContainer_RESERVED_73,
-		ApplicationIdContainer_RESERVED_74,
+		ApplicationIdContainer_HVAC_ACTUATOR_73,
+		ApplicationIdContainer_HVAC_ACTUATOR_74,
 		ApplicationIdContainer_RESERVED_75,
 		ApplicationIdContainer_RESERVED_76,
 		ApplicationIdContainer_RESERVED_77,
@@ -2875,9 +2875,9 @@ func ApplicationIdContainerByValue(value uint8) (enum ApplicationIdContainer, ok
 	case 0x72:
 		return ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72, true
 	case 0x73:
-		return ApplicationIdContainer_RESERVED_73, true
+		return ApplicationIdContainer_HVAC_ACTUATOR_73, true
 	case 0x74:
-		return ApplicationIdContainer_RESERVED_74, true
+		return ApplicationIdContainer_HVAC_ACTUATOR_74, true
 	case 0x75:
 		return ApplicationIdContainer_RESERVED_75, true
 	case 0x76:
@@ -3392,10 +3392,10 @@ func ApplicationIdContainerByName(value string) (enum ApplicationIdContainer, ok
 		return ApplicationIdContainer_IRRIGATION_CONTROL_71, true
 	case "POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72":
 		return ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72, true
-	case "RESERVED_73":
-		return ApplicationIdContainer_RESERVED_73, true
-	case "RESERVED_74":
-		return ApplicationIdContainer_RESERVED_74, true
+	case "HVAC_ACTUATOR_73":
+		return ApplicationIdContainer_HVAC_ACTUATOR_73, true
+	case "HVAC_ACTUATOR_74":
+		return ApplicationIdContainer_HVAC_ACTUATOR_74, true
 	case "RESERVED_75":
 		return ApplicationIdContainer_RESERVED_75, true
 	case "RESERVED_76":
@@ -3955,10 +3955,10 @@ func (e ApplicationIdContainer) PLC4XEnumName() string {
 		return "IRRIGATION_CONTROL_71"
 	case ApplicationIdContainer_POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72:
 		return "POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72"
-	case ApplicationIdContainer_RESERVED_73:
-		return "RESERVED_73"
-	case ApplicationIdContainer_RESERVED_74:
-		return "RESERVED_74"
+	case ApplicationIdContainer_HVAC_ACTUATOR_73:
+		return "HVAC_ACTUATOR_73"
+	case ApplicationIdContainer_HVAC_ACTUATOR_74:
+		return "HVAC_ACTUATOR_74"
 	case ApplicationIdContainer_RESERVED_75:
 		return "RESERVED_75"
 	case ApplicationIdContainer_RESERVED_76:
diff --git a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
index 00100d227..ae7e32a2e 100644
--- a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
+++ b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
@@ -20,12 +20,16 @@
 package tests
 
 import (
+	"fmt"
 	"github.com/apache/plc4x/plc4go/internal/cbus"
 	"github.com/apache/plc4x/plc4go/internal/spi/testutils"
 	"github.com/apache/plc4x/plc4go/pkg/api"
+	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/pkg/api/transports"
 	_ "github.com/apache/plc4x/plc4go/tests/initializetest"
+	"github.com/stretchr/testify/require"
 	"testing"
+	"time"
 )
 
 func TestManualCBusDriver(t *testing.T) {
@@ -43,5 +47,38 @@ func TestManualCBusDriver(t *testing.T) {
 	//test.AddTestCase("cal/0/identify=[FirmwareVersion]", true)
 	//test.AddTestCase("cal/0/gestatus=[0xFF, 1]", true)
 
-	test.Run()
+	plcConnection := test.Run()
+	t.Run("Subscription test", func(t *testing.T) {
+		gotMonitor := make(chan bool)
+		subscriptionRequest, err := plcConnection.SubscriptionRequestBuilder().
+			AddEventQuery("something", "monitor/*/*").
+			AddItemHandler(func(event model.PlcSubscriptionEvent) {
+				fmt.Printf("\n%s", event)
+				select {
+				case gotMonitor <- true:
+				default:
+				}
+			}).
+			Build()
+		require.NoError(t, err)
+		subscriptionRequest.Execute()
+		timeout := time.After(30 * time.Second)
+		// We expect couple monitors
+		monitorCount := 0
+	waitingForMonitors:
+		for {
+			select {
+			case at := <-timeout:
+				t.Errorf("timeout at %s", at)
+				break waitingForMonitors
+			case <-gotMonitor:
+				monitorCount++
+				println(monitorCount)
+				if monitorCount > 3 {
+					break waitingForMonitors
+				}
+			}
+		}
+		t.Logf("Got %d monitors", monitorCount)
+	})
 }
diff --git a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
index fb54f2f3c..a5eee0a22 100644
--- a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
+++ b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
@@ -399,8 +399,8 @@
     ['0x70' VENTILATION_70                        ['VENTILATION'                       , 'YES'                 ]]
     ['0x71' IRRIGATION_CONTROL_71                 ['IRRIGATION_CONTROL'                , 'YES'                 ]]
     ['0x72' POOLS_SPAS_PONDS_FOUNTAINS_CONTROL_72 ['POOLS_SPAS_PONDS_FOUNTAINS_CONTROL', 'YES'                 ]]
-    ['0x73' RESERVED_73                           ['HVAC_ACTUATOR'                     , 'NA'                  ]] // HVAC_ACTUATOR
-    ['0x74' RESERVED_74                           ['HVAC_ACTUATOR'                     , 'NA'                  ]] // HVAC_ACTUATOR
+    ['0x73' HVAC_ACTUATOR_73                      ['HVAC_ACTUATOR'                     , 'NA'                  ]]
+    ['0x74' HVAC_ACTUATOR_74                      ['HVAC_ACTUATOR'                     , 'NA'                  ]]
     ['0x75' RESERVED_75                           ['RESERVED'                          , 'NA'                  ]]
     ['0x76' RESERVED_76                           ['RESERVED'                          , 'NA'                  ]]
     ['0x77' RESERVED_77                           ['RESERVED'                          , 'NA'                  ]]
@@ -476,7 +476,7 @@
     ['0xBD' RESERVED_BD                           ['RESERVED'                          , 'NA'                  ]]
     ['0xBE' RESERVED_BE                           ['RESERVED'                          , 'NA'                  ]]
     ['0xBF' RESERVED_BF                           ['RESERVED'                          , 'NA'                  ]]
-    ['0xC0' MEDIA_TRANSPORT_CONTROL_C0            ['MEDIA_TRANSPORT_CONTROL'           , 'NA'                  ]] // MEDIA_TRANSPORT_CONTROL
+    ['0xC0' MEDIA_TRANSPORT_CONTROL_C0            ['MEDIA_TRANSPORT_CONTROL'           , 'NA'                  ]]
     ['0xC1' RESERVED_C1                           ['RESERVED'                          , 'NA'                  ]]
     ['0xC2' RESERVED_C2                           ['RESERVED'                          , 'NA'                  ]]
     ['0xC3' RESERVED_C3                           ['RESERVED'                          , 'NA'                  ]]
@@ -490,7 +490,7 @@
     ['0xCB' ENABLE_CONTROL_CB                     ['ENABLE_CONTROL'                    , 'YES_BUT_RESTRICTIONS']]
     ['0xCC' I_HAVE_NO_IDEA_CC                     ['RESERVED'                          , 'NA'                  ]] // This is the only value actually not defined in the spec.
     ['0xCD' AUDIO_AND_VIDEO_CD                    ['AUDIO_AND_VIDEO'                   , 'YES_BUT_RESTRICTIONS']]
-    ['0xCE' ERROR_REPORTING_CE                    ['ERROR_REPORTING'                   , 'NA'                  ]] // ERROR_REPORTING
+    ['0xCE' ERROR_REPORTING_CE                    ['ERROR_REPORTING'                   , 'NA'                  ]]
     ['0xCF' RESERVED_CF                           ['RESERVED'                          , 'NA'                  ]]
     ['0xD0' SECURITY_D0                           ['SECURITY'                          , 'NO'                  ]]
     ['0xD1' METERING_D1                           ['METERING'                          , 'NO'                  ]]
diff --git a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/simulation/watertank/WaterTankSimulationModule.java b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/simulation/watertank/WaterTankSimulationModule.java
index e44119118..5b4dd6116 100644
--- a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/simulation/watertank/WaterTankSimulationModule.java
+++ b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/simulation/watertank/WaterTankSimulationModule.java
@@ -26,12 +26,12 @@ import java.util.concurrent.TimeUnit;
 /**
  * This is a little simulation that simulates a Water tank.
  * This tank has a capacity who's "waterLevel" is represented as a Long value.
- * Water can flow into the tank if the input valve is opened and it can flow
+ * Water can flow into the tank if the input valve is opened, and it can flow
  * out of the tank if the output valve is open.
- *
+ * <p>
  * The capacity of the output is slightly smaller than that of the input, so
  * opening both valves will result in the tank filling.
- *
+ * <p>
  * To prevent the tank from bursting, there's an emergency valve which is opened
  * as soon as the water-level reaches a critical maximum.
  */
@@ -80,13 +80,13 @@ public class WaterTankSimulationModule implements SimulationModule {
             Long value = (Long) context.getMemory().get(PROP_WATER_LEVEL);
 
             // If the input valve is open, add 10.
-            if(context.getDigitalInputs().get(NUM_INPUT_VALVE_INPUT)) {
+            if (context.getDigitalInputs().get(NUM_INPUT_VALVE_INPUT)) {
                 value += 10;
                 value = Math.min(MAX_WATER_LEVEL, value);
             }
 
             // If the output valve is open, subtract 8 (It's slightly less throughput than the input)
-            if(context.getDigitalInputs().get(NUM_OUTPUT_VALVE_INPUT)) {
+            if (context.getDigitalInputs().get(NUM_OUTPUT_VALVE_INPUT)) {
                 value -= 8;
                 value = Math.max(0, value);
             }


[plc4x] 01/04: feat(plc4go/bacnet): update vendors

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8f8633a71c70161b032316ef640c61e337f2293d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jul 29 11:18:00 2022 +0200

    feat(plc4go/bacnet): update vendors
---
 .../bacnetip/readwrite/model/BACnetVendorId.go     | 64 ++++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/plc4go/protocols/bacnetip/readwrite/model/BACnetVendorId.go b/plc4go/protocols/bacnetip/readwrite/model/BACnetVendorId.go
index b783b1535..6a43bb37a 100644
--- a/plc4go/protocols/bacnetip/readwrite/model/BACnetVendorId.go
+++ b/plc4go/protocols/bacnetip/readwrite/model/BACnetVendorId.go
@@ -1400,6 +1400,10 @@ const (
 	BACnetVendorId_IDM_ENERGIESYSTEME_GMBH                                                      BACnetVendorId = 1367
 	BACnetVendorId_BE_NEXTBV                                                                    BACnetVendorId = 1368
 	BACnetVendorId_CLEAN_AIRAI_CORPORATION                                                      BACnetVendorId = 1369
+	BACnetVendorId_REVOLUTION_MICROELECTRONICS_AMERICA_INC                                      BACnetVendorId = 1370
+	BACnetVendorId_ARENDARIT_SECURITY_GMBH                                                      BACnetVendorId = 1371
+	BACnetVendorId_ZED_BEE_TECHNOLOGIES_PVT_LTD                                                 BACnetVendorId = 1372
+	BACnetVendorId_WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD                                         BACnetVendorId = 1373
 	BACnetVendorId_UNKNOWN_VENDOR                                                               BACnetVendorId = 0xFFFF
 )
 
@@ -2771,6 +2775,10 @@ func init() {
 		BACnetVendorId_IDM_ENERGIESYSTEME_GMBH,
 		BACnetVendorId_BE_NEXTBV,
 		BACnetVendorId_CLEAN_AIRAI_CORPORATION,
+		BACnetVendorId_REVOLUTION_MICROELECTRONICS_AMERICA_INC,
+		BACnetVendorId_ARENDARIT_SECURITY_GMBH,
+		BACnetVendorId_ZED_BEE_TECHNOLOGIES_PVT_LTD,
+		BACnetVendorId_WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD,
 		BACnetVendorId_UNKNOWN_VENDOR,
 	}
 }
@@ -4433,6 +4441,22 @@ func (e BACnetVendorId) VendorId() uint16 {
 		{ /* '137' */
 			return 137
 		}
+	case 1370:
+		{ /* '1370' */
+			return 1370
+		}
+	case 1371:
+		{ /* '1371' */
+			return 1371
+		}
+	case 1372:
+		{ /* '1372' */
+			return 1372
+		}
+	case 1373:
+		{ /* '1373' */
+			return 1373
+		}
 	case 138:
 		{ /* '138' */
 			return 138
@@ -9907,6 +9931,22 @@ func (e BACnetVendorId) VendorName() string {
 		{ /* '137' */
 			return "AFDtek Division of Fastek International Inc."
 		}
+	case 1370:
+		{ /* '1370' */
+			return "Revolution Microelectronics (America) Inc."
+		}
+	case 1371:
+		{ /* '1371' */
+			return "Arendar IT-Security GmbH"
+		}
+	case 1372:
+		{ /* '1372' */
+			return "ZedBee Technologies Pvt Ltd"
+		}
+	case 1373:
+		{ /* '1373' */
+			return "Winmate Technology Solutions Pvt. Ltd."
+		}
 	case 138:
 		{ /* '138' */
 			return "PowerCold Comfort Air Solutions, Inc."
@@ -14552,6 +14592,14 @@ func BACnetVendorIdByValue(value uint16) (enum BACnetVendorId, ok bool) {
 		return BACnetVendorId_CLEAN_AIRAI_CORPORATION, true
 	case 137:
 		return BACnetVendorId_AF_DTEK_DIVISIONOF_FASTEK_INTERNATIONAL_INC, true
+	case 1370:
+		return BACnetVendorId_REVOLUTION_MICROELECTRONICS_AMERICA_INC, true
+	case 1371:
+		return BACnetVendorId_ARENDARIT_SECURITY_GMBH, true
+	case 1372:
+		return BACnetVendorId_ZED_BEE_TECHNOLOGIES_PVT_LTD, true
+	case 1373:
+		return BACnetVendorId_WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD, true
 	case 138:
 		return BACnetVendorId_POWER_COLD_COMFORT_AIR_SOLUTIONS_INC, true
 	case 139:
@@ -17286,6 +17334,14 @@ func BACnetVendorIdByName(value string) (enum BACnetVendorId, ok bool) {
 		return BACnetVendorId_CLEAN_AIRAI_CORPORATION, true
 	case "AF_DTEK_DIVISIONOF_FASTEK_INTERNATIONAL_INC":
 		return BACnetVendorId_AF_DTEK_DIVISIONOF_FASTEK_INTERNATIONAL_INC, true
+	case "REVOLUTION_MICROELECTRONICS_AMERICA_INC":
+		return BACnetVendorId_REVOLUTION_MICROELECTRONICS_AMERICA_INC, true
+	case "ARENDARIT_SECURITY_GMBH":
+		return BACnetVendorId_ARENDARIT_SECURITY_GMBH, true
+	case "ZED_BEE_TECHNOLOGIES_PVT_LTD":
+		return BACnetVendorId_ZED_BEE_TECHNOLOGIES_PVT_LTD, true
+	case "WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD":
+		return BACnetVendorId_WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD, true
 	case "POWER_COLD_COMFORT_AIR_SOLUTIONS_INC":
 		return BACnetVendorId_POWER_COLD_COMFORT_AIR_SOLUTIONS_INC, true
 	case "I_CONTROLS":
@@ -20065,6 +20121,14 @@ func (e BACnetVendorId) PLC4XEnumName() string {
 		return "CLEAN_AIRAI_CORPORATION"
 	case BACnetVendorId_AF_DTEK_DIVISIONOF_FASTEK_INTERNATIONAL_INC:
 		return "AF_DTEK_DIVISIONOF_FASTEK_INTERNATIONAL_INC"
+	case BACnetVendorId_REVOLUTION_MICROELECTRONICS_AMERICA_INC:
+		return "REVOLUTION_MICROELECTRONICS_AMERICA_INC"
+	case BACnetVendorId_ARENDARIT_SECURITY_GMBH:
+		return "ARENDARIT_SECURITY_GMBH"
+	case BACnetVendorId_ZED_BEE_TECHNOLOGIES_PVT_LTD:
+		return "ZED_BEE_TECHNOLOGIES_PVT_LTD"
+	case BACnetVendorId_WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD:
+		return "WINMATE_TECHNOLOGY_SOLUTIONS_PVT_LTD"
 	case BACnetVendorId_POWER_COLD_COMFORT_AIR_SOLUTIONS_INC:
 		return "POWER_COLD_COMFORT_AIR_SOLUTIONS_INC"
 	case BACnetVendorId_I_CONTROLS:


[plc4x] 03/04: fix(plc4go): fixed some issues regarding custom message handlers

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7341b6122cf84612fd4a9d0a34c97cf3114eaea0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jul 29 12:01:10 2022 +0200

    fix(plc4go): fixed some issues regarding custom message handlers
---
 plc4go/internal/bacnetip/MessageCodec.go    |  2 +-
 plc4go/internal/knxnetip/MessageCodec.go    |  6 +++---
 plc4go/internal/spi/default/DefaultCodec.go | 15 +++++++++------
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index edffc0ec1..86621a597 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -100,7 +100,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	return nil, nil
 }
 
-func (m *MessageCodec) handleCustomMessage(_ *_default.DefaultCodecRequirements, message spi.Message) bool {
+func (m *MessageCodec) handleCustomMessage(_ _default.DefaultCodecRequirements, message spi.Message) bool {
 	// For now, we just put them in the incoming channel
 	m.GetDefaultIncomingMessageChannel() <- message
 	return true
diff --git a/plc4go/internal/knxnetip/MessageCodec.go b/plc4go/internal/knxnetip/MessageCodec.go
index f579e4043..6fb0fb5f5 100644
--- a/plc4go/internal/knxnetip/MessageCodec.go
+++ b/plc4go/internal/knxnetip/MessageCodec.go
@@ -108,7 +108,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	return nil, nil
 }
 
-func CustomMessageHandling(codec *_default.DefaultCodecRequirements, message spi.Message) bool {
+func CustomMessageHandling(codec _default.DefaultCodecRequirements, message spi.Message) bool {
 	// If this message is a simple KNXNet/IP UDP Ack, ignore it for now
 	tunnelingResponse := model.CastTunnelingResponse(message)
 	if tunnelingResponse != nil {
@@ -124,13 +124,13 @@ func CustomMessageHandling(codec *_default.DefaultCodecRequirements, message spi
 				tunnelingRequest.GetTunnelingRequestDataBlock().GetSequenceCounter(),
 				model.Status_NO_ERROR),
 		)
-		err := (*codec).Send(response)
+		err := codec.Send(response)
 		if err != nil {
 			log.Warn().Err(err).Msg("got an error sending ACK from transport")
 		}
 	}
 
-	localCodec := (*codec).(*MessageCodec)
+	localCodec := codec.(*MessageCodec)
 	// Handle the packet itself
 	// Give a message interceptor a chance to intercept
 	if (*localCodec).messageInterceptor != nil {
diff --git a/plc4go/internal/spi/default/DefaultCodec.go b/plc4go/internal/spi/default/DefaultCodec.go
index 2aef1cdae..fb63035b6 100644
--- a/plc4go/internal/spi/default/DefaultCodec.go
+++ b/plc4go/internal/spi/default/DefaultCodec.go
@@ -60,7 +60,7 @@ type DefaultExpectation struct {
 	HandleError    spi.HandleError
 }
 
-func WithCustomMessageHandler(customMessageHandler func(codec *DefaultCodecRequirements, message spi.Message) bool) options.WithOption {
+func WithCustomMessageHandler(customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool) options.WithOption {
 	return withCustomMessageHandler{customMessageHandler: customMessageHandler}
 }
 
@@ -72,7 +72,7 @@ func WithCustomMessageHandler(customMessageHandler func(codec *DefaultCodecRequi
 
 type withCustomMessageHandler struct {
 	options.Option
-	customMessageHandler func(codec *DefaultCodecRequirements, message spi.Message) bool
+	customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
 }
 
 type defaultCodec struct {
@@ -81,11 +81,11 @@ type defaultCodec struct {
 	defaultIncomingMessageChannel chan spi.Message
 	expectations                  []spi.Expectation
 	running                       bool
-	customMessageHandling         func(codec *DefaultCodecRequirements, message spi.Message) bool
+	customMessageHandling         func(codec DefaultCodecRequirements, message spi.Message) bool
 }
 
 func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transportInstance transports.TransportInstance, options ...options.WithOption) DefaultCodec {
-	var customMessageHandler func(codec *DefaultCodecRequirements, message spi.Message) bool
+	var customMessageHandler func(codec DefaultCodecRequirements, message spi.Message) bool
 
 	for _, option := range options {
 		switch option.(type) {
@@ -151,7 +151,7 @@ func (m *defaultCodec) Connect() error {
 
 	if !m.running {
 		log.Debug().Msg("Message codec currently not running, starting worker now")
-		go m.Work(&m.DefaultCodecRequirements)
+		go m.Work(m.DefaultCodecRequirements)
 	}
 	m.running = true
 	return nil
@@ -234,7 +234,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
 	return messageHandled
 }
 
-func (m *defaultCodec) Work(codec *DefaultCodecRequirements) {
+func (m *defaultCodec) Work(codec DefaultCodecRequirements) {
 	workerLog := log.With().Logger()
 	if !config.TraceDefaultMessageCodecWorker {
 		workerLog = zerolog.Nop()
@@ -289,7 +289,10 @@ mainLoop:
 		if m.customMessageHandling != nil {
 			workerLog.Trace().Msg("Executing custom handling")
 			if m.customMessageHandling(codec, message) {
+				workerLog.Trace().Msg("Custom handling handled the message")
 				continue mainLoop
+			} else {
+				workerLog.Trace().Msg("Custom handling didn't handle the message")
 			}
 		}
 


[plc4x] 02/04: fix(cbus): fixed detection of monitored sal

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit a82e7006ab3ecb3cce7e7edf02bbd9595d576067
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jul 29 11:18:32 2022 +0200

    fix(cbus): fixed detection of monitored sal
---
 plc4go/protocols/cbus/readwrite/model/EncodedReply.go        |  4 ++--
 .../java/org/apache/plc4x/java/cbus/RandomPackagesTest.java  | 12 ++++++++++++
 .../c-bus/src/main/resources/protocols/cbus/c-bus.mspec      |  2 +-
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/plc4go/protocols/cbus/readwrite/model/EncodedReply.go b/plc4go/protocols/cbus/readwrite/model/EncodedReply.go
index df9411535..b7e124731 100644
--- a/plc4go/protocols/cbus/readwrite/model/EncodedReply.go
+++ b/plc4go/protocols/cbus/readwrite/model/EncodedReply.go
@@ -98,7 +98,7 @@ func (m *_EncodedReply) GetPeekedByte() byte {
 ///////////////////////
 
 func (m *_EncodedReply) GetIsMonitoredSAL() bool {
-	return bool(bool((m.GetPeekedByte() & 0x3F) == (0x05)))
+	return bool(bool(bool(bool((m.GetPeekedByte()&0x3F) == (0x05))) || bool(bool((m.GetPeekedByte()) == (0x00)))) || bool(bool((m.GetPeekedByte()&0xF8) == (0x00))))
 }
 
 func (m *_EncodedReply) GetIsCalCommand() bool {
@@ -175,7 +175,7 @@ func EncodedReplyParse(readBuffer utils.ReadBuffer, cBusOptions CBusOptions, req
 	readBuffer.Reset(currentPos)
 
 	// Virtual field
-	_isMonitoredSAL := bool((peekedByte & 0x3F) == (0x05))
+	_isMonitoredSAL := bool(bool(bool((peekedByte&0x3F) == (0x05))) || bool(bool((peekedByte) == (0x00)))) || bool(bool((peekedByte&0xF8) == (0x00)))
 	isMonitoredSAL := bool(_isMonitoredSAL)
 	_ = isMonitoredSAL
 
diff --git a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
index 72f002d17..a16d0d962 100644
--- a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
+++ b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
@@ -405,4 +405,16 @@ public class RandomPackagesTest {
         assertMessageMatches(bytes, msg);
     }
 
+    @Test
+    void ownSal() throws Exception {
+        byte[] bytes = "003809AF10\r\n".getBytes(StandardCharsets.UTF_8);
+        ReadBufferByteBased readBufferByteBased = new ReadBufferByteBased(bytes);
+        cBusOptions = C_BUS_OPTIONS_WITH_SRCHK;
+        CBusMessage msg = CBusMessage.staticParse(readBufferByteBased, true, requestContext, cBusOptions);
+        assertThat(msg).isNotNull();
+        System.out.println(msg);
+        System.out.println(((ReplyEncodedReply) ((ReplyOrConfirmationReply) ((CBusMessageToClient) msg).getReply()).getReply()).getEncodedReply());
+
+        assertMessageMatches(bytes, msg);
+    }
 }
diff --git a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
index 74b4ba741..fb54f2f3c 100644
--- a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
+++ b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus.mspec
@@ -1393,7 +1393,7 @@
 [type EncodedReply(CBusOptions cBusOptions, RequestContext requestContext)
     [peek    byte peekedByte                                                        ]
     // TODO: if we reliable can detect this with the mask we don't need the request context anymore
-    [virtual bit  isMonitoredSAL            '(peekedByte & 0x3F) == 0x05'                                           ]
+    [virtual bit  isMonitoredSAL            '(peekedByte & 0x3F) == 0x05 || peekedByte == 0x00 || (peekedByte & 0xF8) == 0x00'] // First check if it is in long mode, second for short mode, third for bridged short mode
     [virtual bit  isCalCommand              '(peekedByte & 0x3F) == 0x06 || requestContext.sendCalCommandBefore'    ] // The 0x3F and 0x06 doesn't seem to work always
     [virtual bit  isStandardFormatStatus    '(peekedByte & 0xC0) == 0xC0 && !cBusOptions.exstat'                    ]
     [virtual bit  isExtendedFormatStatus    '(peekedByte & 0xE0) == 0xE0 && (cBusOptions.exstat || requestContext.sendStatusRequestLevelBefore)']