You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/03/08 12:45:51 UTC

[plc4x] 03/03: - Worked on detecting bad connections.

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 3142a575b0569e56110524814d234393c109402a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Mar 8 13:45:41 2021 +0100

    - Worked on detecting bad connections.
---
 .../internal/plc4go/knxnetip/KnxNetIpConnection.go | 139 ++++++++++++++++++---
 1 file changed, 123 insertions(+), 16 deletions(-)

diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
index b7620b8..d2d1e62 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -25,6 +25,7 @@ import (
 	"fmt"
 	driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
+	errors2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/errors"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
 	internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
@@ -117,8 +118,12 @@ type KnxNetIpConnection struct {
 	valueCacheMutex sync.RWMutex
 	metadata        *ConnectionMetadata
 	defaultTtl      time.Duration
+	connectionTtl   time.Duration
 	buildingKey     []byte
 
+	// Used for detecting connection problems
+	connectionTimeoutTimer *time.Timer
+
 	GatewayKnxAddress             *driverModel.KnxAddress
 	ClientKnxAddress              *driverModel.KnxAddress
 	CommunicationChannelId        uint8
@@ -169,6 +174,8 @@ func NewKnxNetIpConnection(transportInstance transports.TransportInstance, optio
 		defaultTtl:         time.Second * 10,
 		DeviceConnections:  map[driverModel.KnxAddress]*KnxDeviceConnection{},
 	}
+	connection.connectionTtl = connection.defaultTtl * 2
+
 	// If a building key was provided, save that in a dedicated variable
 	if buildingKey, ok := options["buildingKey"]; ok {
 		bc, err := hex.DecodeString(buildingKey[0])
@@ -362,7 +369,7 @@ func (m *KnxNetIpConnection) IsConnected() bool {
 		case pingResponse := <-pingChannel:
 			return pingResponse.Err == nil
 		case <-time.After(m.defaultTtl):
-			m.invalidateConnection()
+			m.handleTimeout()
 			return false
 		}
 	}
@@ -914,6 +921,10 @@ func (m *KnxNetIpConnection) sendGatewaySearchRequest() (*driverModel.SearchResp
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing search request: %s", err))
 			return nil
 		},
@@ -957,6 +968,10 @@ func (m *KnxNetIpConnection) sendGatewayConnectionRequest() (*driverModel.Connec
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1001,6 +1016,10 @@ func (m *KnxNetIpConnection) sendGatewayDisconnectionRequest() (*driverModel.Dis
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1044,6 +1063,10 @@ func (m *KnxNetIpConnection) sendConnectionStateRequest() (*driverModel.Connecti
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1112,6 +1135,10 @@ func (m *KnxNetIpConnection) sendGroupAddressReadRequest(groupAddress []int8) (*
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1178,14 +1205,18 @@ func (m *KnxNetIpConnection) sendDeviceConnectionRequest(targetAddress driverMod
 
 			// If the error flag is set, there was an error connecting
 			if lDataCon.DataFrame.ErrorFlag {
-				errorResult <- errors.New("error connecting")
-				return nil
+				errorResult <- errors.New("error connecting to device at: " + KnxAddressToString(&targetAddress))
+			} else {
+				result <- apduControlConnect
 			}
 
-			result <- apduControlConnect
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1253,14 +1284,18 @@ func (m *KnxNetIpConnection) sendDeviceDisconnectionRequest(targetAddress driver
 
 			// If the error flag is set, there was an error disconnecting
 			if lDataCon.DataFrame.ErrorFlag {
-				errorResult <- errors.New("error disconnecting")
-				return nil
+				errorResult <- errors.New("error disconnecting from device at: " + KnxAddressToString(&targetAddress))
+			} else {
+				result <- apduControlDisconnect
 			}
 
-			result <- apduControlDisconnect
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1356,16 +1391,21 @@ func (m *KnxNetIpConnection) sendDeviceAuthentication(targetAddress driverModel.
 			_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
 				// If the error flag is set, there was an error authenticating
 				if lDataInd.DataFrame.ErrorFlag {
-					errorResult <- errors.New("error authenticating")
-					return
+					errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(&targetAddress))
+				} else if err != nil {
+					errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+				} else {
+					result <- apduAuthorizeResponse
 				}
-
-				result <- apduAuthorizeResponse
 			})
 
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1441,12 +1481,23 @@ func (m *KnxNetIpConnection) sendDeviceDeviceDescriptorReadRequest(targetAddress
 
 			// Acknowledge the receipt
 			_ = m.sendDeviceAck(targetAddress, dataFrame.Apdu.Counter, func(err error) {
-				result <- deviceDescriptorResponse
+				// If the error flag is set, there was an error reading the device descriptor
+				if lDataInd.DataFrame.ErrorFlag {
+					errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(&targetAddress))
+				} else if err != nil {
+					errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+				} else {
+					result <- deviceDescriptorResponse
+				}
 			})
 
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1530,12 +1581,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyReadRequest(targetAddress driverM
 
 			// Acknowledge the receipt
 			_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
-				result <- propertyValueResponse
+				// If the error flag is set, there was an error reading the property value
+				if lDataInd.DataFrame.ErrorFlag {
+					errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(&targetAddress))
+				} else if err != nil {
+					errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+				} else {
+					result <- propertyValueResponse
+				}
 			})
 
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1619,12 +1681,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyDescriptionReadRequest(targetAddr
 
 			// Acknowledge the receipt
 			_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
-				result <- propertyDescriptionResponse
+				// If the error flag is set, there was an error reading the property description
+				if lDataInd.DataFrame.ErrorFlag {
+					errorResult <- errors.New("error reading property description from device: " + KnxAddressToString(&targetAddress))
+				} else if err != nil {
+					errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+				} else {
+					result <- propertyDescriptionResponse
+				}
 			})
 
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1704,12 +1777,23 @@ func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverMod
 
 			// Acknowledge the receipt
 			_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
-				result <- dataApduMemoryResponse
+				// If the error flag is set, there was an error reading the memory
+				if lDataInd.DataFrame.ErrorFlag {
+					errorResult <- errors.New("error reading memory from device: " + KnxAddressToString(&targetAddress))
+				} else if err != nil {
+					errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+				} else {
+					result <- dataApduMemoryResponse
+				}
 			})
 
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
 			return nil
 		},
