You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/02 11:28:34 UTC

[plc4x] 02/02: fix(plc4go/cbus): change browser to not brute force all unit addresses rather use the installation mmi

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 4359797154f497aa0f63b18e84961ac73bcfa868
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Sep 2 13:28:25 2022 +0200

    fix(plc4go/cbus): change browser to not brute force all unit addresses rather use the installation mmi
---
 plc4go/internal/cbus/Browser.go      | 171 ++++++++++++++++++++++++++++++++++-
 plc4go/internal/cbus/FieldHandler.go |   4 +
 plc4go/internal/cbus/Reader.go       |  12 ++-
 plc4go/internal/cbus/Subscriber.go   |  31 +++++--
 4 files changed, 204 insertions(+), 14 deletions(-)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index c72d9fa2b..3c2b570ce 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"time"
 )
@@ -60,9 +61,19 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
 		if unitAddress := field.unitAddress; unitAddress != nil {
 			units = append(units, *unitAddress)
 		} else {
+			// TODO: check if we still want the option to brute force all addresses
+			installedUnitAddressBytes, err := m.getInstalledUnitAddressBytes(ctx)
+			if err != nil {
+				log.Warn().Err(err).Msg("Unable to get installed uints")
+				return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+			}
+
 			allUnits = true
 			for i := 0; i <= 0xFF; i++ {
-				units = append(units, readWriteModel.NewUnitAddress(byte(i)))
+				unitAddressByte := byte(i)
+				if _, ok := installedUnitAddressBytes[unitAddressByte]; ok {
+					units = append(units, readWriteModel.NewUnitAddress(unitAddressByte))
+				}
 			}
 		}
 		if attribute := field.attribute; attribute != nil {
@@ -75,7 +86,7 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
 		}
 
 		if allUnits {
-			log.Info().Msg("Querying all units")
+			log.Info().Msg("Querying all (available) units")
 		}
 	unitLoop:
 		for _, unit := range units {
@@ -146,3 +157,159 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
 	}
 	return apiModel.PlcResponseCode_OK, queryResults
 }
