You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/11/12 13:15:15 UTC
[plc4x] branch feature/plc4go updated: - Updated the KNX driver to
submit only actually changed values as well as passing the address along
(When using a pattern it is important to be able to access the actual
address of a datapoint)
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/plc4go
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/feature/plc4go by this push:
new 1608cc1 - Updated the KNX driver to submit only actually changed values as well as passing the address along (When using a pattern it is important to be able to access the actual address of a datapoint)
1608cc1 is described below
commit 1608cc1252ef5ca0ca5200f78e71bf76d028c2ce
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Thu Nov 12 14:15:08 2020 +0100
- Updated the KNX driver to submit only actually changed values as well as passing the address along (When using a pattern it is important to be able to access the actual address of a datapoint)
---
plc4go/cmd/main/drivers/knxnetip_test.go | 4 +-
.../internal/plc4go/knxnetip/KncNetIpConnection.go | 37 ++++++++--
.../internal/plc4go/knxnetip/KnxNetIpSubscriber.go | 44 +++++++-----
.../plc4go/knxnetip/KnxNetIpSubscriptionEvent.go | 84 ++++++++++++++++++++++
.../plc4go/model/DefaultPlcSubscriptionEvent.go | 7 +-
plc4go/pkg/plc4go/model/plc_subscription_event.go | 1 +
6 files changed, 151 insertions(+), 26 deletions(-)
diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index b070cf4..db4c12f 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -146,7 +146,9 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
func knxEventHandler(event apiModel.PlcSubscriptionEvent) {
for _, fieldName := range event.GetFieldNames() {
if event.GetResponseCode(fieldName) == apiModel.PlcResponseCode_OK {
- fmt.Printf("Got update for field %s with value %s", fieldName, event.GetValue(fieldName).GetString())
+ groupAddress := event.GetAddress(fieldName)
+ fmt.Printf("Got update for field %s with address %s. Value changed to: %s\n",
+ fieldName, groupAddress, event.GetValue(fieldName).GetString())
}
}
}
\ No newline at end of file
diff --git a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
index b16f66a..07ad4e5 100644
--- a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
@@ -22,7 +22,6 @@ import (
"bytes"
"errors"
"fmt"
- "net"
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
@@ -32,6 +31,7 @@ import (
"github.com/apache/plc4x/plc4go/internal/plc4go/utils"
"github.com/apache/plc4x/plc4go/pkg/plc4go"
apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+ "net"
"strconv"
"time"
)
@@ -59,6 +59,7 @@ type KnxNetIpConnection struct {
valueHandler spi.PlcValueHandler
quitConnectionStateTimer chan struct{}
subscribers []*KnxNetIpSubscriber
+ valueCache map[uint16][]int8
GatewayKnxAddress *driverModel.KnxAddress
GatewayName string
@@ -77,6 +78,7 @@ func NewKnxNetIpConnection(messageCodec spi.MessageCodec, options map[string][]s
valueHandler: NewValueHandler(),
requestInterceptor: interceptors.NewSingleItemRequestInterceptor(),
subscribers: []*KnxNetIpSubscriber{},
+ valueCache: map[uint16][]int8{},
}
}
@@ -193,7 +195,7 @@ func (m *KnxNetIpConnection) Connect() <-chan plc4go.PlcConnectionConnectResult
// Save the KNX Address the Gateway assigned to this connection.
m.ClientKnxAddress = tunnelConnectionDataBlock.KnxAddress
- fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX address '%d.%d.%d' got assigned client KNX address '%d.%d.%d'",
+ fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX address '%d.%d.%d' got assigned client KNX address '%d.%d.%d'\n",
m.GatewayName,
m.GatewayKnxAddress.MainGroup, m.GatewayKnxAddress.MiddleGroup, m.GatewayKnxAddress.SubGroup,
m.ClientKnxAddress.MainGroup, m.ClientKnxAddress.MiddleGroup, m.ClientKnxAddress.SubGroup)
@@ -363,12 +365,21 @@ func (m *KnxNetIpConnection) handleIncomingTunnelingRequest(tunnelingRequestChan
break
}
- cemiDataInd := driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
- if cemiDataInd != nil {
- for _, subscriber := range m.subscribers {
- subscriber.handle(cemiDataInd.CemiDataFrame)
+ go func() {
+ cemiDataInd := driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
+ if cemiDataInd != nil {
+ addressData := uint16(cemiDataInd.CemiDataFrame.DestinationAddress[0])<<8 | (uint16(cemiDataInd.CemiDataFrame.DestinationAddress[1]) & 0xFF)
+ val, ok := m.valueCache[addressData]
+ changed := false
+ if !ok || !m.sliceEqual(val, cemiDataInd.CemiDataFrame.Data) {
+ m.valueCache[addressData] = cemiDataInd.CemiDataFrame.Data
+ changed = true
+ }
+ for _, subscriber := range m.subscribers {
+ subscriber.handleValueChange(cemiDataInd.CemiDataFrame, changed)
+ }
}
- }
+ }()
}
}
@@ -398,3 +409,15 @@ func (m *KnxNetIpConnection) removeSubscriber(subscriber *KnxNetIpSubscriber) {
}
}
}
+
+func (m *KnxNetIpConnection) sliceEqual(a, b []int8) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i, v := range a {
+ if v != b[i] {
+ return false
+ }
+ }
+ return true
+}
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
index 5c19b7d..95d8d2b 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
@@ -74,7 +74,10 @@ func (m *KnxNetIpSubscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsub
return result
}
-func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
+/*
+ * Callback for incoming value change events from the KNX bus
+ */
+func (m *KnxNetIpSubscriber) handleValueChange(cemiDataFrame *driverModel.CEMIDataFrame, changed bool) {
// Decode the group-address according to the settings in the driver
// Group addresses can be 1, 2 or 3 levels (3 being the default)
garb := utils.NewReadBuffer(utils.Int8ToUint8(cemiDataFrame.DestinationAddress))
@@ -89,6 +92,7 @@ func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
types := map[string]internalModel.SubscriptionType{}
intervals := map[string]time.Duration{}
responseCodes := map[string]apiModel.PlcResponseCode{}
+ addresses := map[string][]int8{}
plcValues := map[string]values.PlcValue{}
// Check if this datagram matches any address in this subscription request
@@ -98,30 +102,36 @@ func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
if err != nil {
continue
}
+ subscriptionType := subscriptionRequest.GetType(fieldName)
// If it matches, take the datatype of each matching field and try to decode the payload
if field.matches(*groupAddress) {
- var payload []uint8
- payload = append(payload, uint8(cemiDataFrame.DataFirstByte))
- payload = append(payload, utils.Int8ToByte(cemiDataFrame.Data)...)
- rb := utils.NewReadBuffer(payload)
- plcValue, err := driverModel.KnxDatapointParse(rb, field.GetTypeName())
- fields[fieldName] = field
- types[fieldName] = subscriptionRequest.GetType(fieldName)
- intervals[fieldName] = subscriptionRequest.GetInterval(fieldName)
- if err == nil {
- responseCodes[fieldName] = apiModel.PlcResponseCode_OK
- plcValues[fieldName] = plcValue
- } else {
- // TODO: Do a little more here ...
- responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
- plcValues[fieldName] = nil
+ // If this is a CHANGE_OF_STATE field, filter out the events where the value actually hasn't changed.
+ if subscriptionType == internalModel.SUBSCRIPTION_CHANGE_OF_STATE && changed {
+ var payload []uint8
+ payload = append(payload, uint8(cemiDataFrame.DataFirstByte))
+ payload = append(payload, utils.Int8ToByte(cemiDataFrame.Data)...)
+ rb := utils.NewReadBuffer(payload)
+ plcValue, err := driverModel.KnxDatapointParse(rb, field.GetTypeName())
+ fields[fieldName] = field
+ types[fieldName] = subscriptionRequest.GetType(fieldName)
+ intervals[fieldName] = subscriptionRequest.GetInterval(fieldName)
+ addresses[fieldName] = cemiDataFrame.DestinationAddress
+ if err == nil {
+ responseCodes[fieldName] = apiModel.PlcResponseCode_OK
+ plcValues[fieldName] = plcValue
+ } else {
+ // TODO: Do a little more here ...
+ responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
+ plcValues[fieldName] = nil
+ }
}
}
}
// Assemble a PlcSubscription event
if len(plcValues) > 0 {
- event := internalModel.NewDefaultPlcSubscriptionEvent(fields, types, intervals, responseCodes, plcValues)
+ event := NewKnxNetIpSubscriptionEvent(
+ fields, types, intervals, responseCodes, addresses, plcValues)
eventHandler := subscriptionRequest.GetEventHandler()
eventHandler(event)
}
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go
new file mode 100644
index 0000000..364c11c
--- /dev/null
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go
@@ -0,0 +1,84 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package knxnetip
+
+import (
+ driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
+ internalMode "github.com/apache/plc4x/plc4go/internal/plc4go/model"
+ "github.com/apache/plc4x/plc4go/internal/plc4go/utils"
+ apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+ "github.com/apache/plc4x/plc4go/pkg/plc4go/values"
+ "strconv"
+ "time"
+)
+
+type KnxNetIpSubscriptionEvent struct {
+ addresses map[string][]int8
+ internalMode.DefaultPlcSubscriptionEvent
+}
+
+func NewKnxNetIpSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
+ intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
+ addresses map[string][]int8, values map[string]values.PlcValue) KnxNetIpSubscriptionEvent {
+ return KnxNetIpSubscriptionEvent{
+ addresses: addresses,
+ DefaultPlcSubscriptionEvent:
+ internalMode.NewDefaultPlcSubscriptionEvent(fields, types, intervals, responseCodes, values),
+ }
+}
+
+/*
+ * Decode the binary data in the address according to the field requested
+ */
+func (m KnxNetIpSubscriptionEvent) GetAddress(name string) string {
+ rawAddress := m.addresses[name]
+ rawAddressReadBuffer := utils.NewReadBuffer(utils.Int8ToUint8(rawAddress))
+ field := m.DefaultPlcSubscriptionEvent.GetField(name)
+ var groupAddress *driverModel.KnxGroupAddress
+ var err error
+ switch field.(type) {
+ case KnxNetIpGroupAddress3LevelPlcField:
+ groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 3)
+ case KnxNetIpGroupAddress2LevelPlcField:
+ groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 2)
+ case KnxNetIpGroupAddress1LevelPlcField:
+ groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 1)
+ }
+ if err != nil {
+ return ""
+ }
+ return m.groupAddressToString(groupAddress)
+}
+
+func (m KnxNetIpSubscriptionEvent) groupAddressToString(groupAddress *driverModel.KnxGroupAddress) string {
+ if groupAddress != nil {
+ switch groupAddress.Child.(type) {
+ case *driverModel.KnxGroupAddress3Level:
+ level3 := driverModel.CastKnxGroupAddress3Level(groupAddress)
+ return strconv.Itoa(int(level3.MainGroup)) + "/" + strconv.Itoa(int(level3.MiddleGroup)) + "/" + strconv.Itoa(int(level3.SubGroup))
+ case *driverModel.KnxGroupAddress2Level:
+ level2 := driverModel.CastKnxGroupAddress2Level(groupAddress)
+ return strconv.Itoa(int(level2.MainGroup)) + "/" + strconv.Itoa(int(level2.SubGroup))
+ case *driverModel.KnxGroupAddressFreeLevel:
+ level1 := driverModel.CastKnxGroupAddressFreeLevel(groupAddress)
+ return strconv.Itoa(int(level1.SubGroup))
+ }
+ }
+ return ""
+}
diff --git a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
index 67c476f..c7e3f14 100644
--- a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
@@ -35,7 +35,8 @@ type DefaultPlcSubscriptionEvent struct {
}
func NewDefaultPlcSubscriptionEvent(fields map[string]model.PlcField, types map[string]SubscriptionType,
- intervals map[string]time.Duration, responseCodes map[string]model.PlcResponseCode, values map[string]values.PlcValue) DefaultPlcSubscriptionEvent {
+ intervals map[string]time.Duration, responseCodes map[string]model.PlcResponseCode,
+ values map[string]values.PlcValue) DefaultPlcSubscriptionEvent {
return DefaultPlcSubscriptionEvent{
fields: fields,
types: types,
@@ -69,6 +70,10 @@ func (m DefaultPlcSubscriptionEvent) GetResponseCode(name string) model.PlcRespo
return m.responseCodes[name]
}
+func (m DefaultPlcSubscriptionEvent) GetAddress(name string) string {
+ panic("GetAddress not implemented")
+}
+
func (m DefaultPlcSubscriptionEvent) GetValue(name string) values.PlcValue {
return m.values[name]
}
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_event.go b/plc4go/pkg/plc4go/model/plc_subscription_event.go
index 295f8c8..3eab46d 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_event.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_event.go
@@ -24,6 +24,7 @@ type PlcSubscriptionEvent interface {
GetRequest() PlcSubscriptionRequest
GetFieldNames() []string
GetResponseCode(name string) PlcResponseCode
+ GetAddress(name string) string
GetValue(name string) values.PlcValue
PlcResponse
}