@@ -1779,6 +1863,10 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
 			return nil
 		},
 		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+				m.handleTimeout()
+			}
 			callback(errors.New(fmt.Sprintf("got error processing request: %s", err)))
 			return nil
 		},
@@ -1796,6 +1884,7 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
 
 func (m *KnxNetIpConnection) interceptIncomingMessage(interface{}) {
+	m.resetTimeout()
 	if m.connectionStateTimer != nil {
 		// Reset the timer for sending the ConnectionStateRequest
 		m.connectionStateTimer.Reset(60 * time.Second)
@@ -1875,8 +1964,26 @@ func (m *KnxNetIpConnection) handleValueCacheUpdate(destinationAddress []int8, p
 	}
 }
 
-func (m *KnxNetIpConnection) invalidateConnection() {
+// handleTimeout starts the bad-connection countdown on the first timeout of a sequence.
+// time.AfterFunc is used instead of a goroutine waiting on Timer.C: Stop does not close C,
+// so a waiting goroutine would block forever (or read a nil timer) after resetTimeout.
+func (m *KnxNetIpConnection) handleTimeout() {
+	if m.connectionTimeoutTimer == nil {
+		m.connectionTimeoutTimer = time.AfterFunc(m.connectionTtl, m.resetConnection)
+	}
+}
+
+// resetTimeout cancels a pending countdown; interceptIncomingMessage calls it per message.
+func (m *KnxNetIpConnection) resetTimeout() {
+	if m.connectionTimeoutTimer != nil {
+		m.connectionTimeoutTimer.Stop()
+		m.connectionTimeoutTimer = nil
+	}
+}
 
+// resetConnection runs once the countdown expires; for now it only prints a notice.
+func (m *KnxNetIpConnection) resetConnection() {
+	fmt.Println("Bad connection detected")
 }
 
 func (m *KnxNetIpConnection) getGroupAddressNumLevels() uint8 {