You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/05 08:36:18 UTC
[plc4x] 02/02: refactor(plc4go/eip): cleanup connection code
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 39448aaf5cd1827255787d381c2b2580cee6f28a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 5 10:36:06 2023 +0200
refactor(plc4go/eip): cleanup connection code
---
plc4go/internal/eip/Connection.go | 319 ++++++++++++++++++++----------------
plc4go/internal/eip/MessageCodec.go | 7 +-
2 files changed, 178 insertions(+), 148 deletions(-)
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index fbb85e2a14..2d854beea6 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/rs/zerolog"
+ "time"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -172,9 +173,30 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // List Services Request
+ if err := m.listServiceRequest(ctx, ch); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "error listing service request"), ch)
+ return
+ }
+
+ if err := m.connectRegisterSession(ctx, ch); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "error connect register session"), ch)
+ return
+ }
+
+ if err := m.listAllAttributes(ctx, ch); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "error list all attributes"), ch)
+ return
+ }
+
+ if m.useConnectionManager {
+ // TODO: Continue here ....
+ } else {
+ // Send an event that connection setup is complete.
+ m.fireConnected(ch)
+ }
+}
+func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
m.log.Debug().Msg("Sending ListServices Request")
listServicesResultChan := make(chan readWriteModel.ListServicesResponse)
listServicesResultErrorChan := make(chan error)
@@ -206,162 +228,169 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
+ timeout := time.NewTimer(1 * time.Second)
+ defer utils.CleanupTimer(timeout)
select {
+ case <-timeout.C:
+ return errors.New("timeout")
case err := <-listServicesResultErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
+ return errors.Wrap(err, "Error receiving of ListServices response")
case _ = <-listServicesResultChan:
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Connect Register Session
-
- m.log.Debug().Msg("Sending EipConnectionRequest")
- connectionResponseChan := make(chan readWriteModel.EipConnectionResponse)
- connectionResponseErrorChan := make(chan error)
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- return eipPacket != nil
- }, func(message spi.Message) error {
- eipPacket := message.(readWriteModel.EipPacket)
- connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
- if connectionResponse != nil {
- if connectionResponse.GetStatus() == 0 {
- m.sessionHandle = connectionResponse.GetSessionHandle()
- m.senderContext = connectionResponse.GetSenderContext()
- m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
- connectionResponseChan <- connectionResponse
- } else {
- m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
- connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
- }
+ return nil
+ }
+}
+
+func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+ m.log.Debug().Msg("Sending EipConnectionRequest")
+ connectionResponseChan := make(chan readWriteModel.EipConnectionResponse)
+ connectionResponseErrorChan := make(chan error)
+ if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ return eipPacket != nil
+ }, func(message spi.Message) error {
+ eipPacket := message.(readWriteModel.EipPacket)
+ connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
+ if connectionResponse != nil {
+ if connectionResponse.GetStatus() == 0 {
+ m.sessionHandle = connectionResponse.GetSessionHandle()
+ m.senderContext = connectionResponse.GetSenderContext()
+ m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+ connectionResponseChan <- connectionResponse
} else {
- // TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
- classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
- instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
- exchange := readWriteModel.NewUnConnectedDataItem(
- readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
- 14, 536870914, 33944, m.connectionSerialNumber,
- 4919, 42, 3, 2101812,
- readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
- 2113537,
- readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
- readWriteModel.NewTransportType(true, 2, 3),
- m.connectionPathSize, m.routingAddress, 1))
- typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
- eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
- m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
- if err := m.messageCodec.SendRequest(ctx, eipWrapper,
- func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.EipPacket)
- if eipPacket == nil {
- return false
- }
- cipRRData := eipPacket.(readWriteModel.CipRRData)
- return cipRRData != nil
- }, func(message spi.Message) error {
- cipRRData := message.(readWriteModel.CipRRData)
- if cipRRData.GetStatus() == 0 {
- unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
- connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
- m.connectionId = connectionManagerResponse.GetOtConnectionId()
- m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
- connectionResponseChan <- connectionResponse
- } else {
- connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
- }
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
- }
+ m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
+ connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
}
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
+ } else {
+ // TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
+ classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
+ instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
+ exchange := readWriteModel.NewUnConnectedDataItem(
+ readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
+ 14, 536870914, 33944, m.connectionSerialNumber,
+ 4919, 42, 3, 2101812,
+ readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+ 2113537,
+ readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+ readWriteModel.NewTransportType(true, 2, 3),
+ m.connectionPathSize, m.routingAddress, 1))
+ typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
+ eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
+ m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
+ if err := m.messageCodec.SendRequest(ctx, eipWrapper,
+ func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.EipPacket)
+ if eipPacket == nil {
+ return false
+ }
+ cipRRData := eipPacket.(readWriteModel.CipRRData)
+ return cipRRData != nil
+ }, func(message spi.Message) error {
+ cipRRData := message.(readWriteModel.CipRRData)
+ if cipRRData.GetStatus() == 0 {
+ unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+ connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
+ m.connectionId = connectionManagerResponse.GetOtConnectionId()
+ m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
+ connectionResponseChan <- connectionResponse
+ } else {
+ connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
+ }
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.GetTtl()); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
- connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
}
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
+ }
+ connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+ return nil
+ }, m.GetTtl()); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ }
+ timeout := time.NewTimer(1 * time.Second)
+ defer utils.CleanupTimer(timeout)
+ select {
+ case <-timeout.C:
+ return errors.New("timeout")
+ case err := <-connectionResponseErrorChan:
+ return errors.Wrap(err, "Error receiving of ListServices response")
+ case _ = <-connectionResponseChan:
+ return nil
+ }
+}
- select {
- case err := <-connectionResponseErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
- case _ = <-connectionResponseChan:
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // List All Attributes
-
- m.log.Debug().Msg("Sending ListAllAttributes Request")
- listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse)
- listAllAttributesErrorChan := make(chan error)
- classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
- instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
- if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData(EmptyInterfaceHandle, 0,
- []readWriteModel.TypeId{
- readWriteModel.NewNullAddressItem(),
- readWriteModel.NewUnConnectedDataItem(
- readWriteModel.NewGetAttributeAllRequest(
- classSegment, instanceSegment, uint16(0))),
- }, m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0), func(message spi.Message) bool {
- eipPacket := message.(readWriteModel.CipRRData)
- return eipPacket != nil
- }, func(message spi.Message) error {
- cipRrData := message.(readWriteModel.CipRRData)
- if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
- dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
- response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
- if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
- // TODO: Return an error ...
- } else if response.GetAttributes() != nil {
- for _, classId := range response.GetAttributes().GetClassId() {
- if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
- switch curCipClassId {
- case readWriteModel.CIPClassID_MessageRouter:
- m.useMessageRouter = true
- case readWriteModel.CIPClassID_ConnectionManager:
- m.useConnectionManager = true
- }
- }
+func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+ m.log.Debug().Msg("Sending ListAllAttributes Request")
+ listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse)
+ listAllAttributesErrorChan := make(chan error)
+ classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
+ instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
+ if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData(EmptyInterfaceHandle, 0,
+ []readWriteModel.TypeId{
+ readWriteModel.NewNullAddressItem(),
+ readWriteModel.NewUnConnectedDataItem(
+ readWriteModel.NewGetAttributeAllRequest(
+ classSegment, instanceSegment, uint16(0))),
+ }, m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0), func(message spi.Message) bool {
+ eipPacket := message.(readWriteModel.CipRRData)
+ return eipPacket != nil
+ }, func(message spi.Message) error {
+ cipRrData := message.(readWriteModel.CipRRData)
+ if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
+ dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+ response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
+ if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
+ // TODO: Return an error ...
+ } else if response.GetAttributes() != nil {
+ for _, classId := range response.GetAttributes().GetClassId() {
+ if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
+ switch curCipClassId {
+ case readWriteModel.CIPClassID_MessageRouter:
+ m.useMessageRouter = true
+ case readWriteModel.CIPClassID_ConnectionManager:
+ m.useConnectionManager = true
}
}
- m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
- listAllAttributesResponseChan <- response
- }
- return nil
- }, func(err error) error {
- // If this is a timeout, do a check if the connection requires a reconnection
- if _, isTimeout := err.(utils.TimeoutError); isTimeout {
- m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
- m.Close()
- }
- connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
- return nil
- }, m.GetTtl()); err != nil {
- m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
- }
-
- select {
- case err := <-listAllAttributesErrorChan:
- m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
- case _ = <-listAllAttributesResponseChan:
- if m.useConnectionManager {
- // TODO: Continue here ....
- } else {
- // Send an event that connection setup is complete.
- m.fireConnected(ch)
}
}
+ m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
+ listAllAttributesResponseChan <- response
+ }
+ return nil
+ }, func(err error) error {
+ // If this is a timeout, do a check if the connection requires a reconnection
+ if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+ m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+ m.Close()
}
+ m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+ return nil
+ }, m.GetTtl()); err != nil {
+ m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+ }
+
+ timeout := time.NewTimer(1 * time.Second)
+ defer utils.CleanupTimer(timeout)
+ select {
+ case <-timeout.C:
+ return errors.New("timeout")
+ case err := <-listAllAttributesErrorChan:
+ return errors.Wrap(err, "Error receiving of ListServices response")
+ case _ = <-listAllAttributesResponseChan:
+ return nil
}
}
diff --git a/plc4go/internal/eip/MessageCodec.go b/plc4go/internal/eip/MessageCodec.go
index b7717cffd6..2844bb7f14 100644
--- a/plc4go/internal/eip/MessageCodec.go
+++ b/plc4go/internal/eip/MessageCodec.go
@@ -71,9 +71,10 @@ func (m *MessageCodec) Send(message spi.Message) error {
func (m *MessageCodec) Receive() (spi.Message, error) {
// We need at least 6 bytes in order to know how big the packet is in total
- if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
+ transportInstance := m.GetTransportInstance()
+ if num, err := transportInstance.GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
m.log.Debug().Msgf("we got %d readable bytes", num)
- data, err := m.GetTransportInstance().PeekReadableBytes(4)
+ data, err := transportInstance.PeekReadableBytes(4)
if err != nil {
m.log.Warn().Err(err).Msg("error peeking")
// TODO: Possibly clean up ...
@@ -85,7 +86,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
return nil, nil
}
- data, err = m.GetTransportInstance().Read(packetSize)
+ data, err = transportInstance.Read(packetSize)
if err != nil {
m.log.Debug().Err(err).Msg("Error reading")
// TODO: Possibly clean up ...