You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/11/12 13:15:15 UTC

[plc4x] branch feature/plc4go updated: - Updated the KNX driver to submit only actually changed values as well as passing the address along (When using a pattern it is important to be able to access the actual address of a datapoint)

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/plc4go
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/feature/plc4go by this push:
     new 1608cc1  - Updated the KNX driver to submit only actually changed values as well as passing the address along (When using a pattern it is important to be able to access the actual address of a datapoint)
1608cc1 is described below

commit 1608cc1252ef5ca0ca5200f78e71bf76d028c2ce
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Thu Nov 12 14:15:08 2020 +0100

    - Updated the KNX driver to submit only actually changed values as well as passing the address along (When using a pattern it is important to be able to access the actual address of a datapoint)
---
 plc4go/cmd/main/drivers/knxnetip_test.go           |  4 +-
 .../internal/plc4go/knxnetip/KncNetIpConnection.go | 37 ++++++++--
 .../internal/plc4go/knxnetip/KnxNetIpSubscriber.go | 44 +++++++-----
 .../plc4go/knxnetip/KnxNetIpSubscriptionEvent.go   | 84 ++++++++++++++++++++++
 .../plc4go/model/DefaultPlcSubscriptionEvent.go    |  7 +-
 plc4go/pkg/plc4go/model/plc_subscription_event.go  |  1 +
 6 files changed, 151 insertions(+), 26 deletions(-)

diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index b070cf4..db4c12f 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -146,7 +146,9 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
 func knxEventHandler(event apiModel.PlcSubscriptionEvent) {
     for _, fieldName := range event.GetFieldNames() {
         if event.GetResponseCode(fieldName) == apiModel.PlcResponseCode_OK {
-            fmt.Printf("Got update for field %s with value %s", fieldName, event.GetValue(fieldName).GetString())
+            groupAddress := event.GetAddress(fieldName)
+            fmt.Printf("Got update for field %s with address %s. Value changed to: %s\n",
+                fieldName, groupAddress, event.GetValue(fieldName).GetString())
         }
     }
 }
\ No newline at end of file
diff --git a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
index b16f66a..07ad4e5 100644
--- a/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KncNetIpConnection.go
@@ -22,7 +22,6 @@ import (
     "bytes"
     "errors"
     "fmt"
-    "net"
     driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
     internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/model"
     "github.com/apache/plc4x/plc4go/internal/plc4go/spi"
@@ -32,6 +31,7 @@ import (
     "github.com/apache/plc4x/plc4go/internal/plc4go/utils"
     "github.com/apache/plc4x/plc4go/pkg/plc4go"
     apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+    "net"
     "strconv"
     "time"
 )
@@ -59,6 +59,7 @@ type KnxNetIpConnection struct {
     valueHandler             spi.PlcValueHandler
     quitConnectionStateTimer chan struct{}
     subscribers              []*KnxNetIpSubscriber
+    valueCache               map[uint16][]int8
 
     GatewayKnxAddress      *driverModel.KnxAddress
     GatewayName            string
@@ -77,6 +78,7 @@ func NewKnxNetIpConnection(messageCodec spi.MessageCodec, options map[string][]s
         valueHandler:       NewValueHandler(),
         requestInterceptor: interceptors.NewSingleItemRequestInterceptor(),
         subscribers:        []*KnxNetIpSubscriber{},
+        valueCache:         map[uint16][]int8{},
     }
 }
 
@@ -193,7 +195,7 @@ func (m *KnxNetIpConnection) Connect() <-chan plc4go.PlcConnectionConnectResult
                         // Save the KNX Address the Gateway assigned to this connection.
                         m.ClientKnxAddress = tunnelConnectionDataBlock.KnxAddress
 
-                        fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX address '%d.%d.%d' got assigned client KNX address '%d.%d.%d'",
+                        fmt.Printf("Successfully connected to KNXnet/IP Gateway '%s' with KNX address '%d.%d.%d' got assigned client KNX address '%d.%d.%d'\n",
                             m.GatewayName,
                             m.GatewayKnxAddress.MainGroup, m.GatewayKnxAddress.MiddleGroup, m.GatewayKnxAddress.SubGroup,
                             m.ClientKnxAddress.MainGroup, m.ClientKnxAddress.MiddleGroup, m.ClientKnxAddress.SubGroup)
@@ -363,12 +365,21 @@ func (m *KnxNetIpConnection) handleIncomingTunnelingRequest(tunnelingRequestChan
             break
         }
 
-        cemiDataInd := driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
-        if cemiDataInd != nil {
-            for _, subscriber := range m.subscribers {
-                subscriber.handle(cemiDataInd.CemiDataFrame)
+        go func() {
+            cemiDataInd := driverModel.CastCEMIDataInd(tunnelingRequest.Cemi.Child)
+            if cemiDataInd != nil {
+                addressData := uint16(cemiDataInd.CemiDataFrame.DestinationAddress[0])<<8 | (uint16(cemiDataInd.CemiDataFrame.DestinationAddress[1]) & 0xFF)
+                val, ok := m.valueCache[addressData]
+                changed := false
+                if !ok || !m.sliceEqual(val, cemiDataInd.CemiDataFrame.Data) {
+                    m.valueCache[addressData] = cemiDataInd.CemiDataFrame.Data
+                    changed = true
+                }
+                for _, subscriber := range m.subscribers {
+                    subscriber.handleValueChange(cemiDataInd.CemiDataFrame, changed)
+                }
             }
-        }
+        }()
     }
 }
 
