You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/15 10:55:43 UTC
[plc4x] branch develop updated: feat(plc4go/spi): integrate ctx into DefaultCodec
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new aa93c2784 feat(plc4go/spi): integrate ctx into DefaultCodec
aa93c2784 is described below
commit aa93c27845fc415ec7a73af2c6950f41a673f88c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 15 12:55:35 2022 +0200
feat(plc4go/spi): integrate ctx into DefaultCodec
---
plc4go/internal/ads/Reader.go | 79 +-
plc4go/internal/ads/Writer.go | 55 +-
plc4go/internal/bacnetip/Reader.go | 95 +-
plc4go/internal/cbus/Connection.go | 149 ++-
plc4go/internal/cbus/Reader.go | 255 ++---
plc4go/internal/eip/Connection.go | 93 +-
plc4go/internal/eip/Reader.go | 180 ++-
plc4go/internal/eip/Writer.go | 180 ++-
plc4go/internal/knxnetip/Browser.go | 15 +-
plc4go/internal/knxnetip/Connection.go | 19 +-
.../knxnetip/ConnectionDriverSpecificOperations.go | 41 +-
plc4go/internal/knxnetip/ConnectionHelper.go | 7 +-
.../knxnetip/ConnectionInternalOperations.go | 1189 +++++++++-----------
plc4go/internal/knxnetip/Reader.go | 10 +-
plc4go/internal/modbus/Connection.go | 46 +-
plc4go/internal/modbus/Reader.go | 59 +-
plc4go/internal/modbus/Writer.go | 57 +-
plc4go/internal/s7/Connection.go | 191 ++--
plc4go/internal/s7/Reader.go | 81 +-
plc4go/internal/s7/Writer.go | 83 +-
plc4go/spi/MessageCodec.go | 8 +-
plc4go/spi/default/DefaultCodec.go | 33 +-
22 files changed, 1398 insertions(+), 1527 deletions(-)
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index a574b948c..bc0c0e652 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -64,15 +64,15 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
result := make(chan model.PlcReadRequestResult)
go func() {
if len(readRequest.GetFieldNames()) <= 1 {
- m.singleRead(readRequest, result)
+ m.singleRead(ctx, readRequest, result)
} else {
- m.multiRead(readRequest, result)
+ m.multiRead(ctx, readRequest, result)
}
}()
return result
}
-func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) singleRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
if len(readRequest.GetFieldNames()) != 1 {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
@@ -96,7 +96,7 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
log.Debug().Msgf("Invalid field item type %T", field)
return
}
- field, err = m.resolveField(adsField)
+ field, err = m.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
@@ -150,10 +150,10 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
}
userdata.Data = readWriteModel.NewAdsReadRequest(adsField.IndexGroup, adsField.IndexOffset, readLength)
- m.sendOverTheWire(userdata, readRequest, result)
+ m.sendOverTheWire(ctx, userdata, readRequest, result)
}
-func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) multiRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
// Calculate the size of all fields together.
// Calculate the expected size of the response data.
expectedResponseDataSize := uint32(0)
@@ -216,7 +216,7 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
log.Debug().Msgf("Invalid field item type %T", field)
return
}
- field, err = m.resolveField(adsField)
+ field, err = m.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
@@ -242,10 +242,10 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
}
userdata.Data = readWriteModel.NewAdsReadWriteRequest(uint32(readWriteModel.ReservedIndexGroups_ADSIGRP_MULTIPLE_READ), uint32(len(readRequest.GetFieldNames())), expectedResponseDataSize, items, nil)
- m.sendOverTheWire(userdata, readRequest, result)
+ m.sendOverTheWire(ctx, userdata, readRequest, result)
}
-func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) sendOverTheWire(ctx context.Context, userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
// Calculate a new transaction identifier
transactionIdentifier := atomic.AddUint32(&m.transactionIdentifier, 1)
if transactionIdentifier > math.MaxUint8 {
@@ -271,42 +271,37 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest
// Send the TCP Paket over the wire
log.Trace().Msg("Send TCP Paket")
- if err := m.messageCodec.SendRequest(
- amsTcpPaket,
- func(message spi.Message) bool {
- paket := message.(readWriteModel.AmsTCPPacket)
- return paket.GetUserdata().GetInvokeId() == transactionIdentifier
- },
- func(message spi.Message) error {
- // Convert the response into an amsTcpPaket
- log.Trace().Msg("convert response to amsTcpPaket")
- receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
- // Convert the ads response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)
+ if err := m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
+ paket := message.(readWriteModel.AmsTCPPacket)
+ return paket.GetUserdata().GetInvokeId() == transactionIdentifier
+ }, func(message spi.Message) error {
+ // Convert the response into an amsTcpPaket
+ log.Trace().Msg("convert response to amsTcpPaket")
+ receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
+ // Convert the ads response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- // TODO: should we return the error here?
- return nil
- }
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return nil
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
+ // TODO: should we return the error here?
return nil
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return nil
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return nil
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
@@ -315,7 +310,7 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest
}
}
-func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, error) {
+func (m *Reader) resolveField(ctx context.Context, symbolicField SymbolicPlcField) (DirectPlcField, error) {
if directPlcField, ok := m.fieldMapping[symbolicField]; ok {
return directPlcField, nil
}
@@ -346,7 +341,7 @@ func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, e
result := make(chan model.PlcReadRequestResult)
go func() {
dummyRequest := plc4goModel.NewDefaultPlcReadRequest(map[string]model.PlcField{"dummy": DirectPlcField{PlcField: PlcField{Datatype: readWriteModel.AdsDataType_UINT32}}}, []string{"dummy"}, nil, nil)
- m.sendOverTheWire(userdata, dummyRequest, result)
+ m.sendOverTheWire(ctx, userdata, dummyRequest, result)
}()
// We wait synchronous for the resolution response before we can continue
response := <-result
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index fa41fbdae..a6f5ba200 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -83,7 +83,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
log.Debug().Msgf("Invalid field item type %T", field)
return
}
- field, err = m.reader.resolveField(adsField)
+ field, err = m.reader.resolveField(ctx, adsField)
if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
@@ -159,39 +159,34 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
amsTcpPaket := readWriteModel.NewAmsTCPPacket(userdata)
// Send the TCP Paket over the wire
- err = m.messageCodec.SendRequest(
- amsTcpPaket,
- func(message spi.Message) bool {
- paket := readWriteModel.CastAmsTCPPacket(message)
- return paket.GetUserdata().GetInvokeId() == transactionIdentifier
- },
- func(message spi.Message) error {
- // Convert the response into an responseAmsTcpPaket
- responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
- // Convert the ads response into a PLC4X response
- readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)
+ err = m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
+ paket := readWriteModel.CastAmsTCPPacket(message)
+ return paket.GetUserdata().GetInvokeId() == transactionIdentifier
+ }, func(message spi.Message) error {
+ // Convert the response into an responseAmsTcpPaket
+ responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
+ // Convert the ads response into a PLC4X response
+ readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- } else {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: readResponse,
- }
- }
- return nil
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
+ }
+ } else {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Response: readResponse,
}
- return nil
- },
- time.Second*1)
+ }
+ return nil
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Err: errors.New("got timeout while waiting for response"),
+ }
+ return nil
+ }, time.Second*1)
}()
return result
}
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index 13e3c17cc..a950cccb5 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -188,63 +188,58 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(
- bvlc,
- func(message spi.Message) bool {
- bvlc, ok := message.(readWriteModel.BVLC)
- if !ok {
- log.Debug().Msgf("Received strange type %T", bvlc)
- return false
- }
- var npdu readWriteModel.NPDU
- if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
- npdu = npduRetriever.GetNpdu()
- } else {
- log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
- return false
- }
- if npdu.GetControl().GetMessageTypeFieldPresent() {
- return false
- }
- if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
- log.Debug().Err(err).Msg("Error getting invoke id")
- return false
- } else {
- return invokeIdFromApdu == invokeId
- }
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()
+ if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
+ bvlc, ok := message.(readWriteModel.BVLC)
+ if !ok {
+ log.Debug().Msgf("Received strange type %T", bvlc)
+ return false
+ }
+ var npdu readWriteModel.NPDU
+ if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
+ npdu = npduRetriever.GetNpdu()
+ } else {
+ log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
+ return false
+ }
+ if npdu.GetControl().GetMessageTypeFieldPresent() {
+ return false
+ }
+ if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
+ log.Debug().Err(err).Msg("Error getting invoke id")
+ return false
+ } else {
+ return invokeIdFromApdu == invokeId
+ }
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()
- // TODO: implement segment handling
+ // TODO: implement segment handling
- // Convert the bacnet response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)
+ // Convert the bacnet response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)
- if err != nil {
- result <- &spiModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &spiModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &spiModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &spiModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index f341de773..16b245c33 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -106,6 +107,8 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}
func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ // TODO: use proper context
+ ctx := context.TODO()
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
@@ -116,7 +119,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
// For testing purposes we can skip the waiting for a complete connection
if !c.driverContext.awaitSetupComplete {
- go c.setupConnection(ch)
+ go c.setupConnection(ctx, ch)
log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -125,7 +128,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
return
}
- c.setupConnection(ch)
+ c.setupConnection(ctx, ch)
}()
return ch
}
@@ -174,7 +177,7 @@ func (c *Connection) String() string {
return fmt.Sprintf("cbus.Connection")
}
-func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
requestContext := &c.messageCodec.(*MessageCodec).requestContext
@@ -186,31 +189,25 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
receivedResetEchoChan := make(chan bool)
receivedResetEchoErrorChan := make(chan error)
- if err := c.messageCodec.SendRequest(
- cBusMessage,
- func(message spi.Message) bool {
- cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
- if !ok {
- return false
- }
- _, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
- return ok
- },
- func(message spi.Message) error {
- receivedResetEchoChan <- true
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- c.Close()
- }
- receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- },
- c.GetTtl(),
- ); err != nil {
+ if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
+ cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
+ if !ok {
+ return false
+ }
+ _, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
+ return ok
+ }, func(message spi.Message) error {
+ receivedResetEchoChan <- true
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
+ }
+ receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, c.GetTtl()); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
return
}
@@ -231,11 +228,11 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
{
log.Debug().Msg("Set application filter to all")
applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(c.configuration.MonitoredApplication1), nil, 1)
- if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
+ if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
return
}
applicationAddress2 := readWriteModel.NewParameterValueApplicationAddress2(readWriteModel.NewApplicationAddress2(c.configuration.MonitoredApplication2), nil, 1)
- if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
+ if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
return
}
log.Debug().Msg("Application filter set")
@@ -243,7 +240,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
{
log.Debug().Msg("Set interface options 3")
interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(c.configuration.Exstat, c.configuration.Pun, c.configuration.LocalSal, c.configuration.Pcn), nil, 1)
- if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
+ if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
return
}
// TODO: add localsal to the options
@@ -253,7 +250,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
{
log.Debug().Msg("Set interface options 1 power up settings")
interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect)), 1)
- if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
+ if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
return
}
// TODO: what is with monall
@@ -263,7 +260,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
{
log.Debug().Msg("Set interface options 1")
interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect), nil, 1)
- if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
+ if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
return
}
// TODO: what is with monall
@@ -319,7 +316,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
}
// This is used for connection setup
-func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
+func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
// TODO: we assume that is always a one byte request otherwise we need to map the length here
calData := readWriteModel.NewCALDataWrite(paramNo, 0x0, parameterValue, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
directCommand := readWriteModel.NewRequestDirectCommandAccess(calData /*we don't want a alpha otherwise the PCI will auto-switch*/, nil, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
@@ -327,61 +324,55 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
directCommandAckChan := make(chan bool)
directCommandAckErrorChan := make(chan error)
- if err := c.messageCodec.SendRequest(
- cBusMessage,
- func(message spi.Message) bool {
- switch message := message.(type) {
- case readWriteModel.CBusMessageToClientExactly:
- switch reply := message.GetReply().(type) {
- case readWriteModel.ReplyOrConfirmationReplyExactly:
- switch reply := reply.GetReply().(type) {
- case readWriteModel.ReplyEncodedReplyExactly:
- switch encodedReply := reply.GetEncodedReply().(type) {
- case readWriteModel.EncodedReplyCALReplyExactly:
- switch data := encodedReply.GetCalReply().GetCalData().(type) {
- case readWriteModel.CALDataAcknowledgeExactly:
- if data.GetParamNo() == paramNo {
- return true
- }
+ if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
+ switch message := message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ switch reply := message.GetReply().(type) {
+ case readWriteModel.ReplyOrConfirmationReplyExactly:
+ switch reply := reply.GetReply().(type) {
+ case readWriteModel.ReplyEncodedReplyExactly:
+ switch encodedReply := reply.GetEncodedReply().(type) {
+ case readWriteModel.EncodedReplyCALReplyExactly:
+ switch data := encodedReply.GetCalReply().GetCalData().(type) {
+ case readWriteModel.CALDataAcknowledgeExactly:
+ if data.GetParamNo() == paramNo {
+ return true
}
}
}
}
}
- return false
- },
- func(message spi.Message) error {
- switch message := message.(type) {
- case readWriteModel.CBusMessageToClientExactly:
- switch reply := message.GetReply().(type) {
- case readWriteModel.ReplyOrConfirmationReplyExactly:
- switch reply := reply.GetReply().(type) {
- case readWriteModel.ReplyEncodedReplyExactly:
- switch encodedReply := reply.GetEncodedReply().(type) {
- case readWriteModel.EncodedReplyCALReplyExactly:
- switch data := encodedReply.GetCalReply().GetCalData().(type) {
- case readWriteModel.CALDataAcknowledgeExactly:
- if data.GetParamNo() == paramNo {
- directCommandAckChan <- true
- }
+ }
+ return false
+ }, func(message spi.Message) error {
+ switch message := message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ switch reply := message.GetReply().(type) {
+ case readWriteModel.ReplyOrConfirmationReplyExactly:
+ switch reply := reply.GetReply().(type) {
+ case readWriteModel.ReplyEncodedReplyExactly:
+ switch encodedReply := reply.GetEncodedReply().(type) {
+ case readWriteModel.EncodedReplyCALReplyExactly:
+ switch data := encodedReply.GetCalReply().GetCalData().(type) {
+ case readWriteModel.CALDataAcknowledgeExactly:
+ if data.GetParamNo() == paramNo {
+ directCommandAckChan <- true
}
}
}
}
}
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- c.Close()
- }
- directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- },
- c.GetTtl(),
- ); err != nil {
+ }
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ c.Close()
+ }
+ directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, c.GetTtl()); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)
return false
}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index cd5840f93..59f4f1511 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -101,84 +101,105 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(
- messageToSend,
- func(receivedMessage spi.Message) bool {
- cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
- if !ok {
- return false
- }
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
- if !ok {
- return false
- }
- // Check if this errored
- if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- // This means we must handle this below
- return true
- }
+ if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+ cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+ if !ok {
+ return false
+ }
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+ if !ok {
+ return false
+ }
+ // Check if this errored
+ if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ // This means we must handle this below
+ return true
+ }
- confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
- return false
- }
- return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
- },
- func(receivedMessage spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
- messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
- if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- log.Trace().Msg("We got a server failure")
- addResponseCode(fieldNameCopy, model.PlcResponseCode_INVALID_DATA)
- requestWasOk <- false
- return transaction.EndRequest()
- }
- replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
- var responseCode model.PlcResponseCode
- switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
- responseCode = model.PlcResponseCode_REMOTE_ERROR
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
- responseCode = model.PlcResponseCode_INVALID_DATA
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
- responseCode = model.PlcResponseCode_REMOTE_BUSY
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
- responseCode = model.PlcResponseCode_INVALID_DATA
- default:
- panic("Every code should be mapped here")
- }
- log.Trace().Msgf("Was no success %s:%v", fieldNameCopy, responseCode)
- addResponseCode(fieldNameCopy, responseCode)
- requestWasOk <- true
- return transaction.EndRequest()
+ confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ return false
+ }
+ return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+ }, func(receivedMessage spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+ messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+ if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ log.Trace().Msg("We got a server failure")
+ addResponseCode(fieldNameCopy, model.PlcResponseCode_INVALID_DATA)
+ requestWasOk <- false
+ return transaction.EndRequest()
+ }
+ replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+ var responseCode model.PlcResponseCode
+ switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+ responseCode = model.PlcResponseCode_REMOTE_ERROR
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+ responseCode = model.PlcResponseCode_INVALID_DATA
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+ responseCode = model.PlcResponseCode_REMOTE_BUSY
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+ responseCode = model.PlcResponseCode_INVALID_DATA
+ default:
+ panic("Every code should be mapped here")
}
+ log.Trace().Msgf("Was no success %s:%v", fieldNameCopy, responseCode)
+ addResponseCode(fieldNameCopy, responseCode)
+ requestWasOk <- true
+ return transaction.EndRequest()
+ }
- alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
- // TODO: it could be double confirmed but this is not implemented yet
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
- addResponseCode(fieldNameCopy, model.PlcResponseCode_NOT_FOUND)
- requestWasOk <- true
- return transaction.EndRequest()
- }
+ alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+ // TODO: it could be double confirmed but this is not implemented yet
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+ addResponseCode(fieldNameCopy, model.PlcResponseCode_NOT_FOUND)
+ requestWasOk <- true
+ return transaction.EndRequest()
+ }
- log.Trace().Msg("Handling confirmed data")
- switch reply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply().(type) {
- case readWriteModel.EncodedReplyCALReplyExactly:
- calData := reply.GetCalReply().GetCalData()
+ log.Trace().Msg("Handling confirmed data")
+ switch reply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply().(type) {
+ case readWriteModel.EncodedReplyCALReplyExactly:
+ calData := reply.GetCalReply().GetCalData()
+ addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
+ switch calData := calData.(type) {
+ case readWriteModel.CALDataStatusExactly:
+ application := calData.GetApplication()
+ // TODO: verify application... this should be the same
+ _ = application
+ blockStart := calData.GetBlockStart()
+ // TODO: verify application... this should be the same
+ _ = blockStart
+ statusBytes := calData.GetStatusBytes()
addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
- switch calData := calData.(type) {
- case readWriteModel.CALDataStatusExactly:
- application := calData.GetApplication()
- // TODO: verify application... this should be the same
- _ = application
- blockStart := calData.GetBlockStart()
- // TODO: verify application... this should be the same
- _ = blockStart
+ plcListValues := make([]values.PlcValue, len(statusBytes)*4)
+ for i, statusByte := range statusBytes {
+ plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
+ plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
+ plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
+ plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
+ }
+ addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
+ case readWriteModel.CALDataStatusExtendedExactly:
+ coding := calData.GetCoding()
+ // TODO: verify coding... this should be the same
+ _ = coding
+ application := calData.GetApplication()
+ // TODO: verify application... this should be the same
+ _ = application
+ blockStart := calData.GetBlockStart()
+ // TODO: verify application... this should be the same
+ _ = blockStart
+ switch coding {
+ case readWriteModel.StatusCoding_BINARY_BY_THIS_SERIAL_INTERFACE:
+ fallthrough
+ case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
statusBytes := calData.GetStatusBytes()
addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
plcListValues := make([]values.PlcValue, len(statusBytes)*4)
@@ -189,67 +210,41 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
}
addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
- case readWriteModel.CALDataStatusExtendedExactly:
- coding := calData.GetCoding()
- // TODO: verify coding... this should be the same
- _ = coding
- application := calData.GetApplication()
- // TODO: verify application... this should be the same
- _ = application
- blockStart := calData.GetBlockStart()
- // TODO: verify application... this should be the same
- _ = blockStart
- switch coding {
- case readWriteModel.StatusCoding_BINARY_BY_THIS_SERIAL_INTERFACE:
- fallthrough
- case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
- statusBytes := calData.GetStatusBytes()
- addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
- plcListValues := make([]values.PlcValue, len(statusBytes)*4)
- for i, statusByte := range statusBytes {
- plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
- plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
- plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
- plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
- }
- addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
- case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
- fallthrough
- case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
- levelInformation := calData.GetLevelInformation()
- addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
- plcListValues := make([]values.PlcValue, len(levelInformation))
- for i, levelInformation := range levelInformation {
- switch levelInformation := levelInformation.(type) {
- case readWriteModel.LevelInformationAbsentExactly:
- plcListValues[i] = spiValues.NewPlcSTRING("is absent")
- case readWriteModel.LevelInformationCorruptedExactly:
- plcListValues[i] = spiValues.NewPlcSTRING("corrupted")
- case readWriteModel.LevelInformationNormalExactly:
- plcListValues[i] = spiValues.NewPlcUSINT(levelInformation.GetActualLevel())
- default:
- panic("Impossible case")
- }
+ case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
+ fallthrough
+ case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
+ levelInformation := calData.GetLevelInformation()
+ addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
+ plcListValues := make([]values.PlcValue, len(levelInformation))
+ for i, levelInformation := range levelInformation {
+ switch levelInformation := levelInformation.(type) {
+ case readWriteModel.LevelInformationAbsentExactly:
+ plcListValues[i] = spiValues.NewPlcSTRING("is absent")
+ case readWriteModel.LevelInformationCorruptedExactly:
+ plcListValues[i] = spiValues.NewPlcSTRING("corrupted")
+ case readWriteModel.LevelInformationNormalExactly:
+ plcListValues[i] = spiValues.NewPlcUSINT(levelInformation.GetActualLevel())
+ default:
+ panic("Impossible case")
}
- addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
}
+ addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
}
- // TODO: how should we serialize that???
- addPlcValue(fieldNameCopy, spiValues.NewPlcSTRING(fmt.Sprintf("%s", calData)))
- default:
- panic(fmt.Sprintf("All types should be mapped here. Not mapped: %T", reply))
}
- requestWasOk <- true
- return transaction.EndRequest()
- },
- func(err error) error {
- log.Debug().Msgf("Error waiting for field %s", fieldNameCopy)
- addResponseCode(fieldNameCopy, model.PlcResponseCode_REQUEST_TIMEOUT)
- // TODO: ok or not ok?
- requestWasOk <- true
- return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ // TODO: how should we serialize that???
+ addPlcValue(fieldNameCopy, spiValues.NewPlcSTRING(fmt.Sprintf("%s", calData)))
+ default:
+ panic(fmt.Sprintf("All types should be mapped here. Not mapped: %T", reply))
+ }
+ requestWasOk <- true
+ return transaction.EndRequest()
+ }, func(err error) error {
+ log.Debug().Msgf("Error waiting for field %s", fieldNameCopy)
+ addResponseCode(fieldNameCopy, model.PlcResponseCode_REQUEST_TIMEOUT)
+ // TODO: ok or not ok?
+ requestWasOk <- true
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
log.Debug().Err(err).Msgf("Error sending message for field %s", fieldNameCopy)
addResponseCode(fieldNameCopy, model.PlcResponseCode_INTERNAL_ERROR)
_ = transaction.EndRequest()
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 362417d14..4db0d2381 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -20,6 +20,7 @@
package eip
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -84,6 +85,8 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
}
func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ // TODO: use proper context
+ ctx := context.TODO()
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
@@ -94,7 +97,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
// For testing purposes we can skip the waiting for a complete connection
if !m.driverContext.awaitSetupComplete {
- go m.setupConnection(ch)
+ go m.setupConnection(ctx, ch)
log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -103,69 +106,59 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
return
}
- m.setupConnection(ch)
+ m.setupConnection(ctx, ch)
}()
return ch
}
func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
+ // TODO: use proper context
+ ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult)
go func() {
log.Debug().Msg("Sending UnregisterSession EIP Packet")
- _ = m.messageCodec.SendRequest(
- readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, make([]byte, 8), 0),
- func(message spi.Message) bool {
- return true
- },
- func(message spi.Message) error {
- return nil
- },
- func(err error) error {
- return nil
- },
- m.GetTtl(),
- ) //Unregister gets no response
+ _ = m.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, make([]byte, 8), 0), func(message spi.Message) bool {
+ return true
+ }, func(message spi.Message) error {
+ return nil
+ }, func(err error) error {
+ return nil
+ }, m.GetTtl()) //Unregister gets no response
log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
}()
return result
}
-func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
log.Debug().Msg("Sending EIP Connection Request")
- if err := m.messageCodec.SendRequest(
- readWriteModel.NewEipConnectionRequest(0, 0, make([]byte, 8), 0),
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- eipPacketConnectionRequest := eipPacket.(readWriteModel.EipConnectionRequest)
- return eipPacketConnectionRequest != nil
- },
- func(message spi.Message) error {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket.GetStatus() == 0 {
- m.sessionHandle = eipPacket.GetSessionHandle()
- m.senderContext = eipPacket.GetSenderContext()
- log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
- // Send an event that connection setup is complete.
- m.fireConnected(ch)
- } else {
-
- }
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
- return nil
- },
- m.GetTtl(),
- ); err != nil {
+ if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(0, 0, make([]byte, 8), 0), func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ eipPacketConnectionRequest := eipPacket.(readWriteModel.EipConnectionRequest)
+ return eipPacketConnectionRequest != nil
+ }, func(message spi.Message) error {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket.GetStatus() == 0 {
+ m.sessionHandle = eipPacket.GetSessionHandle()
+ m.senderContext = eipPacket.GetSenderContext()
+ log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+ // Send an event that connection setup is complete.
+ m.fireConnected(ch)
+ } else {
+
+ }
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+ return nil
+ }, m.GetTtl()); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP Connection Request"), ch)
}
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 65caf6b68..f20058abb 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -115,60 +115,55 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
transaction.Submit(func() {
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(
- pkt,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- if cipRRData == nil {
- return false
- }
- if eipPacket.GetSessionHandle() != *m.sessionHandle {
- return false
- }
- multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
- if multipleServiceResponse == nil {
- return false
- }
- if multipleServiceResponse.GetServiceNb() != nb {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- eipPacket := message.(readWriteModel.EipPacket)
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
- // Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(multipleServiceResponse, readRequest)
+ if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ if cipRRData == nil {
+ return false
+ }
+ if eipPacket.GetSessionHandle() != *m.sessionHandle {
+ return false
+ }
+ multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+ if multipleServiceResponse == nil {
+ return false
+ }
+ if multipleServiceResponse.GetServiceNb() != nb {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ eipPacket := message.(readWriteModel.EipPacket)
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+ // Convert the eip response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(multipleServiceResponse, readRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
@@ -201,57 +196,52 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
transaction.Submit(func() {
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(
- pkt,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- if cipRRData == nil {
- return false
- }
- if eipPacket.GetSessionHandle() != *m.sessionHandle {
- return false
- }
- cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
- if cipReadResponse == nil {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- eipPacket := message.(readWriteModel.EipPacket)
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
- // Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(cipReadResponse, readRequest)
+ if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ if cipRRData == nil {
+ return false
+ }
+ if eipPacket.GetSessionHandle() != *m.sessionHandle {
+ return false
+ }
+ cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
+ if cipReadResponse == nil {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ eipPacket := message.(readWriteModel.EipPacket)
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
+ // Convert the eip response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(cipReadResponse, readRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 7343b07af..86101cedf 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -121,57 +121,52 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
// Send the over the wire
- if err := m.messageCodec.SendRequest(
- pkt,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- if cipRRData == nil {
- return false
- }
- if eipPacket.GetSessionHandle() != *m.sessionHandle {
- return false
- }
- cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
- if cipWriteResponse == nil {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- eipPacket := message.(readWriteModel.EipPacket)
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
- // Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
+ if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ if cipRRData == nil {
+ return false
+ }
+ if eipPacket.GetSessionHandle() != *m.sessionHandle {
+ return false
+ }
+ cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
+ if cipWriteResponse == nil {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ eipPacket := message.(readWriteModel.EipPacket)
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
+ // Convert the eip response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Err: errors.New("got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
@@ -218,60 +213,55 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
// Send the over the wire
- if err := m.messageCodec.SendRequest(
- pkt,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- if cipRRData == nil {
- return false
- }
- if eipPacket.GetSessionHandle() != *m.sessionHandle {
- return false
- }
- multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
- if multipleServiceResponse == nil {
- return false
- }
- if multipleServiceResponse.GetServiceNb() != nb {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- eipPacket := message.(readWriteModel.EipPacket)
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
- // Convert the eip response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
+ if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ if cipRRData == nil {
+ return false
+ }
+ if eipPacket.GetSessionHandle() != *m.sessionHandle {
+ return false
+ }
+ multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+ if multipleServiceResponse == nil {
+ return false
+ }
+ if multipleServiceResponse.GetServiceNb() != nb {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ eipPacket := message.(readWriteModel.EipPacket)
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+ // Convert the eip response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Err: errors.New("got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
diff --git a/plc4go/internal/knxnetip/Browser.go b/plc4go/internal/knxnetip/Browser.go
index 74640234b..75fef4af6 100644
--- a/plc4go/internal/knxnetip/Browser.go
+++ b/plc4go/internal/knxnetip/Browser.go
@@ -56,10 +56,9 @@ func NewBrowser(connection *Connection, messageCodec spi.MessageCodec) *Browser
}
func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool, fieldName string, field apiModel.PlcField) (apiModel.PlcResponseCode, []apiModel.PlcBrowseFoundField) {
- // TODO: handle ctx
switch field.(type) {
case DeviceQueryField:
- queryResults, err := m.executeDeviceQuery(field.(DeviceQueryField), browseRequest, fieldName, interceptor)
+ queryResults, err := m.executeDeviceQuery(ctx, field.(DeviceQueryField), browseRequest, fieldName, interceptor)
if err != nil {
log.Warn().Err(err).Msg("Error executing device query")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
@@ -67,7 +66,7 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
return apiModel.PlcResponseCode_OK, queryResults
}
case CommunicationObjectQueryField:
- queryResults, err := m.executeCommunicationObjectQuery(field.(CommunicationObjectQueryField))
+ queryResults, err := m.executeCommunicationObjectQuery(ctx, field.(CommunicationObjectQueryField))
if err != nil {
log.Warn().Err(err).Msg("Error executing device query")
return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
@@ -79,7 +78,7 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
}
}
-func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiModel.PlcBrowseRequest, fieldName string, interceptor func(result apiModel.PlcBrowseEvent) bool) ([]apiModel.PlcBrowseFoundField, error) {
+func (m Browser) executeDeviceQuery(ctx context.Context, field DeviceQueryField, browseRequest apiModel.PlcBrowseRequest, fieldName string, interceptor func(result apiModel.PlcBrowseEvent) bool) ([]apiModel.PlcBrowseFoundField, error) {
// Create a list of address strings, which doesn't contain any ranges, lists or wildcards
knxAddresses, err := m.calculateAddresses(field)
if err != nil {
@@ -94,7 +93,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
for _, knxAddress := range knxAddresses {
// Send a connection request to the device
connectTtlTimer := time.NewTimer(m.connection.defaultTtl)
- deviceConnections := m.connection.DeviceConnect(knxAddress)
+ deviceConnections := m.connection.DeviceConnect(ctx, knxAddress)
select {
case deviceConnection := <-deviceConnections:
if !connectTtlTimer.Stop() {
@@ -129,7 +128,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
}
disconnectTtlTimer := time.NewTimer(m.connection.defaultTtl * 10)
- deviceDisconnections := m.connection.DeviceDisconnect(knxAddress)
+ deviceDisconnections := m.connection.DeviceDisconnect(ctx, knxAddress)
select {
case _ = <-deviceDisconnections:
if !disconnectTtlTimer.Stop() {
@@ -150,7 +149,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
return queryResults, nil
}
-func (m Browser) executeCommunicationObjectQuery(field CommunicationObjectQueryField) ([]apiModel.PlcBrowseFoundField, error) {
+func (m Browser) executeCommunicationObjectQuery(ctx context.Context, field CommunicationObjectQueryField) ([]apiModel.PlcBrowseFoundField, error) {
var results []apiModel.PlcBrowseFoundField
knxAddress := field.toKnxAddress()
@@ -158,7 +157,7 @@ func (m Browser) executeCommunicationObjectQuery(field CommunicationObjectQueryF
// If we have a building Key, try that to login in order to access protected
if m.connection.buildingKey != nil {
- arr := m.connection.DeviceAuthenticate(knxAddress, m.connection.buildingKey)
+ arr := m.connection.DeviceAuthenticate(ctx, knxAddress, m.connection.buildingKey)
<-arr
}
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index 179e873a8..98b298ee3 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -21,6 +21,7 @@ package knxnetip
import (
"bytes"
+ "context"
"encoding/hex"
"fmt"
_default "github.com/apache/plc4x/plc4go/spi/default"
@@ -214,6 +215,8 @@ func (m *Connection) GetTracer() *spi.Tracer {
}
func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ // TODO: use proper context
+ ctx := context.TODO()
result := make(chan plc4go.PlcConnectionConnectResult)
sendResult := func(connection plc4go.PlcConnection, err error) {
result <- _default.NewDefaultPlcConnectionConnectResult(connection, err)
@@ -228,7 +231,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
}
// Send a search request before connecting to the device.
- searchResponse, err := m.sendGatewaySearchRequest()
+ searchResponse, err := m.sendGatewaySearchRequest(ctx)
if err != nil {
m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error discovering device capabilities")) })
return
@@ -261,7 +264,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
// Via this connection we then get access to the entire KNX network this Gateway is connected to.
if supportsTunneling {
// As soon as we got a successful search-response back, send a connection request.
- connectionResponse, err := m.sendGatewayConnectionRequest()
+ connectionResponse, err := m.sendGatewayConnectionRequest(ctx)
if err != nil {
m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error connecting to device")) })
return
@@ -334,7 +337,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
}
}
} else {
- m.handleIncomingTunnelingRequest(tunnelingRequest)
+ m.handleIncomingTunnelingRequest(ctx, tunnelingRequest)
}
}
log.Warn().Msg("Tunneling handler shat down")
@@ -379,6 +382,8 @@ func (m *Connection) BlockingClose() {
}
func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
+ // TODO: use proper context
+ ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult)
go func() {
@@ -390,7 +395,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
// Disconnect from all knx devices we are still connected to.
for targetAddress := range m.DeviceConnections {
ttlTimer := time.NewTimer(m.defaultTtl)
- disconnects := m.DeviceDisconnect(targetAddress)
+ disconnects := m.DeviceDisconnect(ctx, targetAddress)
select {
case _ = <-disconnects:
if !ttlTimer.Stop() {
@@ -404,7 +409,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
// Send a disconnect request from the gateway.
- _, err := m.sendGatewayDisconnectionRequest()
+ _, err := m.sendGatewayDisconnectionRequest(ctx)
if err != nil {
result <- _default.NewDefaultPlcConnectionCloseResult(m, errors.Wrap(err, "got an error while disconnecting"))
} else {
@@ -435,11 +440,13 @@ func (m *Connection) IsConnected() bool {
}
func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
+ // TODO: use proper context
+ ctx := context.TODO()
result := make(chan plc4go.PlcConnectionPingResult)
go func() {
// Send the connection state request
- _, err := m.sendConnectionStateRequest()
+ _, err := m.sendConnectionStateRequest(ctx)
if err != nil {
result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got an error"))
} else {
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 7694b1a2b..56b563559 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
"math"
"strconv"
"time"
@@ -43,7 +44,7 @@ import (
// They expect the called private functions to handle timeouts, so these will not.
///////////////////////////////////////////////////////////////////////////////////////////////////////
-func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
+func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -63,7 +64,7 @@ func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driver
}
go func() {
- groupAddressReadResponse, err := m.sendGroupAddressReadRequest(groupAddress)
+ groupAddressReadResponse, err := m.sendGroupAddressReadRequest(ctx, groupAddress)
if err != nil {
sendResponse(nil, 0, errors.Wrap(err, "error reading group address"))
return
@@ -99,7 +100,7 @@ func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driver
return result
}
-func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan KnxDeviceConnectResult {
+func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceConnectResult {
result := make(chan KnxDeviceConnectResult)
sendResponse := func(connection *KnxDeviceConnection, err error) {
@@ -125,7 +126,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
}
// First send a connection request
- controlConnectResponse, err := m.sendDeviceConnectionRequest(targetAddress)
+ controlConnectResponse, err := m.sendDeviceConnectionRequest(ctx, targetAddress)
if err != nil {
sendResponse(nil, errors.Wrap(err, "error creating device connection"))
return
@@ -145,7 +146,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
m.DeviceConnections[targetAddress] = connection
// If the connection request was successful, try to read the device-descriptor
- deviceDescriptorResponse, err := m.sendDeviceDeviceDescriptorReadRequest(targetAddress)
+ deviceDescriptorResponse, err := m.sendDeviceDeviceDescriptorReadRequest(ctx, targetAddress)
if err != nil {
sendResponse(nil, errors.New(
"error reading device descriptor: "+err.Error()))
@@ -161,7 +162,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
// default APDU Size of 15
// Defined in: 03_05_01 Resources v01.09.03 AS Page 40
deviceApduSize := uint16(15)
- propertyValueResponse, err := m.sendDevicePropertyReadRequest(targetAddress, 0, 56, 1, 1)
+ propertyValueResponse, err := m.sendDevicePropertyReadRequest(ctx, targetAddress, 0, 56, 1, 1)
if err == nil {
// If the count is 0, then this property doesn't exist or the user has no permission to read it.
// In all other cases we expect the response to contain the value.
@@ -190,7 +191,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
return result
}
-func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-chan KnxDeviceDisconnectResult {
+func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceDisconnectResult {
result := make(chan KnxDeviceDisconnectResult)
sendResponse := func(connection *KnxDeviceConnection, err error) {
@@ -210,7 +211,7 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
go func() {
if connection, ok := m.DeviceConnections[targetAddress]; ok {
- _, err := m.sendDeviceDisconnectionRequest(targetAddress)
+ _, err := m.sendDeviceDisconnectionRequest(ctx, targetAddress)
// Remove the connection from the list.
delete(m.DeviceConnections, targetAddress)
@@ -224,7 +225,7 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
return result
}
-func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, buildingKey []byte) <-chan KnxDeviceAuthenticateResult {
+func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress driverModel.KnxAddress, buildingKey []byte) <-chan KnxDeviceAuthenticateResult {
result := make(chan KnxDeviceAuthenticateResult)
sendResponse := func(err error) {
@@ -246,7 +247,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
if !ok {
- connections := m.DeviceConnect(targetAddress)
+ connections := m.DeviceConnect(ctx, targetAddress)
deviceConnectionResult := <-connections
// If we didn't get a connect, abort
if deviceConnectionResult.err != nil {
@@ -260,7 +261,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
return
}
authenticationLevel := uint8(0)
- authenticationResponse, err := m.sendDeviceAuthentication(targetAddress, authenticationLevel, buildingKey)
+ authenticationResponse, err := m.sendDeviceAuthentication(ctx, targetAddress, authenticationLevel, buildingKey)
if err == nil {
if authenticationResponse.GetLevel() == authenticationLevel {
sendResponse(nil)
@@ -277,7 +278,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
return result
}
-func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) <-chan KnxReadResult {
+func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) <-chan KnxReadResult {
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -301,7 +302,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
if !ok {
- connections := m.DeviceConnect(targetAddress)
+ connections := m.DeviceConnect(ctx, targetAddress)
deviceConnectionResult := <-connections
// If we didn't get a connect, abort
if deviceConnectionResult.err != nil {
@@ -317,7 +318,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
sendResponse(nil, 0, errors.New("unable to connect to device"))
return
}
- propertyValueResponse, err := m.sendDevicePropertyReadRequest(targetAddress, objectId, propertyId, propertyIndex, numElements)
+ propertyValueResponse, err := m.sendDevicePropertyReadRequest(ctx, targetAddress, objectId, propertyId, propertyIndex, numElements)
if err != nil {
sendResponse(nil, 0, err)
return
@@ -357,7 +358,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
return result
}
-func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) <-chan KnxReadResult {
+func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) <-chan KnxReadResult {
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -381,7 +382,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
if !ok {
- connections := m.DeviceConnect(targetAddress)
+ connections := m.DeviceConnect(ctx, targetAddress)
deviceConnectionResult := <-connections
// If we didn't get a connect, abort
if deviceConnectionResult.err != nil {
@@ -398,7 +399,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
return
}
// If we successfully got a connection, read the property
- propertyDescriptionResponse, err := m.sendDevicePropertyDescriptionReadRequest(targetAddress, objectId, propertyId)
+ propertyDescriptionResponse, err := m.sendDevicePropertyDescriptionReadRequest(ctx, targetAddress, objectId, propertyId)
if err != nil {
sendResponse(nil, 0, err)
return
@@ -417,7 +418,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
return result
}
-func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
+func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
result := make(chan KnxReadResult)
sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -447,7 +448,7 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
if !ok {
- connections := m.DeviceConnect(targetAddress)
+ connections := m.DeviceConnect(ctx, targetAddress)
deviceConnectionResult := <-connections
// If we didn't get a connect, abort
if deviceConnectionResult.err != nil {
@@ -477,7 +478,7 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
maxNumElementsPerRequest := uint8(math.Floor(float64(maxNumBytes / elementSize)))
numElements := uint8(math.Min(float64(remainingRequestElements), float64(maxNumElementsPerRequest)))
numBytes := numElements * uint8(math.Max(float64(1), float64(datapointType.DatapointMainType().SizeInBits()/8)))
- memoryReadResponse, err := m.sendDeviceMemoryReadRequest(targetAddress, curStartingAddress, numBytes)
+ memoryReadResponse, err := m.sendDeviceMemoryReadRequest(ctx, targetAddress, curStartingAddress, numBytes)
if err != nil {
// TODO: do we need to send a response here
return
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go b/plc4go/internal/knxnetip/ConnectionHelper.go
index 1794c6bd3..11cc6c42b 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
"fmt"
"math"
"net"
@@ -50,7 +51,7 @@ func (m *Connection) castIpToKnxAddress(ip net.IP) driverModel.IPAddress {
return driverModel.NewIPAddress(ip[len(ip)-4:])
}
-func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel.TunnelingRequest) {
+func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunnelingRequest driverModel.TunnelingRequest) {
go func() {
lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
if lDataInd == nil {
@@ -83,7 +84,7 @@ func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel
targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
if targetAddress == m.ClientKnxAddress {
log.Info().Msg("Acknowleding an unhandled data message.")
- _ = m.sendDeviceAck(dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
+ _ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
}
}
case driverModel.ApduControlContainer:
@@ -94,7 +95,7 @@ func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel
targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
if targetAddress == m.ClientKnxAddress {
log.Info().Msg("Acknowleding an unhandled contol message.")
- _ = m.sendDeviceAck(dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
+ _ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
}
}
default:
diff --git a/plc4go/internal/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
index 546e52ba9..c9a1e129b 100644
--- a/plc4go/internal/knxnetip/ConnectionInternalOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
"github.com/apache/plc4x/plc4go/spi"
"reflect"
"time"
@@ -42,7 +43,7 @@ import (
// They all assume the connection is checked and is available.
///////////////////////////////////////////////////////////////////////////////////////////////////////
-func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, error) {
+func (m *Connection) sendGatewaySearchRequest(ctx context.Context) (driverModel.SearchResponse, error) {
localAddress, err := m.getLocalAddress()
if err != nil {
return nil, errors.Wrap(err, "error getting local address")
@@ -56,25 +57,21 @@ func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, err
result := make(chan driverModel.SearchResponse)
errorResult := make(chan error)
- err = m.messageCodec.SendRequest(searchRequest,
- func(message spi.Message) bool {
- searchResponse := driverModel.CastSearchResponse(message)
- return searchResponse != nil
- },
- func(message spi.Message) error {
- searchResponse := driverModel.CastSearchResponse(message)
- result <- searchResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ err = m.messageCodec.SendRequest(ctx, searchRequest, func(message spi.Message) bool {
+ searchResponse := driverModel.CastSearchResponse(message)
+ return searchResponse != nil
+ }, func(message spi.Message) error {
+ searchResponse := driverModel.CastSearchResponse(message)
+ result <- searchResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending search request")
@@ -99,7 +96,7 @@ func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, err
}
}
-func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionResponse, error) {
+func (m *Connection) sendGatewayConnectionRequest(ctx context.Context) (driverModel.ConnectionResponse, error) {
localAddress, err := m.getLocalAddress()
if err != nil {
return nil, errors.Wrap(err, "error getting local address")
@@ -114,25 +111,21 @@ func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionRespo
result := make(chan driverModel.ConnectionResponse)
errorResult := make(chan error)
- err = m.messageCodec.SendRequest(connectionRequest,
- func(message spi.Message) bool {
- connectionResponse := driverModel.CastConnectionResponse(message)
- return connectionResponse != nil
- },
- func(message spi.Message) error {
- connectionResponse := driverModel.CastConnectionResponse(message)
- result <- connectionResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ err = m.messageCodec.SendRequest(ctx, connectionRequest, func(message spi.Message) bool {
+ connectionResponse := driverModel.CastConnectionResponse(message)
+ return connectionResponse != nil
+ }, func(message spi.Message) error {
+ connectionResponse := driverModel.CastConnectionResponse(message)
+ result <- connectionResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -146,7 +139,7 @@ func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionRespo
}
}
-func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectResponse, error) {
+func (m *Connection) sendGatewayDisconnectionRequest(ctx context.Context) (driverModel.DisconnectResponse, error) {
localAddress, err := m.getLocalAddress()
if err != nil {
return nil, errors.Wrap(err, "error getting local address")
@@ -164,25 +157,21 @@ func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectRe
result := make(chan driverModel.DisconnectResponse)
errorResult := make(chan error)
- err = m.messageCodec.SendRequest(disconnectRequest,
- func(message spi.Message) bool {
- disconnectResponse := driverModel.CastDisconnectResponse(message)
- return disconnectResponse != nil
- },
- func(message spi.Message) error {
- disconnectResponse := driverModel.CastDisconnectResponse(message)
- result <- disconnectResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ err = m.messageCodec.SendRequest(ctx, disconnectRequest, func(message spi.Message) bool {
+ disconnectResponse := driverModel.CastDisconnectResponse(message)
+ return disconnectResponse != nil
+ }, func(message spi.Message) error {
+ disconnectResponse := driverModel.CastDisconnectResponse(message)
+ result <- disconnectResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -196,7 +185,7 @@ func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectRe
}
}
-func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateResponse, error) {
+func (m *Connection) sendConnectionStateRequest(ctx context.Context) (driverModel.ConnectionStateResponse, error) {
localAddress, err := m.getLocalAddress()
if err != nil {
return nil, errors.Wrap(err, "error getting local address")
@@ -211,25 +200,21 @@ func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateRe
result := make(chan driverModel.ConnectionStateResponse)
errorResult := make(chan error)
- err = m.messageCodec.SendRequest(connectionStateRequest,
- func(message spi.Message) bool {
- connectionStateResponse := driverModel.CastConnectionStateResponse(message)
- return connectionStateResponse != nil
- },
- func(message spi.Message) error {
- connectionStateResponse := driverModel.CastConnectionStateResponse(message)
- result <- connectionStateResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ err = m.messageCodec.SendRequest(ctx, connectionStateRequest, func(message spi.Message) bool {
+ connectionStateResponse := driverModel.CastConnectionStateResponse(message)
+ return connectionStateResponse != nil
+ }, func(message spi.Message) error {
+ connectionStateResponse := driverModel.CastConnectionStateResponse(message)
+ result <- connectionStateResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -243,7 +228,7 @@ func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateRe
}
}
-func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverModel.ApduDataGroupValueResponse, error) {
+func (m *Connection) sendGroupAddressReadRequest(ctx context.Context, groupAddress []byte) (driverModel.ApduDataGroupValueResponse, error) {
// Send the property read request and wait for a confirmation that this property is readable.
groupAddressReadRequest := driverModel.NewTunnelingRequest(
driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -269,52 +254,47 @@ func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverMod
result := make(chan driverModel.ApduDataGroupValueResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- groupAddressReadRequest,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if dataContainer == nil {
- return false
- }
- groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
- if groupReadResponse == nil {
- return false
- }
- // Check if it's a value response for the given group address
- return dataFrameExt.GetGroupAddress() && reflect.DeepEqual(dataFrameExt.GetSourceAddress(), groupAddress)
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
-
- result <- groupReadResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ err := m.messageCodec.SendRequest(ctx, groupAddressReadRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if dataContainer == nil {
+ return false
+ }
+ groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
+ if groupReadResponse == nil {
+ return false
+ }
+ // Check if it's a value response for the given group address
+ return dataFrameExt.GetGroupAddress() && reflect.DeepEqual(dataFrameExt.GetSourceAddress(), groupAddress)
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
+
+ result <- groupReadResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -328,7 +308,7 @@ func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverMod
}
}
-func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduControlConnect, error) {
+func (m *Connection) sendDeviceConnectionRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduControlConnect, error) {
// Send a connection request to the individual KNX device
deviceConnectionRequest := driverModel.NewTunnelingRequest(
driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -354,59 +334,53 @@ func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAd
result := make(chan driverModel.ApduControlConnect)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- deviceConnectionRequest,
- // The Gateway is now supposed to send an Ack to this request.
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
- if lDataCon == nil {
- return false
- }
- lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
- if lDataFrameExt == nil {
- return false
- }
- // Check if the address matches
- if ByteArrayToKnxAddress(lDataFrameExt.GetDestinationAddress()) != targetAddress {
- return false
- }
- apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
- if apduControlContainer == nil {
- return false
- }
- apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
- return apduControlConnect != nil
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
- lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
- apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
- apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
-
- // If the error flag is set, there was an error connecting
- if lDataCon.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.Errorf("error connecting to device at: %s", KnxAddressToString(targetAddress))
- } else {
- result <- apduControlConnect
- }
+ err := m.messageCodec.SendRequest(ctx, deviceConnectionRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+ if lDataCon == nil {
+ return false
+ }
+ lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+ if lDataFrameExt == nil {
+ return false
+ }
+ // Check if the address matches
+ if ByteArrayToKnxAddress(lDataFrameExt.GetDestinationAddress()) != targetAddress {
+ return false
+ }
+ apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
+ if apduControlContainer == nil {
+ return false
+ }
+ apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
+ return apduControlConnect != nil
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+ lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+ apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
+ apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
+
+ // If the error flag is set, there was an error connecting
+ if lDataCon.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.Errorf("error connecting to device at: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- apduControlConnect
+ }
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -420,7 +394,7 @@ func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAd
}
}
-func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduControlDisconnect, error) {
+func (m *Connection) sendDeviceDisconnectionRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduControlDisconnect, error) {
// Send a connection request to the individual KNX device
deviceDisconnectionRequest := driverModel.NewTunnelingRequest(
driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -446,60 +420,54 @@ func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.Kn
result := make(chan driverModel.ApduControlDisconnect)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- deviceDisconnectionRequest,
- // The Gateway is now supposed to send an Ack to this request.
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
- if lDataCon == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
- // Check if the address matches
- if curTargetAddress != targetAddress {
- return false
- }
- apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
- if apduControlContainer == nil {
- return false
- }
- apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
- return apduControlDisconnect != nil
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
- apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
- apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
-
- // If the error flag is set, there was an error disconnecting
- if lDataCon.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.Errorf("error disconnecting from device at: %s", KnxAddressToString(targetAddress))
- } else {
- result <- apduControlDisconnect
- }
+ err := m.messageCodec.SendRequest(ctx, deviceDisconnectionRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+ if lDataCon == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+ // Check if the address matches
+ if curTargetAddress != targetAddress {
+ return false
+ }
+ apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+ if apduControlContainer == nil {
+ return false
+ }
+ apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
+ return apduControlDisconnect != nil
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+ apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+ apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
+
+ // If the error flag is set, there was an error disconnecting
+ if lDataCon.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.Errorf("error disconnecting from device at: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- apduControlDisconnect
+ }
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -513,7 +481,7 @@ func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.Kn
}
}
-func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddress, authenticationLevel uint8, buildingKey []byte) (driverModel.ApduDataExtAuthorizeResponse, error) {
+func (m *Connection) sendDeviceAuthentication(ctx context.Context, targetAddress driverModel.KnxAddress, authenticationLevel uint8, buildingKey []byte) (driverModel.ApduDataExtAuthorizeResponse, error) {
// Check if there is already a connection available,
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
@@ -556,80 +524,74 @@ func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddre
result := make(chan driverModel.ApduDataExtAuthorizeResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- deviceAuthenticationRequest,
- // The Gateway is now supposed to send an Ack to this request.
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if apduDataContainer == nil {
- return false
- }
- apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
- if apduDataOther == nil {
- return false
- }
- apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
- if apduAuthorizeResponse == nil {
- return false
- }
- curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
- // Check if the addresses match
- if curTargetAddress != m.ClientKnxAddress {
- return false
- }
- if dataFrameExt.GetSourceAddress() != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
- apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
-
- // Acknowledge the receipt
- _ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
- // If the error flag is set, there was an error authenticating
- if lDataInd.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(targetAddress))
- } else if err != nil {
- errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
- } else {
- result <- apduAuthorizeResponse
- }
- })
-
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
+ err := m.messageCodec.SendRequest(ctx, deviceAuthenticationRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if apduDataContainer == nil {
+ return false
+ }
+ apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
+ if apduDataOther == nil {
+ return false
+ }
+ apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
+ if apduAuthorizeResponse == nil {
+ return false
+ }
+ curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+ // Check if the addresses match
+ if curTargetAddress != m.ClientKnxAddress {
+ return false
+ }
+ if dataFrameExt.GetSourceAddress() != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
+ apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
+
+ // Acknowledge the receipt
+ _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+ // If the error flag is set, there was an error authenticating
+ if lDataInd.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(targetAddress))
+ } else if err != nil {
+ errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- apduAuthorizeResponse
}
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ })
+
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending request")
@@ -643,7 +605,7 @@ func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddre
}
}
-func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduDataDeviceDescriptorResponse, error) {
+func (m *Connection) sendDeviceDeviceDescriptorReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduDataDeviceDescriptorResponse, error) {
// Next, read the device descriptor so we know how we have to communicate with the device.
counter := m.getNextCounter(targetAddress)
deviceDescriptorReadRequest := driverModel.NewTunnelingRequest(
@@ -673,70 +635,65 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverM
result := make(chan driverModel.ApduDataDeviceDescriptorResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- deviceDescriptorReadRequest,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- // Check if the address matches
- if dataFrameExt.GetSourceAddress() != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if dataContainer == nil {
- return false
- }
- deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
- if deviceDescriptorResponse == nil {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrame := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- dataContainer := driverModel.CastApduDataContainer(dataFrame.GetApdu())
- deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
-
- // Acknowledge the receipt
- _ = m.sendDeviceAck(targetAddress, dataFrame.GetApdu().GetCounter(), func(err error) {
- // If the error flag is set, there was an error authenticating
- if lDataInd.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(targetAddress))
- } else if err != nil {
- errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
- } else {
- result <- deviceDescriptorResponse
- }
- })
-
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
+ err := m.messageCodec.SendRequest(ctx, deviceDescriptorReadRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ // Check if the address matches
+ if dataFrameExt.GetSourceAddress() != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if dataContainer == nil {
+ return false
+ }
+ deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
+ if deviceDescriptorResponse == nil {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrame := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ dataContainer := driverModel.CastApduDataContainer(dataFrame.GetApdu())
+ deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
+
+ // Acknowledge the receipt
+ _ = m.sendDeviceAck(ctx, targetAddress, dataFrame.GetApdu().GetCounter(), func(err error) {
+ // If the error flag is set, there was an error authenticating
+ if lDataInd.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(targetAddress))
+ } else if err != nil {
+ errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- deviceDescriptorResponse
}
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ })
+
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending device descriptor read request")
@@ -750,7 +707,7 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverM
}
}
-func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) (driverModel.ApduDataExtPropertyValueResponse, error) {
+func (m *Connection) sendDevicePropertyReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) (driverModel.ApduDataExtPropertyValueResponse, error) {
// Next, read the device descriptor so we know how we have to communicate with the device.
// Send the property read request and wait for a confirmation that this property is readable.
counter := m.getNextCounter(targetAddress)
@@ -787,75 +744,70 @@ func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.Knx
result := make(chan driverModel.ApduDataExtPropertyValueResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- propertyReadRequest,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- // Check if the address matches
- if dataFrameExt.GetSourceAddress() != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if dataContainer == nil {
- return false
- }
- dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
- if dataApduOther == nil {
- return false
- }
- propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
- if propertyValueResponse == nil {
- return false
- }
- return propertyValueResponse.GetObjectIndex() == objectId && propertyValueResponse.GetPropertyId() == propertyId
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
- propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
-
- // Acknowledge the receipt
- _ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
- // If the error flag is set, there was an error authenticating
- if lDataInd.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(targetAddress))
- } else if err != nil {
- errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
- } else {
- result <- propertyValueResponse
- }
- })
-
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
+ err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ // Check if the address matches
+ if dataFrameExt.GetSourceAddress() != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if dataContainer == nil {
+ return false
+ }
+ dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+ if dataApduOther == nil {
+ return false
+ }
+ propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
+ if propertyValueResponse == nil {
+ return false
+ }
+ return propertyValueResponse.GetObjectIndex() == objectId && propertyValueResponse.GetPropertyId() == propertyId
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+ propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
+
+ // Acknowledge the receipt
+ _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+ // If the error flag is set, there was an error authenticating
+ if lDataInd.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(targetAddress))
+ } else if err != nil {
+ errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- propertyValueResponse
}
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ })
+
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending device property read request")
@@ -869,7 +821,7 @@ func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.Knx
}
}
-func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) (driverModel.ApduDataExtPropertyDescriptionResponse, error) {
+func (m *Connection) sendDevicePropertyDescriptionReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) (driverModel.ApduDataExtPropertyDescriptionResponse, error) {
// Next, read the device descriptor so we know how we have to communicate with the device.
// Send the property read request and wait for a confirmation that this property is readable.
counter := m.getNextCounter(targetAddress)
@@ -906,75 +858,70 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driv
result := make(chan driverModel.ApduDataExtPropertyDescriptionResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- propertyReadRequest,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- // Check if the address matches
- if dataFrameExt.GetSourceAddress() != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if dataContainer == nil {
- return false
- }
- dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
- if dataApduOther == nil {
- return false
- }
- propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
- if propertyDescriptionResponse == nil {
- return false
- }
- return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
- propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
-
- // Acknowledge the receipt
- _ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
- // If the error flag is set, there was an error authenticating
- if lDataInd.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress))
- } else if err != nil {
- errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
- } else {
- result <- propertyDescriptionResponse
- }
- })
-
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
+ err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ // Check if the address matches
+ if dataFrameExt.GetSourceAddress() != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if dataContainer == nil {
+ return false
+ }
+ dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+ if dataApduOther == nil {
+ return false
+ }
+ propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
+ if propertyDescriptionResponse == nil {
+ return false
+ }
+ return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+ propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
+
+ // Acknowledge the receipt
+ _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+ // If the error flag is set, there was an error authenticating
+ if lDataInd.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress))
+ } else if err != nil {
+ errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- propertyDescriptionResponse
}
- errorResult <- errors.Wrapf(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ })
+
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrapf(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending property description read request")
@@ -988,7 +935,7 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driv
}
}
-func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAddress, address uint16, numBytes uint8) (driverModel.ApduDataMemoryResponse, error) {
+func (m *Connection) sendDeviceMemoryReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, address uint16, numBytes uint8) (driverModel.ApduDataMemoryResponse, error) {
// Next, read the device descriptor so we know how we have to communicate with the device.
counter := m.getNextCounter(targetAddress)
@@ -1023,71 +970,66 @@ func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAd
result := make(chan driverModel.ApduDataMemoryResponse)
errorResult := make(chan error)
- err := m.messageCodec.SendRequest(
- propertyReadRequest,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- if lDataInd == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- if dataContainer == nil {
- return false
- }
- dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
- if dataApduMemoryResponse == nil {
- return false
- }
+ err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ if lDataInd == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ if dataContainer == nil {
+ return false
+ }
+ dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
+ if dataApduMemoryResponse == nil {
+ return false
+ }
- // Check if the address matches
- if dataFrameExt.GetSourceAddress() != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- return dataApduMemoryResponse.GetAddress() == address
- },
- func(message spi.Message) error {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
- dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
- dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
- dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
-
- // Acknowledge the receipt
- _ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
- // If the error flag is set, there was an error authenticating
- if lDataInd.GetDataFrame().GetErrorFlag() {
- errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress))
- } else if err != nil {
- errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress))
- } else {
- result <- dataApduMemoryResponse
- }
- })
-
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
+ // Check if the address matches
+ if dataFrameExt.GetSourceAddress() != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ return dataApduMemoryResponse.GetAddress() == address
+ }, func(message spi.Message) error {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+ dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+ dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+ dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
+
+ // Acknowledge the receipt
+ _ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+ // If the error flag is set, there was an error authenticating
+ if lDataInd.GetDataFrame().GetErrorFlag() {
+ errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress))
+ } else if err != nil {
+ errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress))
+ } else {
+ result <- dataApduMemoryResponse
}
- errorResult <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.defaultTtl)
+ })
+
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ errorResult <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.defaultTtl)
if err != nil {
return nil, errors.Wrap(err, "got error sending memory read request")
@@ -1101,7 +1043,7 @@ func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAd
}
}
-func (m *Connection) sendDeviceAck(targetAddress driverModel.KnxAddress, counter uint8, callback func(err error)) error {
+func (m *Connection) sendDeviceAck(ctx context.Context, targetAddress driverModel.KnxAddress, counter uint8, callback func(err error)) error {
ack := driverModel.NewTunnelingRequest(
driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
driverModel.NewLDataReq(
@@ -1124,57 +1066,52 @@ func (m *Connection) sendDeviceAck(targetAddress driverModel.KnxAddress, counter
0,
)
- err := m.messageCodec.SendRequest(
- ack,
- func(message spi.Message) bool {
- tunnelingRequest := driverModel.CastTunnelingRequest(message)
- if tunnelingRequest == nil ||
- tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
- return false
- }
- lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
- if lDataCon == nil {
- return false
- }
- dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
- if dataFrameExt == nil {
- return false
- }
- // Check if the addresses match
- if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress {
- return false
- }
- curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
- if curTargetAddress != targetAddress {
- return false
- }
- // Check if the counter matches
- if dataFrameExt.GetApdu().GetCounter() != counter {
- return false
- }
- controlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
- if controlContainer == nil {
- return false
- }
- dataApduAck := driverModel.CastApduControlAck(controlContainer.GetControlApdu())
- if dataApduAck == nil {
- return false
- }
- return true
- },
- func(message spi.Message) error {
- callback(nil)
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- m.handleTimeout()
- }
- callback(errors.Wrap(err, "got error processing request"))
- return nil
- },
- m.defaultTtl)
+ err := m.messageCodec.SendRequest(ctx, ack, func(message spi.Message) bool {
+ tunnelingRequest := driverModel.CastTunnelingRequest(message)
+ if tunnelingRequest == nil ||
+ tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+ return false
+ }
+ lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+ if lDataCon == nil {
+ return false
+ }
+ dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+ if dataFrameExt == nil {
+ return false
+ }
+ // Check if the addresses match
+ if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress {
+ return false
+ }
+ curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+ if curTargetAddress != targetAddress {
+ return false
+ }
+ // Check if the counter matches
+ if dataFrameExt.GetApdu().GetCounter() != counter {
+ return false
+ }
+ controlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+ if controlContainer == nil {
+ return false
+ }
+ dataApduAck := driverModel.CastApduControlAck(controlContainer.GetControlApdu())
+ if dataApduAck == nil {
+ return false
+ }
+ return true
+ }, func(message spi.Message) error {
+ callback(nil)
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ m.handleTimeout()
+ }
+ callback(errors.Wrap(err, "got error processing request"))
+ return nil
+ }, m.defaultTtl)
if err != nil {
return errors.Wrap(err, "got error sending ack request")
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index c1983d414..159739f06 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -108,7 +108,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
propertyField := field.(DevicePropertyAddressPlcField)
timeout := time.NewTimer(m.connection.defaultTtl)
- results := m.connection.DeviceReadProperty(deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
+ results := m.connection.DeviceReadProperty(ctx, deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
select {
case result := <-results:
if !timeout.Stop() {
@@ -129,7 +129,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
case DeviceMemoryAddressPlcField:
timeout := time.NewTimer(m.connection.defaultTtl)
memoryField := field.(DeviceMemoryAddressPlcField)
- results := m.connection.DeviceReadMemory(deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
+ results := m.connection.DeviceReadMemory(ctx, deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
select {
case result := <-results:
if !timeout.Stop() {
@@ -153,7 +153,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
// Get the group address values from the cache
for fieldName, field := range groupAddresses {
- responseCode, plcValue := m.readGroupAddress(field)
+ responseCode, plcValue := m.readGroupAddress(ctx, field)
responseCodes[fieldName] = responseCode
plcValues[fieldName] = plcValue
}
@@ -169,7 +169,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
return resultChan
}
-func (m Reader) readGroupAddress(field GroupAddressField) (apiModel.PlcResponseCode, apiValues.PlcValue) {
+func (m Reader) readGroupAddress(ctx context.Context, field GroupAddressField) (apiModel.PlcResponseCode, apiValues.PlcValue) {
rawAddresses, err := m.resolveAddresses(field)
if err != nil {
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
@@ -193,7 +193,7 @@ func (m Reader) readGroupAddress(field GroupAddressField) (apiModel.PlcResponseC
// Otherwise respond with values from the cache.
if !ok {
addr := []byte{byte(numericAddress >> 8), byte(numericAddress & 0xFF)}
- rrc := m.connection.ReadGroupAddress(addr, field.GetFieldType())
+ rrc := m.connection.ReadGroupAddress(ctx, addr, field.GetFieldType())
select {
case readResult := <-rrc:
if readResult.value != nil {
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 5ecdd6079..bd1772dc3 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -90,35 +91,32 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
}
func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
+ // TODO: use proper context
+ ctx := context.TODO()
log.Trace().Msg("Pinging")
result := make(chan plc4go.PlcConnectionPingResult)
go func() {
diagnosticRequestPdu := readWriteModel.NewModbusPDUDiagnosticRequest(0, 0x42)
pingRequest := readWriteModel.NewModbusTcpADU(1, m.unitIdentifier, diagnosticRequestPdu, false)
- if err := m.messageCodec.SendRequest(
- pingRequest,
- func(message spi.Message) bool {
- responseAdu := readWriteModel.CastModbusTcpADU(message)
- return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
- },
- func(message spi.Message) error {
- log.Trace().Msgf("Received Message")
- if message != nil {
- // If we got a valid response (even if it will probably contain an error, we know the remote is available)
- log.Trace().Msg("got valid response")
- result <- _default.NewDefaultPlcConnectionPingResult(nil)
- } else {
- log.Trace().Msg("got no response")
- result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
- }
- return nil
- },
- func(err error) error {
- log.Trace().Msgf("Received Error")
- result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
- return nil
- },
- time.Second*1); err != nil {
+ if err := m.messageCodec.SendRequest(ctx, pingRequest, func(message spi.Message) bool {
+ responseAdu := readWriteModel.CastModbusTcpADU(message)
+ return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
+ }, func(message spi.Message) error {
+ log.Trace().Msgf("Received Message")
+ if message != nil {
+ // If we got a valid response (even if it will probably contain an error, we know the remote is available)
+ log.Trace().Msg("got valid response")
+ result <- _default.NewDefaultPlcConnectionPingResult(nil)
+ } else {
+ log.Trace().Msg("got no response")
+ result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
+ }
+ return nil
+ }, func(err error) error {
+ log.Trace().Msgf("Received Error")
+ result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
+ return nil
+ }, time.Second*1); err != nil {
result <- _default.NewDefaultPlcConnectionPingResult(err)
}
}()
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index d85d1ed4e..219849e6c 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -118,43 +118,38 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
// Send the ADU over the wire
log.Trace().Msg("Send ADU")
- if err = m.messageCodec.SendRequest(
- requestAdu,
- func(message spi.Message) bool {
- responseAdu := message.(readWriteModel.ModbusTcpADU)
- return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
- responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
- },
- func(message spi.Message) error {
- // Convert the response into an ADU
- log.Trace().Msg("convert response to ADU")
- responseAdu := message.(readWriteModel.ModbusTcpADU)
- // Convert the modbus response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
+ if err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool {
+ responseAdu := message.(readWriteModel.ModbusTcpADU)
+ return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
+ responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
+ }, func(message spi.Message) error {
+ // Convert the response into an ADU
+ log.Trace().Msg("convert response to ADU")
+ responseAdu := message.(readWriteModel.ModbusTcpADU)
+ // Convert the modbus response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- // TODO: should we return the error here?
- return nil
- }
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return nil
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
+ // TODO: should we return the error here?
return nil
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return nil
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return nil
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index d0ff99d9d..3d949d238 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -129,40 +129,35 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
requestAdu := readWriteModel.NewModbusTcpADU(uint16(transactionIdentifier), m.unitIdentifier, pdu, false)
// Send the ADU over the wire
- err = m.messageCodec.SendRequest(
- requestAdu,
- func(message spi.Message) bool {
- responseAdu := message.(readWriteModel.ModbusTcpADU)
- return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
- responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
- },
- func(message spi.Message) error {
- // Convert the response into an ADU
- responseAdu := message.(readWriteModel.ModbusTcpADU)
- // Convert the modbus response into a PLC4X response
- readResponse, err := m.ToPlc4xWriteResponse(requestAdu, responseAdu, writeRequest)
-
- if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- } else {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: readResponse,
- }
- }
- return nil
- },
- func(err error) error {
+ err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool {
+ responseAdu := message.(readWriteModel.ModbusTcpADU)
+ return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
+ responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
+ }, func(message spi.Message) error {
+ // Convert the response into an ADU
+ responseAdu := message.(readWriteModel.ModbusTcpADU)
+ // Convert the modbus response into a PLC4X response
+ readResponse, err := m.ToPlc4xWriteResponse(requestAdu, responseAdu, writeRequest)
+
+ if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
+ }
+ } else {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Response: readResponse,
}
- return nil
- },
- time.Second*1)
+ }
+ return nil
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Err: errors.New("got timeout while waiting for response"),
+ }
+ return nil
+ }, time.Second*1)
}()
return result
}
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index fd3c94396..a68206819 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -20,6 +20,7 @@
package s7
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -105,6 +106,8 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
}
func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ // TODO: use proper context
+ ctx := context.TODO()
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
@@ -122,7 +125,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
// For testing purposes we can skip the waiting for a complete connection
if !m.driverContext.awaitSetupComplete {
- go m.setupConnection(ch)
+ go m.setupConnection(ctx, ch)
log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
// Here we write directly and don't wait till the connection is "really" connected
// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -134,43 +137,37 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
// Only the TCP transport supports login.
log.Info().Msg("S7 Driver running in ACTIVE mode.")
- m.setupConnection(ch)
+ m.setupConnection(ctx, ch)
}()
return ch
}
-func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
log.Debug().Msg("Sending COTP Connection Request")
// Open the session on ISO Transport Protocol first.
cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse)
cotpConnectionErrorChan := make(chan error)
- if err := m.messageCodec.SendRequest(
- readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()),
- func(message spi.Message) bool {
- tpktPacket := message.(readWriteModel.TPKTPacket)
- if tpktPacket == nil {
- return false
- }
- cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
- return cotpPacketConnectionResponse != nil
- },
- func(message spi.Message) error {
- tpktPacket := message.(readWriteModel.TPKTPacket)
- cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
- cotpConnectionResult <- cotpPacketConnectionResponse
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.GetTtl(),
- ); err != nil {
+ if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()), func(message spi.Message) bool {
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ if tpktPacket == nil {
+ return false
+ }
+ cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
+ return cotpPacketConnectionResponse != nil
+ }, func(message spi.Message) error {
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
+ cotpConnectionResult <- cotpPacketConnectionResponse
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.GetTtl()); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of COTP Connection Request"), ch)
}
select {
@@ -181,43 +178,37 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
// Send an S7 login message.
s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication)
s7ConnectionErrorChan := make(chan error)
- if err := m.messageCodec.SendRequest(
- m.createS7ConnectionRequest(cotpPacketConnectionResponse),
- func(message spi.Message) bool {
- tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
- if !ok {
- return false
- }
- cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
- if !ok {
- return false
- }
- messageResponseData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseDataExactly)
- if !ok {
- return false
- }
- _, ok = messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunicationExactly)
- return ok
- },
- func(message spi.Message) error {
- tpktPacket := message.(readWriteModel.TPKTPacket)
- cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
- messageResponseData := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseData)
- setupCommunication := messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunication)
- s7ConnectionResult <- setupCommunication
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.GetTtl(),
- ); err != nil {
+ if err := m.messageCodec.SendRequest(ctx, m.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool {
+ tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+ if !ok {
+ return false
+ }
+ cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+ if !ok {
+ return false
+ }
+ messageResponseData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseDataExactly)
+ if !ok {
+ return false
+ }
+ _, ok = messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunicationExactly)
+ return ok
+ }, func(message spi.Message) error {
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+ messageResponseData := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseData)
+ setupCommunication := messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunication)
+ s7ConnectionResult <- setupCommunication
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.GetTtl()); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of S7 Connection Request"), ch)
}
select {
@@ -248,42 +239,36 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
log.Debug().Msg("Sending S7 Identification Request")
s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData)
s7IdentificationErrorChan := make(chan error)
- if err := m.messageCodec.SendRequest(
- m.createIdentifyRemoteMessage(),
- func(message spi.Message) bool {
- tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
- if !ok {
- return false
- }
- cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
- if !ok {
- return false
- }
- messageUserData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserDataExactly)
- if !ok {
- return false
- }
- _, ok = messageUserData.GetPayload().(readWriteModel.S7PayloadUserDataExactly)
- return ok
- },
- func(message spi.Message) error {
- tpktPacket := message.(readWriteModel.TPKTPacket)
- cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
- messageUserData := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserData)
- s7IdentificationResult <- messageUserData.GetPayload().(readWriteModel.S7PayloadUserData)
- return nil
- },
- func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
- log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- },
- m.GetTtl(),
- ); err != nil {
+ if err := m.messageCodec.SendRequest(ctx, m.createIdentifyRemoteMessage(), func(message spi.Message) bool {
+ tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+ if !ok {
+ return false
+ }
+ cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+ if !ok {
+ return false
+ }
+ messageUserData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserDataExactly)
+ if !ok {
+ return false
+ }
+ _, ok = messageUserData.GetPayload().(readWriteModel.S7PayloadUserDataExactly)
+ return ok
+ }, func(message spi.Message) error {
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+ messageUserData := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserData)
+ s7IdentificationResult <- messageUserData.GetPayload().(readWriteModel.S7PayloadUserData)
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+ log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.GetTtl()); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of identify remote Request"), ch)
}
select {
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 68094ce47..5f10bd074 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -99,54 +99,49 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
// Send the over the wire
log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(
- tpktPacket,
- func(message spi.Message) bool {
- tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
- if !ok {
- return false
- }
- cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
- if !ok {
- return false
- }
- payload := cotpPacketData.GetPayload()
- if payload == nil {
- return false
- }
- return payload.GetTpduReference() == tpduId
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- tpktPacket := message.(readWriteModel.TPKTPacket)
- cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
- payload := cotpPacketData.GetPayload()
- // Convert the s7 response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
+ if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
+ tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+ if !ok {
+ return false
+ }
+ cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+ if !ok {
+ return false
+ }
+ payload := cotpPacketData.GetPayload()
+ if payload == nil {
+ return false
+ }
+ return payload.GetTpduReference() == tpduId
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+ payload := cotpPacketData.GetPayload()
+ // Convert the s7 response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
- if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Err: errors.Wrap(err, "got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcReadRequestResult{
+ Request: readRequest,
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index f86369b1e..40a69b73b 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -102,54 +102,49 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
// Send the over the wire
- if err := m.messageCodec.SendRequest(
- tpktPacket,
- func(message spi.Message) bool {
- tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
- if !ok {
- return false
- }
- cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
- if !ok {
- return false
- }
- payload := cotpPacketData.GetPayload()
- if payload == nil {
- return false
- }
- return payload.GetTpduReference() == tpduId
- },
- func(message spi.Message) error {
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- tpktPacket := message.(readWriteModel.TPKTPacket)
- cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
- payload := cotpPacketData.GetPayload()
- // Convert the s7 response into a PLC4X response
- log.Trace().Msg("convert response to PLC4X response")
- readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
-
- if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "Error decoding response"),
- }
- return transaction.EndRequest()
- }
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: readResponse,
- }
- return transaction.EndRequest()
- },
- func(err error) error {
+ if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
+ tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+ if !ok {
+ return false
+ }
+ cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+ if !ok {
+ return false
+ }
+ payload := cotpPacketData.GetPayload()
+ if payload == nil {
+ return false
+ }
+ return payload.GetTpduReference() == tpduId
+ }, func(message spi.Message) error {
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ tpktPacket := message.(readWriteModel.TPKTPacket)
+ cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+ payload := cotpPacketData.GetPayload()
+ // Convert the s7 response into a PLC4X response
+ log.Trace().Msg("convert response to PLC4X response")
+ readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
+
+ if err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
- },
- time.Second*1); err != nil {
+ }
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Response: readResponse,
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ Request: writeRequest,
+ Err: errors.New("got timeout while waiting for response"),
+ }
+ return transaction.EndRequest()
+ }, time.Second*1); err != nil {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go
index 55319625c..f8a537b11 100644
--- a/plc4go/spi/MessageCodec.go
+++ b/plc4go/spi/MessageCodec.go
@@ -20,16 +20,18 @@
package spi
import (
+ "context"
"fmt"
"time"
)
type Expectation interface {
+ fmt.Stringer
+ GetContext() context.Context
GetExpiration() time.Time
GetAcceptsMessage() AcceptsMessage
GetHandleMessage() HandleMessage
GetHandleError() HandleError
- fmt.Stringer
}
// AcceptsMessage If this function returns true, the message is forwarded to the message handler
@@ -53,9 +55,9 @@ type MessageCodec interface {
Send(message Message) error
// Expect Wait for a given timespan for a message to come in, which returns 'true' for 'acceptMessage'
// and is then forwarded to the 'handleMessage' function
- Expect(acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
+ Expect(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
// SendRequest A combination that sends a message first and then waits for a response
- SendRequest(message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
+ SendRequest(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
// GetDefaultIncomingMessageChannel gives back the chan where unexpected messages arrive
GetDefaultIncomingMessageChannel() chan Message
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 8c4d967fe..6fee263e8 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -20,6 +20,7 @@
package _default
import (
+ "context"
"fmt"
"runtime/debug"
"time"
@@ -54,6 +55,7 @@ func NewDefaultCodec(requirements DefaultCodecRequirements, transportInstance tr
}
type DefaultExpectation struct {
+ Context context.Context
Expiration time.Time
AcceptsMessage spi.AcceptsMessage
HandleMessage spi.HandleMessage
@@ -111,6 +113,10 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
///////////////////////////////////////
///////////////////////////////////////
+func (m *DefaultExpectation) GetContext() context.Context {
+ return m.Context
+}
+
func (m *DefaultExpectation) GetExpiration() time.Time {
return m.Expiration
}
@@ -167,8 +173,9 @@ func (m *defaultCodec) IsRunning() bool {
return m.running
}
-func (m *defaultCodec) Expect(acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
expectation := &DefaultExpectation{
+ Context: ctx,
Expiration: time.Now().Add(ttl),
AcceptsMessage: acceptsMessage,
HandleMessage: handleMessage,
@@ -178,14 +185,17 @@ func (m *defaultCodec) Expect(acceptsMessage spi.AcceptsMessage, handleMessage s
return nil
}
-func (m *defaultCodec) SendRequest(message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+func (m *defaultCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+ if err := ctx.Err(); err != nil {
+ return errors.Wrap(err, "Not sending message as context is aborted")
+ }
log.Trace().Msg("Sending request")
// Send the actual message
err := m.Send(message)
if err != nil {
return errors.Wrap(err, "Error sending the request")
}
- return m.Expect(acceptsMessage, handleMessage, handleError, ttl)
+ return m.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
}
func (m *defaultCodec) TimeoutExpectations(now time.Time) {
@@ -195,11 +205,18 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
// Remove this expectation from the list.
m.expectations = append(m.expectations[:index], m.expectations[index+1:]...)
// Call the error handler.
- // TODO: decouple from worker thread
- err := expectation.GetHandleError()(plcerrors.NewTimeoutError(now.Sub(expectation.GetExpiration())))
- if err != nil {
- log.Error().Err(err).Msg("Got an error handling error on expectation")
- }
+ go func() {
+ if err := expectation.GetHandleError()(plcerrors.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
+ log.Error().Err(err).Msg("Got an error handling error on expectation")
+ }
+ }()
+ }
+ if err := expectation.GetContext().Err(); err != nil {
+ go func() {
+ if err := expectation.GetHandleError()(err); err != nil {
+ log.Error().Err(err).Msg("Got an error handling error on expectation")
+ }
+ }()
}
}
}