You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/03/08 12:45:48 UTC
[plc4x] branch develop updated (06fbedd -> 3142a57)
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git.
from 06fbedd - Updated the knx codec to check expectations for expirations, even if no message was received - Added a dedicated TimeoutError type to allow easy detection of timeout errors
new d4194ab - Clean up the read-memory function and the updating of the max pdu size, if a mem-read returns less than expected.
new 8a7548e - Clean up the read-memory function and the updating of the max pdu size, if a mem-read returns less than expected.
new 3142a57 - Worked on detecting bad connections.
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../internal/plc4go/knxnetip/KnxNetIpConnection.go | 171 +++++++++++++++------
1 file changed, 126 insertions(+), 45 deletions(-)
[plc4x] 03/03: - Worked on detecting bad connections.
Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 3142a575b0569e56110524814d234393c109402a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Mar 8 13:45:41 2021 +0100
- Worked on detecting bad connections.
---
.../internal/plc4go/knxnetip/KnxNetIpConnection.go | 139 ++++++++++++++++++---
1 file changed, 123 insertions(+), 16 deletions(-)
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
index b7620b8..d2d1e62 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -25,6 +25,7 @@ import (
"fmt"
driverModel "github.com/apache/plc4x/plc4go/internal/plc4go/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
+ errors2 "github.com/apache/plc4x/plc4go/internal/plc4go/spi/errors"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/interceptors"
internalModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/transports"
@@ -117,8 +118,12 @@ type KnxNetIpConnection struct {
valueCacheMutex sync.RWMutex
metadata *ConnectionMetadata
defaultTtl time.Duration
+ connectionTtl time.Duration
buildingKey []byte
+ // Used for detecting connection problems
+ connectionTimeoutTimer *time.Timer
+
GatewayKnxAddress *driverModel.KnxAddress
ClientKnxAddress *driverModel.KnxAddress
CommunicationChannelId uint8
@@ -169,6 +174,8 @@ func NewKnxNetIpConnection(transportInstance transports.TransportInstance, optio
defaultTtl: time.Second * 10,
DeviceConnections: map[driverModel.KnxAddress]*KnxDeviceConnection{},
}
+ connection.connectionTtl = connection.defaultTtl * 2
+
// If a building key was provided, save that in a dedicated variable
if buildingKey, ok := options["buildingKey"]; ok {
bc, err := hex.DecodeString(buildingKey[0])
@@ -362,7 +369,7 @@ func (m *KnxNetIpConnection) IsConnected() bool {
case pingResponse := <-pingChannel:
return pingResponse.Err == nil
case <-time.After(m.defaultTtl):
- m.invalidateConnection()
+ m.handleTimeout()
return false
}
}
@@ -914,6 +921,10 @@ func (m *KnxNetIpConnection) sendGatewaySearchRequest() (*driverModel.SearchResp
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing search request: %s", err))
return nil
},
@@ -957,6 +968,10 @@ func (m *KnxNetIpConnection) sendGatewayConnectionRequest() (*driverModel.Connec
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1001,6 +1016,10 @@ func (m *KnxNetIpConnection) sendGatewayDisconnectionRequest() (*driverModel.Dis
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1044,6 +1063,10 @@ func (m *KnxNetIpConnection) sendConnectionStateRequest() (*driverModel.Connecti
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1112,6 +1135,10 @@ func (m *KnxNetIpConnection) sendGroupAddressReadRequest(groupAddress []int8) (*
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1178,14 +1205,18 @@ func (m *KnxNetIpConnection) sendDeviceConnectionRequest(targetAddress driverMod
// If the error flag is set, there was an error connecting
if lDataCon.DataFrame.ErrorFlag {
- errorResult <- errors.New("error connecting")
- return nil
+ errorResult <- errors.New("error connecting to device at: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduControlConnect
}
- result <- apduControlConnect
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1253,14 +1284,18 @@ func (m *KnxNetIpConnection) sendDeviceDisconnectionRequest(targetAddress driver
// If the error flag is set, there was an error disconnecting
if lDataCon.DataFrame.ErrorFlag {
- errorResult <- errors.New("error disconnecting")
- return nil
+ errorResult <- errors.New("error disconnecting from device at: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduControlDisconnect
}
- result <- apduControlDisconnect
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1356,16 +1391,21 @@ func (m *KnxNetIpConnection) sendDeviceAuthentication(targetAddress driverModel.
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
// If the error flag is set, there was an error authenticating
if lDataInd.DataFrame.ErrorFlag {
- errorResult <- errors.New("error authenticating")
- return
+ errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- apduAuthorizeResponse
}
-
- result <- apduAuthorizeResponse
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1441,12 +1481,23 @@ func (m *KnxNetIpConnection) sendDeviceDeviceDescriptorReadRequest(targetAddress
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrame.Apdu.Counter, func(err error) {
- result <- deviceDescriptorResponse
+ // If the error flag is set, there was an error reading the device descriptor
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- deviceDescriptorResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1530,12 +1581,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyReadRequest(targetAddress driverM
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- propertyValueResponse
+ // If the error flag is set, there was an error reading the property value
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- propertyValueResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1619,12 +1681,23 @@ func (m *KnxNetIpConnection) sendDevicePropertyDescriptionReadRequest(targetAddr
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- propertyDescriptionResponse
+ // If the error flag is set, there was an error reading the property description
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading property description from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- propertyDescriptionResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1704,12 +1777,23 @@ func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverMod
// Acknowledge the receipt
_ = m.sendDeviceAck(targetAddress, dataFrameExt.Apdu.Counter, func(err error) {
- result <- dataApduMemoryResponse
+ // If the error flag is set, there was an error reading the memory
+ if lDataInd.DataFrame.ErrorFlag {
+ errorResult <- errors.New("error reading memory from device: " + KnxAddressToString(&targetAddress))
+ } else if err != nil {
+ errorResult <- errors.New("error sending ack to device: " + KnxAddressToString(&targetAddress))
+ } else {
+ result <- dataApduMemoryResponse
+ }
})
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
errorResult <- errors.New(fmt.Sprintf("got error processing request: %s", err))
return nil
},
@@ -1779,6 +1863,10 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
return nil
},
func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(errors2.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
callback(errors.New(fmt.Sprintf("got error processing request: %s", err)))
return nil
},
@@ -1796,6 +1884,7 @@ func (m *KnxNetIpConnection) sendDeviceAck(targetAddress driverModel.KnxAddress,
///////////////////////////////////////////////////////////////////////////////////////////////////////
func (m *KnxNetIpConnection) interceptIncomingMessage(interface{}) {
+ m.resetTimeout()
if m.connectionStateTimer != nil {
// Reset the timer for sending the ConnectionStateRequest
m.connectionStateTimer.Reset(60 * time.Second)
@@ -1875,8 +1964,26 @@ func (m *KnxNetIpConnection) handleValueCacheUpdate(destinationAddress []int8, p
}
}
-func (m *KnxNetIpConnection) invalidateConnection() {
+func (m *KnxNetIpConnection) handleTimeout() {
+ // If this is the first timeout in a sequence, start the timer.
+ if m.connectionTimeoutTimer == nil {
+ m.connectionTimeoutTimer = time.NewTimer(m.connectionTtl)
+ go func() {
+ <-m.connectionTimeoutTimer.C
+ m.resetConnection()
+ }()
+ }
+}
+
+func (m *KnxNetIpConnection) resetTimeout() {
+ if m.connectionTimeoutTimer != nil {
+ m.connectionTimeoutTimer.Stop()
+ m.connectionTimeoutTimer = nil
+ }
+}
+func (m *KnxNetIpConnection) resetConnection() {
+ fmt.Println("Bad connection detected")
}
func (m *KnxNetIpConnection) getGroupAddressNumLevels() uint8 {
[plc4x] 01/03: - Clean up the read-memory function and the updating
of the max pdu size, if a mem-read returns less than expected.
Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit d4194ab0493ad9706cb43c2cf47dc412d905257a
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Mar 8 12:39:47 2021 +0100
- Clean up the read-memory function and the updating of the max pdu size, if a mem-read returns less than expected.
---
.../internal/plc4go/knxnetip/KnxNetIpConnection.go | 31 +++-------------------
1 file changed, 3 insertions(+), 28 deletions(-)
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
index 275857c..a838826 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -822,7 +822,6 @@ func (m *KnxNetIpConnection) DeviceReadMemory(targetAddress driverModel.KnxAddre
// If we successfully got a connection, read the property
if connection != nil {
- numBytes := numElements * uint8(math.Max(float64(1), float64(datapointType.DatapointMainType().SizeInBits()/8)))
// Depending on the gateway Max APDU and the device Max APDU, split this up into multiple requests.
// An APDU starts with the last 6 bits of the first data byte containing the count
// followed by the 16-bit address, so these are already used.
@@ -835,7 +834,8 @@ func (m *KnxNetIpConnection) DeviceReadMemory(targetAddress driverModel.KnxAddre
maxNumBytes := uint8(math.Min(float64(connection.maxApdu-3), float64(63)))
maxNumElementsPerRequest := uint8(math.Floor(float64(maxNumBytes / elementSize)))
numElements := uint8(math.Min(float64(remainingRequestElements), float64(maxNumElementsPerRequest)))
- memoryReadResponse, err := m.sendDeviceMemoryReadRequest(targetAddress, curStartingAddress, numElements, *datapointType)
+ numBytes := numElements * uint8(math.Max(float64(1), float64(datapointType.DatapointMainType().SizeInBits()/8)))
+ memoryReadResponse, err := m.sendDeviceMemoryReadRequest(targetAddress, curStartingAddress, numBytes)
if err != nil {
return
}
@@ -929,8 +929,6 @@ func (m *KnxNetIpConnection) sendGatewaySearchRequest() (*driverModel.SearchResp
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for search response")*/
}
}
@@ -974,8 +972,6 @@ func (m *KnxNetIpConnection) sendGatewayConnectionRequest() (*driverModel.Connec
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for connection response")*/
}
}
@@ -1020,8 +1016,6 @@ func (m *KnxNetIpConnection) sendGatewayDisconnectionRequest() (*driverModel.Dis
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for disconnect response")*/
}
}
@@ -1065,8 +1059,6 @@ func (m *KnxNetIpConnection) sendConnectionStateRequest() (*driverModel.Connecti
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for connection state response")*/
}
}
@@ -1135,8 +1127,6 @@ func (m *KnxNetIpConnection) sendGroupAddressReadRequest(groupAddress []int8) (*
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for group address read response")*/
}
}
@@ -1211,8 +1201,6 @@ func (m *KnxNetIpConnection) sendDeviceConnectionRequest(targetAddress driverMod
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for device connection response")*/
}
}
@@ -1288,8 +1276,6 @@ func (m *KnxNetIpConnection) sendDeviceDisconnectionRequest(targetAddress driver
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for device disconnection response")*/
}
}
@@ -1395,8 +1381,6 @@ func (m *KnxNetIpConnection) sendDeviceAuthentication(targetAddress driverModel.
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for device authentication response")*/
}
}
@@ -1478,8 +1462,6 @@ func (m *KnxNetIpConnection) sendDeviceDeviceDescriptorReadRequest(targetAddress
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for device descriptor read response")*/
}
}
@@ -1569,8 +1551,6 @@ func (m *KnxNetIpConnection) sendDevicePropertyReadRequest(targetAddress driverM
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for device descriptor read response")*/
}
}
@@ -1660,15 +1640,12 @@ func (m *KnxNetIpConnection) sendDevicePropertyDescriptionReadRequest(targetAddr
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for property description read response")*/
}
}
-func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType driverModel.KnxDatapointType) (*driverModel.ApduDataMemoryResponse, error) {
+func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAddress, address uint16, numBytes uint8) (*driverModel.ApduDataMemoryResponse, error) {
// Next, read the device descriptor so we know how we have to communicate with the device.
counter := m.getNextCounter(targetAddress)
- numBytes := numElements * uint8(math.Max(float64(1), float64(datapointType.DatapointMainType().SizeInBits()/8)))
// Send the property read request and wait for a confirmation that this property is readable.
propertyReadRequest := driverModel.NewTunnelingRequest(
@@ -1748,8 +1725,6 @@ func (m *KnxNetIpConnection) sendDeviceMemoryReadRequest(targetAddress driverMod
return response, nil
case errorResponse := <-errorResult:
return nil, errorResponse
- /*case <-time.After(m.defaultTtl):
- return nil, errors.New("got timeout waiting for memory read response")*/
}
}
[plc4x] 02/03: - Clean up the read-memory function and the updating
of the max pdu size, if a mem-read returns less than expected.
Posted by cd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8a7548edeb964a1d230c66dd3e15dadf5d2025d5
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Mar 8 12:41:08 2021 +0100
- Clean up the read-memory function and the updating of the max pdu size, if a mem-read returns less than expected.
---
plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go | 1 -
1 file changed, 1 deletion(-)
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
index a838826..b7620b8 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpConnection.go
@@ -845,7 +845,6 @@ func (m *KnxNetIpConnection) DeviceReadMemory(targetAddress driverModel.KnxAddre
// as some devices seem to be sending back less than the
// number of bytes specified than the maxApdu.
if uint8(len(memoryReadResponse.Data)) < numBytes {
- // TODO: This is actually not correct ... it should be if the number is less than the ones requested in the current request (not in total)
connection.maxApdu = uint16(len(memoryReadResponse.Data) + 3)
}