You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/06 12:46:44 UTC

[plc4x] branch develop updated: feat(plc4go/cbus): added source to events

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 31438eba8 feat(plc4go/cbus): added source to events
31438eba8 is described below

commit 31438eba8c6a6c92b5a4ba42c337900747358be0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Sep 6 14:46:37 2022 +0200

    feat(plc4go/cbus): added source to events
---
 plc4go/internal/cbus/Subscriber.go                | 11 +++--
 plc4go/internal/cbus/SubscriptionEvent.go         | 21 ++++++--
 plc4go/spi/model/DefaultPlcSubscriptionEvent.go   | 60 +++++++++++++++++++++--
 plc4go/spi/model/DefaultPlcSubscriptionRequest.go | 13 +++++
 4 files changed, 94 insertions(+), 11 deletions(-)

diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 712ead1bc..0093c33fa 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -44,8 +44,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
 	}
 }
 
-func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
-	// TODO: handle context
+func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
 	result := make(chan apiModel.PlcSubscriptionRequestResult)
 	go func() {
 		internalPlcSubscriptionRequest := subscriptionRequest.(spiModel.DefaultPlcSubscriptionRequest)
@@ -112,6 +111,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 			intervals := map[string]time.Duration{}
 			responseCodes := map[string]apiModel.PlcResponseCode{}
 			address := map[string]string{}
+			sources := map[string]string{}
 			plcValues := map[string]apiValues.PlcValue{}
 			fieldName := subscriptionHandle.fieldName
 
@@ -122,6 +122,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 					continue
 				}
 			}
+			sources[fieldName] = unitAddressString
 
 			subscriptionType := subscriptionHandle.fieldType
 			// TODO: handle subscriptionType
@@ -217,7 +218,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 
 			// Assemble a PlcSubscription event
 			if len(plcValues) > 0 {
-				event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
+				event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
 				consumer(event)
 			}
 		}
@@ -239,6 +240,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
 			intervals := map[string]time.Duration{}
 			responseCodes := map[string]apiModel.PlcResponseCode{}
 			address := map[string]string{}
+			sources := map[string]string{}
 			plcValues := map[string]apiValues.PlcValue{}
 			fieldName := subscriptionHandle.fieldName
 
@@ -278,6 +280,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
 					continue
 				}
 			}
+			sources[fieldName] = unitAddressString
 
 			if application := field.GetApplication(); application != nil {
 				if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
@@ -302,7 +305,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
 
 			// Assemble a PlcSubscription event
 			if len(plcValues) > 0 {
-				event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
+				event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
 				consumer(event)
 			}
 		}
diff --git a/plc4go/internal/cbus/SubscriptionEvent.go b/plc4go/internal/cbus/SubscriptionEvent.go
index bc6a092d4..70bbcd879 100644
--- a/plc4go/internal/cbus/SubscriptionEvent.go
+++ b/plc4go/internal/cbus/SubscriptionEvent.go
@@ -29,12 +29,21 @@ import (
 type SubscriptionEvent struct {
 	internalMode.DefaultPlcSubscriptionEvent
 	address map[string]string
+	sources map[string]string
 }
 
-func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
-	intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
-	address map[string]string, values map[string]values.PlcValue) SubscriptionEvent {
-	subscriptionEvent := SubscriptionEvent{address: address}
+func NewSubscriptionEvent(
+	fields map[string]apiModel.PlcField,
+	types map[string]internalMode.SubscriptionType,
+	intervals map[string]time.Duration,
+	responseCodes map[string]apiModel.PlcResponseCode,
+	address map[string]string,
+	sources map[string]string,
+	values map[string]values.PlcValue) SubscriptionEvent {
+	subscriptionEvent := SubscriptionEvent{
+		address: address,
+		sources: sources,
+	}
 	subscriptionEvent.DefaultPlcSubscriptionEvent = internalMode.NewDefaultPlcSubscriptionEvent(subscriptionEvent, fields, types, intervals, responseCodes, values)
 	return subscriptionEvent
 }
@@ -42,3 +51,7 @@ func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]
 func (m SubscriptionEvent) GetAddress(name string) string {
 	return m.address[name]
 }
