You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/03/08 12:45:51 UTC
[plc4x] 03/03: - Worked on detecting bad connections.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 3142a575b0569e56110524814d234393c109402a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Mar 8 13:45:41 2021 +0100
- Worked on detecting bad connections.
---
.../internal/plc4go/knxnetip/KnxNetIpConnection.go | 139 ++++++++++++++++++---
1 file changed, 123 insertions(+), 16 deletions(-)
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
index b7620b8..d2d1e62 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -25,6 +25,7 @@ import (
"fmt"
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
+ errors2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/errors"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
@@ -117,8 +118,12 @@ type KnxNetIpConnection struct {
valueCacheMutex sync.RWMutex
metadata *ConnectionMetadata
defaultTtl time.Duration
+ connectionTtl time.Duration
buildingKey []byte
+ // Used for detecting connection problems
+ connectionTimeoutTimer *time.Timer
+
GatewayKnxAddress *driverModel.KnxAddress
ClientKnxAddress *driverModel.KnxAddress
CommunicationChannelId uint8
@@ -169,6 +174,8 @@ func NewKnxNetIpConnection(transportInstance transports.TransportInstance, optio
defaultTtl: time.Second * 10,
DeviceConnections: map[driverModel.KnxAddress]*KnxDeviceConnection{},
}
+ connection.connectionTtl = connection.defaultTtl * 2
+
// If a building key was provided, save that in a dedicated variable
if buildingKey, ok := options["buildingKey"]; ok {
bc, err := hex.DecodeString(buildingKey[0])
@@ -362,7 +369,7 @@ func (m *KnxNetIpConnection) IsConnected() bool {
case pingResponse := <-pingChannel:
return pingResponse.Err == nil
case <-time.After(m.defaultTtl):
- m.invalidateConnection()
+ m.handleTimeout()
return false
}
}
@@ -914,6 +921,10 @@ func (m *KnxNetIpConnection) sendGatewaySearchRequest() (*driverModel.SearchResp
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing search request: %s", err))
return nil
},
@@ -957,6 +968,10 @@ func (m *KnxNetIpConnection) sendGatewayConnectionRequest() (*driverModel.Connec
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1001,6 +1016,10 @@ func (m *KnxNetIpConnection) sendGatewayDisconnectionRequest() (*driverModel.Dis
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1044,6 +1063,10 @@ func (m *KnxNetIpConnection) sendConnectionStateRequest() (*driverModel.Connecti
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1112,6 +1135,10 @@ func (m *KnxNetIpConnection) sendGroupAddressReadRequest(groupAddress []int8) (*
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1178,14 +1205,18 @@ func (m *KnxNetIpConnection) sendDeviceConnectionRequest(targetAddress driverMod
// If the error flag is set, there was an error connecting
if lDataCon.DataFrame.ErrorFlag {
- errorResult <- errors.New("error connecting")
- return nil
+ errorResult <- errors.New("error connecting to device at: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduControlConnect
}
- result <- apduControlConnect
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1253,14 +1284,18 @@ func (m *KnxNetIpConnection) sendDeviceDisconnectionRequest(targetAddress driver
// If the error flag is set, there was an error disconnecting
if lDataCon.DataFrame.ErrorFlag {
- errorResult <- errors.New("error disconnecting")
- return nil
+ errorResult <- errors.New("error disconnecting from device at: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduControlDisconnect
}
- result <- apduControlDisconnect
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1356,16 +1391,21 @@ func (m *KnxNetIpConnection) sendDeviceAuthentication(targetAddress driverModel.
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
// If the error flag is set, there was an error authenticating
if lDataInd.DataFrame.ErrorFlag {
- errorResult <- errors.New("error authenticating")
- return
+ errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduAuthorizeResponse
}
-
- result <- apduAuthorizeResponse
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1441,12 +1481,23 @@ func (m *KnxNetIpConnection) sendDeviceDeviceDescriptorReadRequest(targetAddress
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrame.Apdu.Counter, func(err error) {
- result <- deviceDescriptorResponse
+ // If the error flag is set, there was an error reading the device descriptor
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- deviceDescriptorResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1530,12 +1581,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyReadRequest(targetAddress driverM
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- propertyValueResponse
+ // If the error flag is set, there was an error reading the property value
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- propertyValueResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1619,12 +1681,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyDescriptionReadRequest(targetAddr
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- propertyDescriptionResponse
+ // If the error flag is set, there was an error reading the property description
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading property description from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- propertyDescriptionResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1704,12 +1777,23 @@ func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverMod
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- dataApduMemoryResponse
+ // If the error flag is set, there was an error reading the device memory
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading memory from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- dataApduMemoryResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1779,6 +1863,10 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
callback(errors.New(fmt.Sprintf("got error processing request: %s", err)))
return nil
},
@@ -1796,6 +1884,7 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
///////////////////////////////////////////////////////////////////////////////////////////////////////
func (m *KnxNetIpConnection) interceptIncomingMessage(interface{}) {
+ m.resetTimeout()
if m.connectionStateTimer != nil {
// Reset the timer for sending the ConnectionStateRequest
m.connectionStateTimer.Reset(60 * time.Second)
@@ -1875,8 +1964,26 @@ func (m *KnxNetIpConnection) handleValueCacheUpdate(destinationAddress []int8, p
}
}
-func (m *KnxNetIpConnection) invalidateConnection() {
+func (m *KnxNetIpConnection) handleTimeout() {
+ // If this is the first timeout in a sequence, start the timer.
+ if m.connectionTimeoutTimer == nil {
+ m.connectionTimeoutTimer = time.NewTimer(m.connectionTtl)
+ go func() {
+ <-m.connectionTimeoutTimer.C
+ m.resetConnection()
+ }()
+ }
+}
+
+func (m *KnxNetIpConnection) resetTimeout() {
+ if m.connectionTimeoutTimer != nil {
+ m.connectionTimeoutTimer.Stop()
+ m.connectionTimeoutTimer = nil
+ }
+}
+func (m *KnxNetIpConnection) resetConnection() {
+ fmt.Println("Bad connection detected")
}
func (m *KnxNetIpConnection) getGroupAddressNumLevels() uint8 {