+
+// getInstalledUnitAddressBytes asks the installation MMI which units exist and returns their addresses as a set
+// (one map entry per unit reported ON or OFF). Block 0 comes from a status/binary/0xFF read, the follow-up blocks 88
+// and 176 from a NETWORK_CONTROL MMI monitor subscription; subscribing, reading and waiting each time out after 2s.
+func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
+	// We need to presubscribe to catch the 2 followup responses
+	subscriptionRequest, err := m.connection.SubscriptionRequestBuilder().
+		AddEventQuery("installationMMIMonitor", "mmimonitor/*/NETWORK_CONTROL").
+		Build()
+	if err != nil {
+		return nil, errors.Wrap(err, "Error subscribing to the installation MMI")
+	}
+	subCtx, subCtxCancel := context.WithTimeout(ctx, time.Second*2)
+	subscriptionResult := <-subscriptionRequest.ExecuteWithContext(subCtx)
+	subCtxCancel()
+	if err := subscriptionResult.GetErr(); err != nil {
+		return nil, errors.Wrap(err, "Error subscribing to the mmi")
+	}
+	if responseCode := subscriptionResult.GetResponse().GetResponseCode("installationMMIMonitor"); responseCode != apiModel.PlcResponseCode_OK {
+		return nil, errors.Errorf("Got %s", responseCode)
+	}
+	subscriptionHandle, err := subscriptionResult.GetResponse().GetSubscriptionHandle("installationMMIMonitor")
+	if err != nil {
+		return nil, errors.Wrap(err, "Error getting the subscription handle")
+	}
+
+	// The consumer never touches result itself: it hands over complete blocks (ignored events don't count as received)
+	// and only this function merges them, so no lock is needed whichever goroutine the consumer is called from.
+	type installedUnitsBlock struct {
+		blockStart int
+		units      []byte
+	}
+	blockChan := make(chan installedUnitsBlock, 100) // We only expect two, but we make it a bit bigger to not clog up
+	pendingBlockStarts := map[int]bool{88: true, 176: true}
+	result := make(map[byte]any)
+	plcConsumerRegistration := subscriptionHandle.Register(func(event apiModel.PlcSubscriptionEvent) {
+		if responseCode := event.GetResponseCode("installationMMIMonitor"); responseCode != apiModel.PlcResponseCode_OK {
+			log.Warn().Msgf("Ignoring %v", event)
+			return
+		}
+		rootValue := event.GetValue("installationMMIMonitor")
+		if rootValue == nil || !rootValue.IsStruct() {
+			log.Warn().Msgf("Ignoring %v should be a struct", rootValue)
+			return
+		}
+		rootStruct := rootValue.GetStruct()
+		if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
+			log.Warn().Msgf("Ignoring %v should contain a application field of type string with value NETWORK_CONTROL", rootStruct)
+			return
+		}
+		blockStartValue := rootStruct["blockStart"]
+		if blockStartValue == nil || !blockStartValue.IsByte() {
+			log.Warn().Msgf("Ignoring %v should contain a blockStart field of type byte", rootStruct)
+			return
+		}
+		blockStart := int(blockStartValue.GetByte())
+		if blockStart == 0 {
+			log.Debug().Msgf("We ignore 0 as we handle it with the read request below\n%v", event)
+			return
+		}
+		plcListValue := rootStruct["values"]
+		if plcListValue == nil || !plcListValue.IsList() {
+			log.Warn().Msgf("Ignoring %v should contain a values field of type list", rootStruct)
+			return
+		}
+		var units []byte
+		for unitByteAddress, plcValue := range plcListValue.GetList() {
+			unitByteAddress = blockStart + unitByteAddress
+			if unitByteAddress > 0xFF {
+				break // unit addresses are bytes, a conversion would wrap around to a low address
+			}
+			if !plcValue.IsString() {
+				log.Warn().Msgf("Ignoring %v at %d should be a string", plcValue, unitByteAddress)
+				return
+			}
+			switch plcValue.GetString() {
+			case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
+				log.Debug().Msgf("unit %d does exists", unitByteAddress)
+				units = append(units, byte(unitByteAddress))
+			case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
+				log.Debug().Msgf("unit %d does not exists", unitByteAddress)
+			case readWriteModel.GAVState_ERROR.PLC4XEnumName():
+				log.Warn().Msgf("unit %d is in error state", unitByteAddress)
+			}
+		}
+		select {
+		case blockChan <- installedUnitsBlock{blockStart: blockStart, units: units}:
+		default:
+		}
+	})
+	defer plcConsumerRegistration.Unregister()
+
+	readRequest, err := m.connection.ReadRequestBuilder().
+		AddQuery("installationMMI", "status/binary/0xFF").
+		Build()
+	if err != nil {
+		return nil, errors.Wrap(err, "Error getting the installation MMI")
+	}
+	readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
+	readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
+	readCtxCancel()
+	if err := readRequestResult.GetErr(); err != nil {
+		return nil, errors.Wrap(err, "Error reading the mmi")
+	}
+	if responseCode := readRequestResult.GetResponse().GetResponseCode("installationMMI"); responseCode != apiModel.PlcResponseCode_OK {
+		return nil, errors.Errorf("Got %s", responseCode)
+	}
+	rootValue := readRequestResult.GetResponse().GetValue("installationMMI")
+	if rootValue == nil || !rootValue.IsStruct() {
+		return nil, errors.Errorf("%v should be a struct", rootValue)
+	}
+	rootStruct := rootValue.GetStruct()
+	if applicationValue := rootStruct["application"]; applicationValue == nil || !applicationValue.IsString() || applicationValue.GetString() != "NETWORK_CONTROL" {
+		return nil, errors.Errorf("%v should contain a application field of type string with value NETWORK_CONTROL", rootStruct)
+	}
+	if blockStartValue := rootStruct["blockStart"]; blockStartValue == nil || !blockStartValue.IsByte() || blockStartValue.GetByte() != 0 {
+		return nil, errors.Errorf("%v should contain a blockStart field of type byte with value 0", rootStruct)
+	}
+	plcListValue := rootStruct["values"]
+	if plcListValue == nil || !plcListValue.IsList() {
+		return nil, errors.Errorf("%v should contain a values field of type list", rootStruct)
+	}
+	for unitByteAddress, plcValue := range plcListValue.GetList() {
+		if unitByteAddress > 0xFF {
+			break // blockStart is 0 here, so the index is the address; beyond 0xFF it would wrap around
+		}
+		if !plcValue.IsString() {
+			return nil, errors.Errorf("%v at %d should be a string", plcValue, unitByteAddress)
+		}
+		switch plcValue.GetString() {
+		case readWriteModel.GAVState_ON.PLC4XEnumName(), readWriteModel.GAVState_OFF.PLC4XEnumName():
+			log.Debug().Msgf("unit %d does exists", unitByteAddress)
+			result[byte(unitByteAddress)] = true
+		case readWriteModel.GAVState_DOES_NOT_EXIST.PLC4XEnumName():
+			log.Debug().Msgf("unit %d does not exists", unitByteAddress)
+		case readWriteModel.GAVState_ERROR.PLC4XEnumName():
+			log.Warn().Msgf("unit %d is in error state", unitByteAddress)
+		}
+	}
+
+	syncCtx, syncCtxCancel := context.WithTimeout(ctx, time.Second*2)
+	defer syncCtxCancel()
+	for len(pendingBlockStarts) > 0 {
+		select {
+		case block := <-blockChan:
+			for _, unit := range block.units {
+				result[unit] = true
+			}
+			delete(pendingBlockStarts, block.blockStart)
+		case <-syncCtx.Done():
+			// err is nil at this point and errors.Wrap(nil, ...) yields nil, so the context error has to be wrapped
+			return nil, errors.Wrap(syncCtx.Err(), "error waiting for other offsets")
+		}
+	}
+	return result, nil
+}
diff --git a/plc4go/internal/cbus/FieldHandler.go b/plc4go/internal/cbus/FieldHandler.go
index 07c65d5d2..4ba91ea90 100644
--- a/plc4go/internal/cbus/FieldHandler.go
+++ b/plc4go/internal/cbus/FieldHandler.go
@@ -504,6 +504,10 @@ func applicationIdFromArgument(applicationIdArgument string) (readWriteModel.App
 			return readWriteModel.ApplicationIdContainer_ERROR_REPORTING_CE, nil
 		case readWriteModel.ApplicationId_HVAC_ACTUATOR:
 			return readWriteModel.ApplicationIdContainer_HVAC_ACTUATOR_73, nil
+		case readWriteModel.ApplicationId_INFO_MESSAGES:
+			return readWriteModel.ApplicationIdContainer_INFO_MESSAGES, nil
+		case readWriteModel.ApplicationId_NETWORK_CONTROL:
+			return readWriteModel.ApplicationIdContainer_NETWORK_CONTROL, nil
 		default:
 			return 0, errors.Errorf("%s can't be used directly... select proper application id container", applicationId)
 		}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 7d7de8d48..661c9d555 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -200,7 +200,11 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 								plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
 								plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
 							}
