You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/09/06 12:46:44 UTC
[plc4x] branch develop updated: feat(plc4go/cbus): added source to events
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 31438eba8 feat(plc4go/cbus): added source to events
31438eba8 is described below
commit 31438eba8c6a6c92b5a4ba42c337900747358be0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Sep 6 14:46:37 2022 +0200
feat(plc4go/cbus): added source to events
---
plc4go/internal/cbus/Subscriber.go | 11 +++--
plc4go/internal/cbus/SubscriptionEvent.go | 21 ++++++--
plc4go/spi/model/DefaultPlcSubscriptionEvent.go | 60 +++++++++++++++++++++--
plc4go/spi/model/DefaultPlcSubscriptionRequest.go | 13 +++++
4 files changed, 94 insertions(+), 11 deletions(-)
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 712ead1bc..0093c33fa 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -44,8 +44,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
}
}
-func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
- // TODO: handle context
+func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
internalPlcSubscriptionRequest := subscriptionRequest.(spiModel.DefaultPlcSubscriptionRequest)
@@ -112,6 +111,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
intervals := map[string]time.Duration{}
responseCodes := map[string]apiModel.PlcResponseCode{}
address := map[string]string{}
+ sources := map[string]string{}
plcValues := map[string]apiValues.PlcValue{}
fieldName := subscriptionHandle.fieldName
@@ -122,6 +122,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
continue
}
}
+ sources[fieldName] = unitAddressString
subscriptionType := subscriptionHandle.fieldType
// TODO: handle subscriptionType
@@ -217,7 +218,7 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
// Assemble a PlcSubscription event
if len(plcValues) > 0 {
- event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
+ event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
consumer(event)
}
}
@@ -239,6 +240,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
intervals := map[string]time.Duration{}
responseCodes := map[string]apiModel.PlcResponseCode{}
address := map[string]string{}
+ sources := map[string]string{}
plcValues := map[string]apiValues.PlcValue{}
fieldName := subscriptionHandle.fieldName
@@ -278,6 +280,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
continue
}
}
+ sources[fieldName] = unitAddressString
if application := field.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
@@ -302,7 +305,7 @@ func (m *Subscriber) handleMonitoredSal(sal readWriteModel.MonitoredSAL) bool {
// Assemble a PlcSubscription event
if len(plcValues) > 0 {
- event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, plcValues)
+ event := NewSubscriptionEvent(fields, types, intervals, responseCodes, address, sources, plcValues)
consumer(event)
}
}
diff --git a/plc4go/internal/cbus/SubscriptionEvent.go b/plc4go/internal/cbus/SubscriptionEvent.go
index bc6a092d4..70bbcd879 100644
--- a/plc4go/internal/cbus/SubscriptionEvent.go
+++ b/plc4go/internal/cbus/SubscriptionEvent.go
@@ -29,12 +29,21 @@ import (
type SubscriptionEvent struct {
internalMode.DefaultPlcSubscriptionEvent
address map[string]string
+ sources map[string]string
}
-func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
- intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
- address map[string]string, values map[string]values.PlcValue) SubscriptionEvent {
- subscriptionEvent := SubscriptionEvent{address: address}
+func NewSubscriptionEvent(
+ fields map[string]apiModel.PlcField,
+ types map[string]internalMode.SubscriptionType,
+ intervals map[string]time.Duration,
+ responseCodes map[string]apiModel.PlcResponseCode,
+ address map[string]string,
+ sources map[string]string,
+ values map[string]values.PlcValue) SubscriptionEvent {
+ subscriptionEvent := SubscriptionEvent{
+ address: address,
+ sources: sources,
+ }
subscriptionEvent.DefaultPlcSubscriptionEvent = internalMode.NewDefaultPlcSubscriptionEvent(subscriptionEvent, fields, types, intervals, responseCodes, values)
return subscriptionEvent
}
@@ -42,3 +51,7 @@ func NewSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]
func (m SubscriptionEvent) GetAddress(name string) string {
return m.address[name]
}
+
+func (m SubscriptionEvent) GetSource(name string) string {
+ return m.sources[name]
+}
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
index 81dd600de..1bce13096 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionEvent.go
@@ -85,13 +85,66 @@ func (m DefaultPlcSubscriptionEvent) GetValue(name string) values.PlcValue {
}
func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) error {
- if err := writeBuffer.PushContext("PlcReadResponse"); err != nil {
+ if err := writeBuffer.PushContext("PlcSubscriptionEvent"); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.WriteSerializable(m.DefaultResponse); err != nil {
return err
}
if err := writeBuffer.PushContext("fields"); err != nil {
return err
}
+ for _, fieldName := range m.GetFieldNames() {
+ if err := writeBuffer.PushContext(fieldName); err != nil {
+ return err
+ }
+ valueResponse := m.GetField(fieldName)
+ if err := writeBuffer.WriteString("addressString", uint32(len(valueResponse.GetAddressString())*8), "UTF-8", valueResponse.GetAddressString()); err != nil {
+ return err
+ }
+ if err := writeBuffer.WriteString("typeName", uint32(len(valueResponse.GetTypeName())*8), "UTF-8", valueResponse.GetTypeName()); err != nil {
+ return err
+ }
+ if err := writeBuffer.WriteUint16(fieldName, 8, uint16(valueResponse.GetQuantity())); err != nil {
+ return err
+ }
+ if err := writeBuffer.PopContext(fieldName); err != nil {
+ return err
+ }
+ }
+ if err := writeBuffer.PopContext("fields"); err != nil {
+ return err
+ }
+ if err := writeBuffer.PushContext("types"); err != nil {
+ return err
+ }
+ for _, fieldName := range m.GetFieldNames() {
+ fieldType := m.GetType(fieldName)
+ if err := writeBuffer.WriteUint8(fieldName, 8, uint8(fieldType), utils.WithAdditionalStringRepresentation(fieldType.String())); err != nil {
+ return err
+ }
+ }
+ if err := writeBuffer.PopContext("types"); err != nil {
+ return err
+ }
+ if err := writeBuffer.PushContext("intervals"); err != nil {
+ return err
+ }
+ for _, fieldName := range m.GetFieldNames() {
+ interval := m.GetInterval(fieldName)
+ if err := writeBuffer.WriteInt64(fieldName, 8, int64(interval), utils.WithAdditionalStringRepresentation(interval.String())); err != nil {
+ return err
+ }
+ }
+ if err := writeBuffer.PopContext("intervals"); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.PushContext("values"); err != nil {
+ return err
+ }
for _, fieldName := range m.GetFieldNames() {
if err := writeBuffer.PushContext(fieldName); err != nil {
return err
@@ -104,10 +157,11 @@ func (m DefaultPlcSubscriptionEvent) Serialize(writeBuffer utils.WriteBuffer) er
return err
}
}
- if err := writeBuffer.PopContext("fields"); err != nil {
+ if err := writeBuffer.PopContext("values"); err != nil {
return err
}
- if err := writeBuffer.PopContext("PlcReadResponse"); err != nil {
+
+ if err := writeBuffer.PopContext("PlcSubscriptionEvent"); err != nil {
return err
}
return nil
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
index 2cea374fa..3c6b551a2 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
@@ -36,6 +36,19 @@ const (
SubscriptionEvent SubscriptionType = 0x03
)
+func (s SubscriptionType) String() string {
+ switch s {
+ case SubscriptionCyclic:
+ return "SubscriptionCyclic"
+ case SubscriptionChangeOfState:
+ return "SubscriptionChangeOfState"
+ case SubscriptionEvent:
+ return "SubscriptionEvent"
+ default:
+ return "Unknown"
+ }
+}
+
type DefaultPlcSubscriptionRequestBuilder struct {
subscriber spi.PlcSubscriber
fieldHandler spi.PlcFieldHandler