+
+func (m SubscriptionEvent) GetSource(name string) string {
+	return m.sources[name]
+}
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
index 81dd600de..1bce13096 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
@@ -85,13 +85,66 @@ func (m DefaultPlcSubscriptionEvent) GetValue(name string) values.PlcValue {
 }
 
 func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) error {
-	if err := writeBuffer.PushContext("PlcReadResponse"); err != nil {
+	if err := writeBuffer.PushContext("PlcSubscriptionEvent"); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.WriteSerializable(m.DefaultResponse); err != nil {
 		return err
 	}
 
 	if err := writeBuffer.PushContext("fields"); err != nil {
 		return err
 	}
+	for _, fieldName := range m.GetFieldNames() {
+		if err := writeBuffer.PushContext(fieldName); err != nil {
+			return err
+		}
+		valueResponse := m.GetField(fieldName)
+		if err := writeBuffer.WriteString("addressString", uint32(len(valueResponse.GetAddressString())*8), "UTF-8", valueResponse.GetAddressString()); err != nil {
+			return err
+		}
+		if err := writeBuffer.WriteString("typeName", uint32(len(valueResponse.GetTypeName())*8), "UTF-8", valueResponse.GetTypeName()); err != nil {
+			return err
+		}
+		if err := writeBuffer.WriteUint16(fieldName, 8, uint16(valueResponse.GetQuantity())); err != nil {
+			return err
+		}
+		if err := writeBuffer.PopContext(fieldName); err != nil {
+			return err
+		}
+	}
+	if err := writeBuffer.PopContext("fields"); err != nil {
+		return err
+	}
+	if err := writeBuffer.PushContext("types"); err != nil {
+		return err
+	}
+	for _, fieldName := range m.GetFieldNames() {
+		fieldType := m.GetType(fieldName)
+		if err := writeBuffer.WriteUint8(fieldName, 8, uint8(fieldType), utils.WithAdditionalStringRepresentation(fieldType.String())); err != nil {
+			return err
+		}
+	}
+	if err := writeBuffer.PopContext("types"); err != nil {
+		return err
+	}
+	if err := writeBuffer.PushContext("intervals"); err != nil {
+		return err
+	}
+	for _, fieldName := range m.GetFieldNames() {
+		interval := m.GetInterval(fieldName)
+		if err := writeBuffer.WriteInt64(fieldName, 8, int64(interval), utils.WithAdditionalStringRepresentation(interval.String())); err != nil {
+			return err
+		}
+	}
+	if err := writeBuffer.PopContext("intervals"); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.PushContext("values"); err != nil {
+		return err
+	}
 	for _, fieldName := range m.GetFieldNames() {
 		if err := writeBuffer.PushContext(fieldName); err != nil {
 			return err
@@ -104,10 +157,11 @@ func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) er
 			return err
 		}
 	}
-	if err := writeBuffer.PopContext("fields"); err != nil {
+	if err := writeBuffer.PopContext("values"); err != nil {
 		return err
 	}
-	if err := writeBuffer.PopContext("PlcReadResponse"); err != nil {
+
+	if err := writeBuffer.PopContext("PlcSubscriptionEvent"); err != nil {
 		return err
 	}
 	return nil
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
index 2cea374fa..3c6b551a2 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
@@ -36,6 +36,19 @@ const (
 	SubscriptionEvent         SubscriptionType = 0x03
 )
 
+func (s SubscriptionType) String() string {
+	switch s {
+	case SubscriptionCyclic:
+		return "SubscriptionCyclic"
+	case SubscriptionChangeOfState:
+		return "SubscriptionChangeOfState"
+	case SubscriptionEvent:
+		return "SubscriptionEvent"
+	default:
+		return "Unknown"
+	}
+}
+
 type DefaultPlcSubscriptionRequestBuilder struct {
 	subscriber             spi.PlcSubscriber
 	fieldHandler           spi.PlcFieldHandler