You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/05 09:37:43 UTC
[plc4x] branch develop updated: fix(plc4go/epi): matches should work on exact basis
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 442960c2fe fix(plc4go/epi): matches should work on exact basis
442960c2fe is described below
commit 442960c2fe3e09a259c42c42371b7bd47b12cfee
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 5 11:37:35 2023 +0200
fix(plc4go/epi): matches should work on exact basis
---
plc4go/internal/eip/Connection.go | 329 ++++++++++++++++++++++----------------
plc4go/internal/eip/Reader.go | 12 +-
plc4go/internal/eip/Writer.go | 24 +--
3 files changed, 212 insertions(+), 153 deletions(-)
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index ece56dcff5..b6beeb6005 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -160,13 +160,19 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
}()
m.log.Debug().Msg("Sending UnregisterSession EIP Packet")
- _ = m.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
- return true
- }, func(message spi.Message) error {
- return nil
- }, func(err error) error {
- return nil
- }, m.GetTtl()) //Unregister gets no response
+ _ = m.messageCodec.SendRequest(
+ ctx,
+ readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
+ return true
+ },
+ func(message spi.Message) error {
+ return nil
+ },
+ func(err error) error {
+ return nil
+ },
+ m.GetTtl(),
+ ) //Unregister gets no response
m.log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
}()
return result
@@ -200,31 +206,42 @@ func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcC
m.log.Debug().Msg("Sending ListServices Request")
listServicesResultChan := make(chan readWriteModel.ListServicesResponse, 1)
listServicesResultErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewListServicesRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- eipPacketListServicesResponse := eipPacket.(readWriteModel.ListServicesResponse)
- return eipPacketListServicesResponse != nil
- }, func(message spi.Message) error {
- listServicesResponse := message.(readWriteModel.ListServicesResponse)
- serviceResponse := listServicesResponse.GetTypeIds()[0].(readWriteModel.ServicesResponse)
- if serviceResponse.GetSupportsCIPEncapsulation() {
- m.log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
- }
- m.cipEncapsulationAvailable = serviceResponse.GetSupportsCIPEncapsulation()
- listServicesResultChan <- listServicesResponse
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- listServicesResultErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ readWriteModel.NewListServicesRequest(
+ EmptySessionHandle,
+ uint32(readWriteModel.CIPStatus_Success),
+ []byte(DefaultSenderContext),
+ uint32(0),
+ ),
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacketExactly)
+ if eipPacket == nil {
+ return false
+ }
+ eipPacketListServicesResponse := eipPacket.(readWriteModel.ListServicesResponseExactly)
+ return eipPacketListServicesResponse != nil
+ },
+ func(message spi.Message) error {
+ listServicesResponse := message.(readWriteModel.ListServicesResponse)
+ serviceResponse := listServicesResponse.GetTypeIds()[0].(readWriteModel.ServicesResponse)
+ if serviceResponse.GetSupportsCIPEncapsulation() {
+ m.log.Debug().Msg("Device is capable of CIP over EIP encapsulation")
+ }
+ m.cipEncapsulationAvailable = serviceResponse.GetSupportsCIPEncapsulation()
+ listServicesResultChan <- listServicesResponse
+ return nil
+ },
+ func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ listServicesResultErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ },
+ m.GetTtl()); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
@@ -244,80 +261,98 @@ func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.
m.log.Debug().Msg("Sending EipConnectionRequest")
connectionResponseChan := make(chan readWriteModel.EipConnectionResponse, 1)
connectionResponseErrorChan := make(chan error, 1)
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- return eipPacket != nil
- }, func(message spi.Message) error {
- eipPacket := message.(readWriteModel.EipPacket)
- connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
- if connectionResponse != nil {
- if connectionResponse.GetStatus() == 0 {
- m.sessionHandle = connectionResponse.GetSessionHandle()
- m.senderContext = connectionResponse.GetSenderContext()
- m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
- connectionResponseChan <- connectionResponse
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ readWriteModel.NewEipConnectionRequest(
+ EmptySessionHandle,
+ uint32(readWriteModel.CIPStatus_Success),
+ []byte(DefaultSenderContext),
+ uint32(0),
+ ),
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacketExactly)
+ return eipPacket != nil
+ },
+ func(message spi.Message) error {
+ eipPacket := message.(readWriteModel.EipPacket)
+ connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
+ if connectionResponse != nil {
+ if connectionResponse.GetStatus() == 0 {
+ m.sessionHandle = connectionResponse.GetSessionHandle()
+ m.senderContext = connectionResponse.GetSenderContext()
+ m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+ connectionResponseChan <- connectionResponse
+ } else {
+ m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
+ connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
+ }
} else {
- m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
- connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
+ // TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
+ classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
+ instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
+ exchange := readWriteModel.NewUnConnectedDataItem(
+ readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
+ 14, 536870914, 33944, m.connectionSerialNumber,
+ 4919, 42, 3, 2101812,
+ readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+ 2113537,
+ readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+ readWriteModel.NewTransportType(true, 2, 3),
+ m.connectionPathSize, m.routingAddress, 1))
+ typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
+ eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
+ m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ eipWrapper,
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacketExactly)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRDataExactly)
+ return cipRRData != nil
+ },
+ func(message spi.Message) error {
+ cipRRData := message.(readWriteModel.CipRRData)
+ if cipRRData.GetStatus() == 0 {
+ unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+ connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
+ m.connectionId = connectionManagerResponse.GetOtConnectionId()
+ m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
+ connectionResponseChan <- connectionResponse
+ } else {
+ connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
+ }
+ return nil
+ },
+ func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ },
+ m.GetTtl(),
+ ); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ }
}
- } else {
- // TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
- classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
- instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
- exchange := readWriteModel.NewUnConnectedDataItem(
- readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
- 14, 536870914, 33944, m.connectionSerialNumber,
- 4919, 42, 3, 2101812,
- readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
- 2113537,
- readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
- readWriteModel.NewTransportType(true, 2, 3),
- m.connectionPathSize, m.routingAddress, 1))
- typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
- eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
- m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
- if err := m.messageCodec.SendRequest(ctx, eipWrapper,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- return cipRRData != nil
- }, func(message spi.Message) error {
- cipRRData := message.(readWriteModel.CipRRData)
- if cipRRData.GetStatus() == 0 {
- unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
- connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
- m.connectionId = connectionManagerResponse.GetOtConnectionId()
- m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
- connectionResponseChan <- connectionResponse
- } else {
- connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
- }
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ return nil
+ },
+ func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
}
- }
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
+ connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ },
+ m.GetTtl(),
+ ); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
timeout := time.NewTimer(1 * time.Second)
@@ -338,47 +373,61 @@ func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcCo
listAllAttributesErrorChan := make(chan error, 1)
classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData(EmptyInterfaceHandle, 0,
- []readWriteModel.TypeId{
- readWriteModel.NewNullAddressItem(),
- readWriteModel.NewUnConnectedDataItem(
- readWriteModel.NewGetAttributeAllRequest(
- classSegment, instanceSegment, uint16(0))),
- }, m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0), func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.CipRRData)
- return eipPacket != nil
- }, func(message spi.Message) error {
- cipRrData := message.(readWriteModel.CipRRData)
- if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
- dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
- response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
- if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
- // TODO: Return an error ...
- } else if response.GetAttributes() != nil {
- for _, classId := range response.GetAttributes().GetClassId() {
- if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
- switch curCipClassId {
- case readWriteModel.CIPClassID_MessageRouter:
- m.useMessageRouter = true
- case readWriteModel.CIPClassID_ConnectionManager:
- m.useConnectionManager = true
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ readWriteModel.NewCipRRData(
+ EmptyInterfaceHandle,
+ 0,
+ []readWriteModel.TypeId{
+ readWriteModel.NewNullAddressItem(),
+ readWriteModel.NewUnConnectedDataItem(
+ readWriteModel.NewGetAttributeAllRequest(
+ classSegment, instanceSegment, uint16(0))),
+ },
+ m.sessionHandle,
+ uint32(readWriteModel.CIPStatus_Success),
+ m.senderContext,
+ 0,
+ ),
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.CipRRDataExactly)
+ return eipPacket != nil
+ },
+ func(message spi.Message) error {
+ cipRrData := message.(readWriteModel.CipRRData)
+ if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
+ dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+ response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
+ if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
+ // TODO: Return an error ...
+ } else if response.GetAttributes() != nil {
+ for _, classId := range response.GetAttributes().GetClassId() {
+ if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
+ switch curCipClassId {
+ case readWriteModel.CIPClassID_MessageRouter:
+ m.useMessageRouter = true
+ case readWriteModel.CIPClassID_ConnectionManager:
+ m.useConnectionManager = true
+ }
}
}
}
+ m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
+ listAllAttributesResponseChan <- response
}
- m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
- listAllAttributesResponseChan <- response
- }
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
- return nil
- }, m.GetTtl()); err != nil {
+ return nil
+ },
+ func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+ return nil
+ },
+ m.GetTtl(),
+ ); err != nil {
m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index c2013b86c8..f0ebcf1cd8 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -95,13 +95,15 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0)
transaction := m.tm.StartTransaction()
transaction.Submit(func(transaction transactions.RequestTransaction) {
- if err := m.messageCodec.SendRequest(ctx, request,
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ request,
func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
+ eipPacket := message.(readWriteModel.EipPacketExactly)
if eipPacket == nil {
return false
}
- cipRRData := eipPacket.(readWriteModel.CipRRData)
+ cipRRData := eipPacket.(readWriteModel.CipRRDataExactly)
if cipRRData == nil {
return false
}
@@ -135,7 +137,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
errors.Wrap(err, "got timeout while waiting for response"),
)
return transaction.EndRequest()
- }, time.Second*1); err != nil {
+ },
+ time.Second*1,
+ ); err != nil {
result <- spiModel.NewDefaultPlcReadRequestResult(
readRequest,
nil,
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index bd8c19235e..6ebb162028 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -126,11 +126,11 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
+ eipPacket := message.(readWriteModel.EipPacketExactly)
if eipPacket == nil {
return false
}
- cipRRData := eipPacket.(readWriteModel.CipRRData)
+ cipRRData := eipPacket.(readWriteModel.CipRRDataExactly)
if cipRRData == nil {
return false
}
@@ -215,19 +215,22 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
transaction := m.tm.StartTransaction()
transaction.Submit(func(transaction transactions.RequestTransaction) {
// Send the over the wire
- if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
+ if err := m.messageCodec.SendRequest(
+ ctx,
+ pkt,
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacketExactly)
if eipPacket == nil {
return false
}
- cipRRData := eipPacket.(readWriteModel.CipRRData)
+ cipRRData := eipPacket.(readWriteModel.CipRRDataExactly)
if cipRRData == nil {
return false
}
if eipPacket.GetSessionHandle() != *m.sessionHandle {
return false
}
- multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+ multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponseExactly)
if multipleServiceResponse == nil {
return false
}
@@ -235,7 +238,8 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return false
}
return true
- }, func(message spi.Message) error {
+ },
+ func(message spi.Message) error {
// Convert the response into an
m.log.Trace().Msg("convert response to ")
eipPacket := message.(readWriteModel.EipPacket)
@@ -257,13 +261,15 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
Response: readResponse,
}
return transaction.EndRequest()
- }, func(err error) error {
+ },
+ func(err error) error {
result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
return transaction.EndRequest()
- }, time.Second*1); err != nil {
+ },
+ time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message"))
if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
m.log.Debug().Err(err).Msg("Error failing request")