@@ -398,3 +409,15 @@ func (m *KnxNetIpConnection) removeSubscriber(subscriber *KnxNetIpSubscriber) {
         }
     }
 }
+
+func (m *KnxNetIpConnection) sliceEqual(a, b []int8) bool {
+    if len(a) != len(b) {
+        return false
+    }
+    for i, v := range a {
+        if v != b[i] {
+            return false
+        }
+    }
+    return true
+}
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
index 5c19b7d..95d8d2b 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriber.go
@@ -74,7 +74,10 @@ func (m *KnxNetIpSubscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsub
     return result
 }
 
-func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
+/*
+ * Callback for incoming value change events from the KNX bus
+ */
+func (m *KnxNetIpSubscriber) handleValueChange(cemiDataFrame *driverModel.CEMIDataFrame, changed bool) {
     // Decode the group-address according to the settings in the driver
     // Group addresses can be 1, 2 or 3 levels (3 being the default)
     garb := utils.NewReadBuffer(utils.Int8ToUint8(cemiDataFrame.DestinationAddress))
@@ -89,6 +92,7 @@ func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
         types := map[string]internalModel.SubscriptionType{}
         intervals := map[string]time.Duration{}
         responseCodes := map[string]apiModel.PlcResponseCode{}
+        addresses := map[string][]int8{}
         plcValues := map[string]values.PlcValue{}
 
         // Check if this datagram matches any address in this subscription request
@@ -98,30 +102,36 @@ func (m *KnxNetIpSubscriber) handle(cemiDataFrame *driverModel.CEMIDataFrame) {
             if err != nil {
                 continue
             }
+            subscriptionType := subscriptionRequest.GetType(fieldName)
             // If it matches, take the datatype of each matching field and try to decode the payload
             if field.matches(*groupAddress) {
-                var payload []uint8
-                payload = append(payload, uint8(cemiDataFrame.DataFirstByte))
-                payload = append(payload, utils.Int8ToByte(cemiDataFrame.Data)...)
-                rb := utils.NewReadBuffer(payload)
-                plcValue, err := driverModel.KnxDatapointParse(rb, field.GetTypeName())
-                fields[fieldName] = field
-                types[fieldName] = subscriptionRequest.GetType(fieldName)
-                intervals[fieldName] = subscriptionRequest.GetInterval(fieldName)
-                if err == nil {
-                    responseCodes[fieldName] = apiModel.PlcResponseCode_OK
-                    plcValues[fieldName] = plcValue
-                } else {
-                    // TODO: Do a little more here ...
-                    responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
-                    plcValues[fieldName] = nil
+                // If this is a CHANGE_OF_STATE field, filter out the events where the value actually hasn't changed.
+                if subscriptionType == internalModel.SUBSCRIPTION_CHANGE_OF_STATE && changed {
+                    var payload []uint8
+                    payload = append(payload, uint8(cemiDataFrame.DataFirstByte))
+                    payload = append(payload, utils.Int8ToByte(cemiDataFrame.Data)...)
+                    rb := utils.NewReadBuffer(payload)
+                    plcValue, err := driverModel.KnxDatapointParse(rb, field.GetTypeName())
+                    fields[fieldName] = field
+                    types[fieldName] = subscriptionRequest.GetType(fieldName)
+                    intervals[fieldName] = subscriptionRequest.GetInterval(fieldName)
+                    addresses[fieldName] = cemiDataFrame.DestinationAddress
+                    if err == nil {
+                        responseCodes[fieldName] = apiModel.PlcResponseCode_OK
+                        plcValues[fieldName] = plcValue
+                    } else {
+                        // TODO: Do a little more here ...
+                        responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
+                        plcValues[fieldName] = nil
+                    }
                 }
             }
         }
 
         // Assemble a PlcSubscription event
         if len(plcValues) > 0 {
-            event := internalModel.NewDefaultPlcSubscriptionEvent(fields, types, intervals, responseCodes, plcValues)
+            event := NewKnxNetIpSubscriptionEvent(
+                fields, types, intervals, responseCodes, addresses, plcValues)
             eventHandler := subscriptionRequest.GetEventHandler()
             eventHandler(event)
         }
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go
new file mode 100644
index 0000000..364c11c
--- /dev/null
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpSubscriptionEvent.go
@@ -0,0 +1,84 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package knxnetip
+
+import (
+    driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
+    internalMode "github.com/apache/plc4x/plc4go/internal/plc4go/model"
+    "github.com/apache/plc4x/plc4go/internal/plc4go/utils"
+    apiModel "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+    "github.com/apache/plc4x/plc4go/pkg/plc4go/values"
+    "strconv"
+    "time"
+)
+
+type KnxNetIpSubscriptionEvent struct {
+    addresses map[string][]int8
+    internalMode.DefaultPlcSubscriptionEvent
+}
+
+func NewKnxNetIpSubscriptionEvent(fields map[string]apiModel.PlcField, types map[string]internalMode.SubscriptionType,
+    intervals map[string]time.Duration, responseCodes map[string]apiModel.PlcResponseCode,
+    addresses map[string][]int8, values map[string]values.PlcValue) KnxNetIpSubscriptionEvent {
+    return KnxNetIpSubscriptionEvent{
+        addresses: addresses,
+        DefaultPlcSubscriptionEvent:
+            internalMode.NewDefaultPlcSubscriptionEvent(fields, types, intervals, responseCodes, values),
+    }
+}
+
+/*
+ * Decode the binary data in the address according to the field requested
+ */
+func (m KnxNetIpSubscriptionEvent) GetAddress(name string) string {
+    rawAddress := m.addresses[name]
+    rawAddressReadBuffer := utils.NewReadBuffer(utils.Int8ToUint8(rawAddress))
+    field := m.DefaultPlcSubscriptionEvent.GetField(name)
+    var groupAddress *driverModel.KnxGroupAddress
+    var err error
+    switch field.(type) {
+    case KnxNetIpGroupAddress3LevelPlcField:
+        groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 3)
+    case KnxNetIpGroupAddress2LevelPlcField:
+        groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 2)
+    case KnxNetIpGroupAddress1LevelPlcField:
+        groupAddress, err = driverModel.KnxGroupAddressParse(rawAddressReadBuffer, 1)
+    }
+    if err != nil {
+        return ""
+    }
+    return m.groupAddressToString(groupAddress)
+}
+
+func (m KnxNetIpSubscriptionEvent) groupAddressToString(groupAddress *driverModel.KnxGroupAddress) string {
+    if groupAddress != nil {
+        switch groupAddress.Child.(type) {
+        case *driverModel.KnxGroupAddress3Level:
+            level3 := driverModel.CastKnxGroupAddress3Level(groupAddress)
+            return strconv.Itoa(int(level3.MainGroup)) + "/" + strconv.Itoa(int(level3.MiddleGroup)) + "/" + strconv.Itoa(int(level3.SubGroup))
+        case *driverModel.KnxGroupAddress2Level:
+            level2 := driverModel.CastKnxGroupAddress2Level(groupAddress)
+            return strconv.Itoa(int(level2.MainGroup)) + "/" + strconv.Itoa(int(level2.SubGroup))
+        case *driverModel.KnxGroupAddressFreeLevel:
+            level1 := driverModel.CastKnxGroupAddressFreeLevel(groupAddress)
+            return strconv.Itoa(int(level1.SubGroup))
+        }
+    }
+    return ""
+}
diff --git a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
index 67c476f..c7e3f14 100644
--- a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
+++ b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionEvent.go
@@ -35,7 +35,8 @@ type DefaultPlcSubscriptionEvent struct {
 }
 
 func NewDefaultPlcSubscriptionEvent(fields map[string]model.PlcField, types map[string]SubscriptionType,
-    intervals map[string]time.Duration, responseCodes map[string]model.PlcResponseCode, values map[string]values.PlcValue) DefaultPlcSubscriptionEvent {
+    intervals map[string]time.Duration, responseCodes map[string]model.PlcResponseCode,
+    values map[string]values.PlcValue) DefaultPlcSubscriptionEvent {
     return DefaultPlcSubscriptionEvent{
         fields:        fields,
         types:         types,
@@ -69,6 +70,10 @@ func (m DefaultPlcSubscriptionEvent) GetResponseCode(name string) model.PlcRespo
     return m.responseCodes[name]
 }
 
+func (m DefaultPlcSubscriptionEvent) GetAddress(name string) string {
+    panic("GetAddress not implemented")
+}
+
 func (m DefaultPlcSubscriptionEvent) GetValue(name string) values.PlcValue {
     return m.values[name]
 }
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_event.go b/plc4go/pkg/plc4go/model/plc_subscription_event.go
index 295f8c8..3eab46d 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_event.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_event.go
@@ -24,6 +24,7 @@ type PlcSubscriptionEvent interface {
     GetRequest() PlcSubscriptionRequest
     GetFieldNames() []string
     GetResponseCode(name string) PlcResponseCode
+    GetAddress(name string) string
     GetValue(name string) values.PlcValue
     PlcResponse
 }