-							addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
+							addPlcValue(fieldNameCopy, spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
+								"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
+								"blockStart":  spiValues.NewPlcBYTE(blockStart),
+								"values":      spiValues.NewPlcList(plcListValues),
+							}))
 						case readWriteModel.CALDataStatusExtendedExactly:
 							coding := calData.GetCoding()
 							// TODO: verify coding... this should be the same
@@ -224,7 +228,11 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 									plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
 									plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
 								}
-								addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
+								addPlcValue(fieldNameCopy, spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
+									"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
+									"blockStart":  spiValues.NewPlcBYTE(blockStart),
+									"values":      spiValues.NewPlcList(plcListValues),
+								}))
 							case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
 								fallthrough
 							case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 235e57fde..d5fd37e90 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -23,7 +23,7 @@ import (
 	"context"
 	"fmt"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
-	"github.com/apache/plc4x/plc4go/pkg/api/values"
+	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
 	spiValues "github.com/apache/plc4x/plc4go/spi/values"
@@ -112,7 +112,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 			intervals := map[string]time.Duration{}
 			responseCodes := map[string]apiModel.PlcResponseCode{}
 			address := map[string]string{}
-			plcValues := map[string]values.PlcValue{}
+			plcValues := map[string]apiValues.PlcValue{}
 			fieldName := subscriptionHandle.fieldName
 
 			if unitAddress := field.GetUnitAddress(); unitAddress != nil {
@@ -135,23 +135,30 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 
 			isLevel := true
 			blockStart := byte(0x0)
+			//	var application readWriteModel.ApplicationIdContainer
 			switch calData := calData.(type) {
 			case readWriteModel.CALDataStatusExactly:
-				applicationString = calData.GetApplication().ApplicationId().String()
+				application := calData.GetApplication()
+				applicationString = application.ApplicationId().String()
 				blockStart = calData.GetBlockStart()
 
 				statusBytes := calData.GetStatusBytes()
 				responseCodes[fieldName] = apiModel.PlcResponseCode_OK
-				plcListValues := make([]values.PlcValue, len(statusBytes)*4)
+				plcListValues := make([]apiValues.PlcValue, len(statusBytes)*4)
 				for i, statusByte := range statusBytes {
 					plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
 					plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
 					plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
 					plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
 				}
-				plcValues[fieldName] = spiValues.NewPlcList(plcListValues)
+				plcValues[fieldName] = spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
+					"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
+					"blockStart":  spiValues.NewPlcBYTE(blockStart),
+					"values":      spiValues.NewPlcList(plcListValues),
+				})
 			case readWriteModel.CALDataStatusExtendedExactly:
-				applicationString = calData.GetApplication().ApplicationId().String()
+				application := calData.GetApplication()
+				applicationString = application.ApplicationId().String()
 				isLevel = calData.GetCoding() == readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE || calData.GetCoding() == readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE
 				blockStart = calData.GetBlockStart()
 				coding := calData.GetCoding()
@@ -161,20 +168,24 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 				case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
 					statusBytes := calData.GetStatusBytes()
 					responseCodes[fieldName] = apiModel.PlcResponseCode_OK
-					plcListValues := make([]values.PlcValue, len(statusBytes)*4)
+					plcListValues := make([]apiValues.PlcValue, len(statusBytes)*4)
 					for i, statusByte := range statusBytes {
 						plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
 						plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
 						plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
 						plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
 					}
-					plcValues[fieldName] = spiValues.NewPlcList(plcListValues)
+					plcValues[fieldName] = spiValues.NewPlcStruct(map[string]apiValues.PlcValue{
+						"application": spiValues.NewPlcSTRING(application.PLC4XEnumName()),
+						"blockStart":  spiValues.NewPlcBYTE(blockStart),
+						"values":      spiValues.NewPlcList(plcListValues),
+					})
 				case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
 					fallthrough
 				case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
 					levelInformation := calData.GetLevelInformation()
 					responseCodes[fieldName] = apiModel.PlcResponseCode_OK
-					plcListValues := make([]values.PlcValue, len(levelInformation))
+					plcListValues := make([]apiValues.PlcValue, len(levelInformation))
 					for i, levelInformation := range levelInformation {
 						switch levelInformation := levelInformation.(type) {
 						case readWriteModel.LevelInformationAbsentExactly:
@@ -228,7 +239,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
 			intervals := map[string]time.Duration{}
 			responseCodes := map[string]apiModel.PlcResponseCode{}
 			address := map[string]string{}
-			plcValues := map[string]values.PlcValue{}
+			plcValues := map[string]apiValues.PlcValue{}
 			fieldName := subscriptionHandle.fieldName
 
 			subscriptionType := subscriptionHandle.fieldType