You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/05 08:36:18 UTC

[plc4x] 02/02: refactor(plc4go/eip): cleanup connection code

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 39448aaf5cd1827255787d381c2b2580cee6f28a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 5 10:36:06 2023 +0200

    refactor(plc4go/eip): cleanup connection code
---
 plc4go/internal/eip/Connection.go   | 319 ++++++++++++++++++++----------------
 plc4go/internal/eip/MessageCodec.go |   7 +-
 2 files changed, 178 insertions(+), 148 deletions(-)

diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index fbb85e2a14..2d854beea6 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/rs/zerolog"
+	"time"
 
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -172,9 +173,30 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 }
 
 func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
-	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-	// List Services Request
+	if err := m.listServiceRequest(ctx, ch); err != nil {
+		m.fireConnectionError(errors.Wrap(err, "error listing service request"), ch)
+		return
+	}
+
+	if err := m.connectRegisterSession(ctx, ch); err != nil {
+		m.fireConnectionError(errors.Wrap(err, "error connect register session"), ch)
+		return
+	}
+
+	if err := m.listAllAttributes(ctx, ch); err != nil {
+		m.fireConnectionError(errors.Wrap(err, "error list all attributes"), ch)
+		return
+	}
+
+	if m.useConnectionManager {
+		// TODO: Continue here ....
+	} else {
+		// Send an event that connection setup is complete.
+		m.fireConnected(ch)
+	}
+}
 
+func (m *Connection) listServiceRequest(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
 	m.log.Debug().Msg("Sending ListServices Request")
 	listServicesResultChan := make(chan readWriteModel.ListServicesResponse)
 	listServicesResultErrorChan := make(chan error)
@@ -206,162 +228,169 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 		m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
 	}
 
+	timeout := time.NewTimer(1 * time.Second)
+	defer utils.CleanupTimer(timeout)
 	select {
+	case <-timeout.C:
+		return errors.New("timeout")
 	case err := <-listServicesResultErrorChan:
-		m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
+		return errors.Wrap(err, "Error receiving of ListServices response")
 	case _ = <-listServicesResultChan:
-		////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-		// Connect Register Session
-
-		m.log.Debug().Msg("Sending EipConnectionRequest")
-		connectionResponseChan := make(chan readWriteModel.EipConnectionResponse)
-		connectionResponseErrorChan := make(chan error)
-		if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
-			eipPacket := message.(readWriteModel.EipPacket)
-			return eipPacket != nil
-		}, func(message spi.Message) error {
-			eipPacket := message.(readWriteModel.EipPacket)
-			connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
-			if connectionResponse != nil {
-				if connectionResponse.GetStatus() == 0 {
-					m.sessionHandle = connectionResponse.GetSessionHandle()
-					m.senderContext = connectionResponse.GetSenderContext()
-					m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
-					connectionResponseChan <- connectionResponse
-				} else {
-					m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
-					connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
-				}
+		return nil
+	}
+}
+
+func (m *Connection) connectRegisterSession(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+	m.log.Debug().Msg("Sending EipConnectionRequest")
+	connectionResponseChan := make(chan readWriteModel.EipConnectionResponse)
+	connectionResponseErrorChan := make(chan error)
+	if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(EmptySessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), uint32(0)), func(message spi.Message) bool {
+		eipPacket := message.(readWriteModel.EipPacket)
+		return eipPacket != nil
+	}, func(message spi.Message) error {
+		eipPacket := message.(readWriteModel.EipPacket)
+		connectionResponse := eipPacket.(readWriteModel.EipConnectionResponse)
+		if connectionResponse != nil {
+			if connectionResponse.GetStatus() == 0 {
+				m.sessionHandle = connectionResponse.GetSessionHandle()
+				m.senderContext = connectionResponse.GetSenderContext()
+				m.log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+				connectionResponseChan <- connectionResponse
 			} else {
-				// TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
-				classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
-				instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
-				exchange := readWriteModel.NewUnConnectedDataItem(
-					readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
-						14, 536870914, 33944, m.connectionSerialNumber,
-						4919, 42, 3, 2101812,
-						readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
-						2113537,
-						readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
-						readWriteModel.NewTransportType(true, 2, 3),
-						m.connectionPathSize, m.routingAddress, 1))
-				typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
-				eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
-					m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
-				if err := m.messageCodec.SendRequest(ctx, eipWrapper,
-					func(message spi.Message) bool {
-						eipPacket := message.(readWriteModel.EipPacket)
-						if eipPacket == nil {
-							return false
-						}
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						return cipRRData != nil
-					}, func(message spi.Message) error {
-						cipRRData := message.(readWriteModel.CipRRData)
-						if cipRRData.GetStatus() == 0 {
-							unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
-							connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
-							m.connectionId = connectionManagerResponse.GetOtConnectionId()
-							m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
-							connectionResponseChan <- connectionResponse
-						} else {
-							connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
-						}
-						return nil
-					}, func(err error) error {
-						// If this is a timeout, do a check if the connection requires a reconnection
-						if _, isTimeout := err.(utils.TimeoutError); isTimeout {
-							m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-							m.Close()
-						}
-						connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
-						return nil
-					}, m.GetTtl()); err != nil {
-					m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
-				}
+				m.log.Error().Msgf("Got unsuccessful status for connection request: %d", connectionResponse.GetStatus())
+				connectionResponseErrorChan <- errors.New("got unsuccessful connection response")
 			}
-			return nil
-		}, func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(utils.TimeoutError); isTimeout {
-				m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-				m.Close()
+		} else {
+			// TODO: This seems pretty hard-coded ... possibly find out if we can't simplify this.
+			classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
+			instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 1))
+			exchange := readWriteModel.NewUnConnectedDataItem(
+				readWriteModel.NewCipConnectionManagerRequest(classSegment, instanceSegment, 0, 10,
+					14, 536870914, 33944, m.connectionSerialNumber,
+					4919, 42, 3, 2101812,
+					readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+					2113537,
+					readWriteModel.NewNetworkConnectionParameters(4002, false, 2, 0, true),
+					readWriteModel.NewTransportType(true, 2, 3),
+					m.connectionPathSize, m.routingAddress, 1))
+			typeIds := []readWriteModel.TypeId{readWriteModel.NewNullAddressItem(), exchange}
+			eipWrapper := readWriteModel.NewCipRRData(m.sessionHandle, 0, typeIds,
+				m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0)
+			if err := m.messageCodec.SendRequest(ctx, eipWrapper,
+				func(message spi.Message) bool {
+					eipPacket := message.(readWriteModel.EipPacket)
+					if eipPacket == nil {
+						return false
+					}
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					return cipRRData != nil
+				}, func(message spi.Message) error {
+					cipRRData := message.(readWriteModel.CipRRData)
+					if cipRRData.GetStatus() == 0 {
+						unconnectedDataItem := cipRRData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+						connectionManagerResponse := unconnectedDataItem.GetService().(readWriteModel.CipConnectionManagerResponse)
+						m.connectionId = connectionManagerResponse.GetOtConnectionId()
+						m.log.Debug().Msgf("Got assigned with connection if %d", m.connectionId)
+						connectionResponseChan <- connectionResponse
+					} else {
+						connectionResponseErrorChan <- fmt.Errorf("got status code while opening Connection manager: %d", cipRRData.GetStatus())
+					}
+					return nil
+				}, func(err error) error {
+					// If this is a timeout, do a check if the connection requires a reconnection
+					if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+						m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+						m.Close()
+					}
+					connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+					return nil
+				}, m.GetTtl()); err != nil {
+				m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
 			}
