You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/06/23 13:41:54 UTC

[plc4x] 02/02: feat(plc4go): added SO_REUSE support for udp

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit f219a659dc07c2338bd128415bd84547570d55d8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 23 15:41:44 2022 +0200

    feat(plc4go): added SO_REUSE support for udp
---
 plc4go/internal/bacnetip/Discoverer.go          | 94 ++++++++++++++-----------
 plc4go/internal/bacnetip/Driver.go              |  4 ++
 plc4go/internal/spi/transports/udp/Transport.go | 36 ++++++++--
 3 files changed, 86 insertions(+), 48 deletions(-)

diff --git a/plc4go/internal/bacnetip/Discoverer.go b/plc4go/internal/bacnetip/Discoverer.go
index 885f4f3a1..e39b690af 100644
--- a/plc4go/internal/bacnetip/Discoverer.go
+++ b/plc4go/internal/bacnetip/Discoverer.go
@@ -35,8 +35,6 @@ import (
 
 	"github.com/apache/plc4x/plc4go/internal/spi"
 	"github.com/apache/plc4x/plc4go/internal/spi/options"
-	"github.com/apache/plc4x/plc4go/internal/spi/transports"
-	"github.com/apache/plc4x/plc4go/internal/spi/transports/udp"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	driverModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 )
@@ -86,14 +84,6 @@ func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), d
 func broadcastAndDiscover(ctx context.Context, communicationChannels []communicationChannel, whoIsLowLimit *uint, whoIsHighLimit *uint) (chan receivedBvlcMessage, error) {
 	incomingBVLCChannel := make(chan receivedBvlcMessage, 0)
 	for _, communicationChannelInstance := range communicationChannels {
-		// Create a codec for sending and receiving messages.
-		codec := NewMessageCodec(communicationChannelInstance.unicastTransport)
-		// Explicitly start the worker
-		if err := codec.Connect(); err != nil {
-			log.Warn().Err(err).Msg("Error connecting")
-			continue
-		}
-
 		// Prepare the discovery packet data
 		var lowLimit driverModel.BACnetContextTagUnsignedInteger
 		if whoIsLowLimit != nil {
@@ -111,17 +101,42 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
 		bvlc := driverModel.NewBVLCOriginalUnicastNPDU(npdu, 0)
 
 		// Send the search request.
-		if err := codec.Send(bvlc); err != nil {
+		wbbb := utils.NewWriteBufferByteBased()
+		if err := bvlc.Serialize(wbbb); err != nil {
+			panic(err)
+		}
+		if _, err := communicationChannelInstance.broadcastConnection.WriteTo(wbbb.GetBytes(), communicationChannelInstance.broadcastConnection.LocalAddr()); err != nil {
 			log.Debug().Err(err).Msg("Error sending broadcast")
 		}
+
 		go func(communicationChannelInstance communicationChannel) {
 			for {
+				blockingReadChan := make(chan bool, 0)
+				go func() {
+					buf := make([]byte, 4096)
+					n, addr, err := communicationChannelInstance.unicastConnection.ReadFrom(buf)
+					if err != nil {
+						log.Debug().Err(err).Msg("Ending unicast receive")
+						blockingReadChan <- false
+						return
+					}
+					log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
+					incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
+					if err != nil {
+						log.Warn().Err(err).Msg("Could not parse bvlc")
+						blockingReadChan <- true
+						return
+					}
+					incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
+					blockingReadChan <- true
+				}()
 				select {
-				case message := <-codec.GetDefaultIncomingMessageChannel():
-					if incomingBvlc, ok := message.(driverModel.BVLC); ok {
-						// TODO: how to get the receiverd ip from that?
-						incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, nil}
+				case ok := <-blockingReadChan:
+					if !ok {
+						log.Debug().Msg("Ending reading")
+						return
 					}
+					log.Trace().Msg("Received something")
 				case <-ctx.Done():
 					log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
 					return
@@ -136,18 +151,25 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
 					buf := make([]byte, 4096)
 					n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
 					if err != nil {
-						panic(err)
+						log.Debug().Err(err).Msg("Ending unicast receive")
+						blockingReadChan <- false
+						return
 					}
 					log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
 					incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
 					if err != nil {
-						panic(err)
+						log.Warn().Err(err).Msg("Could not parse bvlc")
+						blockingReadChan <- true
 					}
 					incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
 					blockingReadChan <- true
 				}()
 				select {
-				case <-blockingReadChan:
+				case ok := <-blockingReadChan:
+					if !ok {
+						log.Debug().Msg("Ending reading")
+						return
+					}
 					log.Trace().Msg("Received something")
 				case <-ctx.Done():
 					log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
@@ -206,7 +228,6 @@ func handleIncomingBVLCs(ctx context.Context, callback func(event apiModel.PlcDi
 }
 
 func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (communicationChannels []communicationChannel, err error) {
-	udpTransport := udp.NewTransport()
 	// Iterate over all network devices of this system.
 	for _, networkInterface := range interfaces {
 		unicastInterfaceAddress, err := networkInterface.Addrs()
@@ -242,37 +263,28 @@ func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (c
 				continue
 			}
 
-			_, cidr, _ := net.ParseCIDR(unicastAddress.String())
-			broadcastAddr := netaddr.BroadcastAddr(cidr)
-			udpAddr := &net.UDPAddr{IP: broadcastAddr, Port: bacNetPort}
-			connectionUrl, err := url.Parse(fmt.Sprintf("udp://%s", udpAddr))
-			if err != nil {
-				log.Debug().Err(err).Msg("error parsing url")
-				continue
-			}
-			localAddr := &net.UDPAddr{IP: ipAddr, Port: bacNetPort}
-			transportInstance, err :=
-				udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, localAddr)
+			// Handle undirected
+			unicastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", ipAddr, bacNetPort))
 			if err != nil {
-				return nil, errors.Wrap(err, "error creating transport instance")
-			}
-			if err := transportInstance.Connect(); err != nil {
-				log.Warn().Err(err).Msgf("Can't connect to %v", localAddr)
+				log.Debug().Err(err).Msg("Error building unicast Port")
 				continue
 			}
 
+			_, cidr, _ := net.ParseCIDR(unicastAddress.String())
+			broadcastAddr := netaddr.BroadcastAddr(cidr)
 			// Handle undirected
-			pc, err := reuseport.ListenPacket("udp4", broadcastAddr.String()+":47808")
+			broadcastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", broadcastAddr, bacNetPort))
 			if err != nil {
-				if err := transportInstance.Close(); err != nil {
+				if err := unicastConnection.Close(); err != nil {
 					log.Debug().Err(err).Msg("Error closing transport instance")
 				}
-				return nil, err
+				log.Debug().Err(err).Msg("Error building broadcast Port")
+				continue
 			}
 			communicationChannels = append(communicationChannels, communicationChannel{
 				networkInterface:    networkInterface,
-				unicastTransport:    transportInstance,
-				broadcastConnection: pc,
+				unicastConnection:   unicastConnection,
+				broadcastConnection: broadcastConnection,
 			})
 		}
 	}
@@ -286,12 +298,12 @@ type receivedBvlcMessage struct {
 
 type communicationChannel struct {
 	networkInterface    net.Interface
-	unicastTransport    transports.TransportInstance
+	unicastConnection   net.PacketConn
 	broadcastConnection net.PacketConn
 }
 
 func (c communicationChannel) Close() error {
-	_ = c.unicastTransport.Close()
+	_ = c.unicastConnection.Close()
 	_ = c.broadcastConnection.Close()
 	return nil
 }
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 635ffc14a..25b47b759 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -58,6 +58,10 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
 	}
 	// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
 	options["defaultUdpPort"] = []string{"47808"}
+	// Set so_reuse by default
+	if _, ok := options["so-reuse"]; !ok {
+		options["so-reuse"] = []string{"true"}
+	}
 	// Have the transport create a new transport-instance.
 	transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
 	if err != nil {
diff --git a/plc4go/internal/spi/transports/udp/Transport.go b/plc4go/internal/spi/transports/udp/Transport.go
index f83106ddc..94ebe4f6b 100644
--- a/plc4go/internal/spi/transports/udp/Transport.go
+++ b/plc4go/internal/spi/transports/udp/Transport.go
@@ -23,6 +23,7 @@ import (
 	"bufio"
 	"github.com/apache/plc4x/plc4go/internal/spi/transports"
 	"github.com/apache/plc4x/plc4go/internal/spi/utils"
+	"github.com/libp2p/go-reuseport"
 	"github.com/pkg/errors"
 	"net"
 	"net/url"
@@ -80,15 +81,21 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
 	}
 	var connectTimeout uint32 = 1000
 	if val, ok := options["connect-timeout"]; ok {
-		parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32)
-		if err != nil {
+		if parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32); err != nil {
 			return nil, errors.Wrap(err, "error setting connect-timeout")
 		} else {
 			connectTimeout = uint32(parsedConnectTimeout)
 		}
 	}
 
-	// TODO: get reuse option from options
+	var soReUse bool
+	if val, ok := options["so-reuse"]; ok {
+		if parseBool, err := strconv.ParseBool(val[0]); err != nil {
+			return nil, errors.Wrap(err, "error setting so-reuse")
+		} else {
+			soReUse = parseBool
+		}
+	}
 
 	// Potentially resolve the ip address, if a hostname was provided
 	remoteAddress, err := net.ResolveUDPAddr("udp", remoteAddressString+":"+strconv.Itoa(remotePort))
@@ -96,23 +103,25 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
 		return nil, errors.Wrap(err, "error resolving typ address")
 	}
 
-	return NewTransportInstance(localAddress, remoteAddress, connectTimeout, &m), nil
+	return NewTransportInstance(localAddress, remoteAddress, connectTimeout, soReUse, &m), nil
 }
 
 type TransportInstance struct {
 	LocalAddress   *net.UDPAddr
 	RemoteAddress  *net.UDPAddr
 	ConnectTimeout uint32
+	SoReUse        bool
 	transport      *Transport
 	udpConn        *net.UDPConn
 	reader         *bufio.Reader
 }
 
-func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
+func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, soReUse bool, transport *Transport) *TransportInstance {
 	return &TransportInstance{
 		LocalAddress:   localAddress,
 		RemoteAddress:  remoteAddress,
 		ConnectTimeout: connectTimeout,
+		SoReUse:        soReUse,
 		transport:      transport,
 	}
 }
@@ -134,8 +143,21 @@ func (m *TransportInstance) Connect() error {
 
 	// "connect" to the remote
 	var err error
-	if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
-		return errors.Wrap(err, "error connecting to remote address")
+	if m.SoReUse {
+		if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
+			return errors.Wrap(err, "error connecting to remote address")
+		}
+		rawConn, err := m.udpConn.SyscallConn()
+		if err != nil {
+			return errors.Wrap(err, "Error getting syscall connection")
+		}
+		if err := reuseport.Control("", "", rawConn); err != nil {
+			return errors.Wrap(err, "Error setting re-use control")
+		}
+	} else {
+		if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
+			return errors.Wrap(err, "error connecting to remote address")
+		}
 	}
 
 	// TODO: Start a worker that uses m.udpConn.ReadFromUDP() to fill a buffer