-			connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
-			return nil
-		}, m.GetTtl()); err != nil {
-			m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
 		}
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+			m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+			m.Close()
+		}
+		connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.GetTtl()); err != nil {
+		m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+	}
+	timeout := time.NewTimer(1 * time.Second)
+	defer utils.CleanupTimer(timeout)
+	select {
+	case <-timeout.C:
+		return errors.New("timeout")
+	case err := <-connectionResponseErrorChan:
+		return errors.Wrap(err, "Error receiving of ListServices response")
+	case _ = <-connectionResponseChan:
+		return nil
+	}
+}
 
-		select {
-		case err := <-connectionResponseErrorChan:
-			m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
-		case _ = <-connectionResponseChan:
-
-			////////////////////////////////////////////////////////////////////////////////////////////////////////////
-			// List All Attributes
-
-			m.log.Debug().Msg("Sending ListAllAttributes Request")
-			listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse)
-			listAllAttributesErrorChan := make(chan error)
-			classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
-			instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
-			if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData(EmptyInterfaceHandle, 0,
-				[]readWriteModel.TypeId{
-					readWriteModel.NewNullAddressItem(),
-					readWriteModel.NewUnConnectedDataItem(
-						readWriteModel.NewGetAttributeAllRequest(
-							classSegment, instanceSegment, uint16(0))),
-				}, m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0), func(message spi.Message) bool {
-				eipPacket := message.(readWriteModel.CipRRData)
-				return eipPacket != nil
-			}, func(message spi.Message) error {
-				cipRrData := message.(readWriteModel.CipRRData)
-				if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
-					dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
-					response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
-					if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
-						// TODO: Return an error ...
-					} else if response.GetAttributes() != nil {
-						for _, classId := range response.GetAttributes().GetClassId() {
-							if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
-								switch curCipClassId {
-								case readWriteModel.CIPClassID_MessageRouter:
-									m.useMessageRouter = true
-								case readWriteModel.CIPClassID_ConnectionManager:
-									m.useConnectionManager = true
-								}
-							}
+func (m *Connection) listAllAttributes(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) error {
+	m.log.Debug().Msg("Sending ListAllAttributes Request")
+	listAllAttributesResponseChan := make(chan readWriteModel.GetAttributeAllResponse)
+	listAllAttributesErrorChan := make(chan error)
+	classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(uint8(0), uint8(2)))
+	instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(uint8(0), uint8(1)))
+	if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewCipRRData(EmptyInterfaceHandle, 0,
+		[]readWriteModel.TypeId{
+			readWriteModel.NewNullAddressItem(),
+			readWriteModel.NewUnConnectedDataItem(
+				readWriteModel.NewGetAttributeAllRequest(
+					classSegment, instanceSegment, uint16(0))),
+		}, m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), m.senderContext, 0), func(message spi.Message) bool {
+		eipPacket := message.(readWriteModel.CipRRData)
+		return eipPacket != nil
+	}, func(message spi.Message) error {
+		cipRrData := message.(readWriteModel.CipRRData)
+		if cipRrData.GetStatus() == uint32(readWriteModel.CIPStatus_Success) {
+			dataItem := cipRrData.GetTypeIds()[1].(readWriteModel.UnConnectedDataItem)
+			response := dataItem.GetService().(readWriteModel.GetAttributeAllResponse)
+			if response.GetStatus() != uint8(readWriteModel.CIPStatus_Success) {
+				// TODO: Return an error ...
+			} else if response.GetAttributes() != nil {
+				for _, classId := range response.GetAttributes().GetClassId() {
+					if curCipClassId, ok := readWriteModel.CIPClassIDByValue(classId); ok {
+						switch curCipClassId {
+						case readWriteModel.CIPClassID_MessageRouter:
+							m.useMessageRouter = true
+						case readWriteModel.CIPClassID_ConnectionManager:
+							m.useConnectionManager = true
 						}
 					}
-					m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
-					listAllAttributesResponseChan <- response
-				}
-				return nil
-			}, func(err error) error {
-				// If this is a timeout, do a check if the connection requires a reconnection
-				if _, isTimeout := err.(utils.TimeoutError); isTimeout {
-					m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-					m.Close()
-				}
-				connectionResponseErrorChan <- errors.Wrap(err, "got error processing request")
-				return nil
-			}, m.GetTtl()); err != nil {
-				m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
-			}
-
-			select {
-			case err := <-listAllAttributesErrorChan:
-				m.fireConnectionError(errors.Wrap(err, "Error receiving of ListServices response"), ch)
-			case _ = <-listAllAttributesResponseChan:
-				if m.useConnectionManager {
-					// TODO: Continue here ....
-				} else {
-					// Send an event that connection setup is complete.
-					m.fireConnected(ch)
 				}
 			}
+			m.log.Debug().Msgf("Connection using message router %t, using connection manager %t", m.useMessageRouter, m.useConnectionManager)
+			listAllAttributesResponseChan <- response
+		}
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(utils.TimeoutError); isTimeout {
+			m.log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+			m.Close()
 		}
+		m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+		return nil
+	}, m.GetTtl()); err != nil {
+		m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP ListServices Request"), ch)
+	}
+
+	timeout := time.NewTimer(1 * time.Second)
+	defer utils.CleanupTimer(timeout)
+	select {
+	case <-timeout.C:
+		return errors.New("timeout")
+	case err := <-listAllAttributesErrorChan:
+		return errors.Wrap(err, "Error receiving of ListServices response")
+	case _ = <-listAllAttributesResponseChan:
+		return nil
 	}
 }
 
diff --git a/plc4go/internal/eip/MessageCodec.go b/plc4go/internal/eip/MessageCodec.go
index b7717cffd6..2844bb7f14 100644
--- a/plc4go/internal/eip/MessageCodec.go
+++ b/plc4go/internal/eip/MessageCodec.go
@@ -71,9 +71,10 @@ func (m *MessageCodec) Send(message spi.Message) error {
 
 func (m *MessageCodec) Receive() (spi.Message, error) {
 	// We need at least 6 bytes in order to know how big the packet is in total
-	if num, err := m.GetTransportInstance().GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
+	transportInstance := m.GetTransportInstance()
+	if num, err := transportInstance.GetNumBytesAvailableInBuffer(); (err == nil) && (num >= 4) {
 		m.log.Debug().Msgf("we got %d readable bytes", num)
-		data, err := m.GetTransportInstance().PeekReadableBytes(4)
+		data, err := transportInstance.PeekReadableBytes(4)
 		if err != nil {
 			m.log.Warn().Err(err).Msg("error peeking")
 			// TODO: Possibly clean up ...
@@ -85,7 +86,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 			m.log.Debug().Msgf("Not enough bytes. Got: %d Need: %d\n", num, packetSize)
 			return nil, nil
 		}
-		data, err = m.GetTransportInstance().Read(packetSize)
+		data, err = transportInstance.Read(packetSize)
 		if err != nil {
 			m.log.Debug().Err(err).Msg("Error reading")
 			// TODO: Possibly clean up ...