You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/06/23 13:41:54 UTC
[plc4x] 02/02: feat(plc4go): added SO_REUSE support for udp
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit f219a659dc07c2338bd128415bd84547570d55d8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 23 15:41:44 2022 +0200
feat(plc4go): added SO_REUSE support for udp
---
plc4go/internal/bacnetip/Discoverer.go | 94 ++++++++++++++-----------
plc4go/internal/bacnetip/Driver.go | 4 ++
plc4go/internal/spi/transports/udp/Transport.go | 36 ++++++++--
3 files changed, 86 insertions(+), 48 deletions(-)
diff --git a/plc4go/internal/bacnetip/Discoverer.go b/plc4go/internal/bacnetip/Discoverer.go
index 885f4f3a1..e39b690af 100644
--- a/plc4go/internal/bacnetip/Discoverer.go
+++ b/plc4go/internal/bacnetip/Discoverer.go
@@ -35,8 +35,6 @@ import (
"github.com/apache/plc4x/plc4go/internal/spi"
"github.com/apache/plc4x/plc4go/internal/spi/options"
- "github.com/apache/plc4x/plc4go/internal/spi/transports"
- "github.com/apache/plc4x/plc4go/internal/spi/transports/udp"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
driverModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
)
@@ -86,14 +84,6 @@ func (d *Discoverer) Discover(callback func(event apiModel.PlcDiscoveryEvent), d
func broadcastAndDiscover(ctx context.Context, communicationChannels []communicationChannel, whoIsLowLimit *uint, whoIsHighLimit *uint) (chan receivedBvlcMessage, error) {
incomingBVLCChannel := make(chan receivedBvlcMessage, 0)
for _, communicationChannelInstance := range communicationChannels {
- // Create a codec for sending and receiving messages.
- codec := NewMessageCodec(communicationChannelInstance.unicastTransport)
- // Explicitly start the worker
- if err := codec.Connect(); err != nil {
- log.Warn().Err(err).Msg("Error connecting")
- continue
- }
-
// Prepare the discovery packet data
var lowLimit driverModel.BACnetContextTagUnsignedInteger
if whoIsLowLimit != nil {
@@ -111,17 +101,42 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
bvlc := driverModel.NewBVLCOriginalUnicastNPDU(npdu, 0)
// Send the search request.
- if err := codec.Send(bvlc); err != nil {
+ wbbb := utils.NewWriteBufferByteBased()
+ if err := bvlc.Serialize(wbbb); err != nil {
+ panic(err)
+ }
+ if _, err := communicationChannelInstance.broadcastConnection.WriteTo(wbbb.GetBytes(), communicationChannelInstance.broadcastConnection.LocalAddr()); err != nil {
log.Debug().Err(err).Msg("Error sending broadcast")
}
+
go func(communicationChannelInstance communicationChannel) {
for {
+ blockingReadChan := make(chan bool, 0)
+ go func() {
+ buf := make([]byte, 4096)
+ n, addr, err := communicationChannelInstance.unicastConnection.ReadFrom(buf)
+ if err != nil {
+ log.Debug().Err(err).Msg("Ending unicast receive")
+ blockingReadChan <- false
+ return
+ }
+ log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
+ incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
+ if err != nil {
+ log.Warn().Err(err).Msg("Could not parse bvlc")
+ blockingReadChan <- true
+ return
+ }
+ incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
+ blockingReadChan <- true
+ }()
select {
- case message := <-codec.GetDefaultIncomingMessageChannel():
- if incomingBvlc, ok := message.(driverModel.BVLC); ok {
- // TODO: how to get the receiverd ip from that?
- incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, nil}
+ case ok := <-blockingReadChan:
+ if !ok {
+ log.Debug().Msg("Ending reading")
+ return
}
+ log.Trace().Msg("Received something")
case <-ctx.Done():
log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
return
@@ -136,18 +151,25 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
buf := make([]byte, 4096)
n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
if err != nil {
- panic(err)
+ log.Debug().Err(err).Msg("Ending unicast receive")
+ blockingReadChan <- false
+ return
}
log.Debug().Stringer("addr", addr).Msg("Received broadcast bvlc")
incomingBvlc, err := driverModel.BVLCParse(utils.NewReadBufferByteBased(buf[:n]))
if err != nil {
- panic(err)
+ log.Warn().Err(err).Msg("Could not parse bvlc")
+ blockingReadChan <- true
}
incomingBVLCChannel <- receivedBvlcMessage{incomingBvlc, addr}
blockingReadChan <- true
}()
select {
- case <-blockingReadChan:
+ case ok := <-blockingReadChan:
+ if !ok {
+ log.Debug().Msg("Ending reading")
+ return
+ }
log.Trace().Msg("Received something")
case <-ctx.Done():
log.Debug().Err(ctx.Err()).Msg("Ending unicast receive")
@@ -206,7 +228,6 @@ func handleIncomingBVLCs(ctx context.Context, callback func(event apiModel.PlcDi
}
func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (communicationChannels []communicationChannel, err error) {
- udpTransport := udp.NewTransport()
// Iterate over all network devices of this system.
for _, networkInterface := range interfaces {
unicastInterfaceAddress, err := networkInterface.Addrs()
@@ -242,37 +263,28 @@ func buildupCommunicationChannels(interfaces []net.Interface, bacNetPort int) (c
continue
}
- _, cidr, _ := net.ParseCIDR(unicastAddress.String())
- broadcastAddr := netaddr.BroadcastAddr(cidr)
- udpAddr := &net.UDPAddr{IP: broadcastAddr, Port: bacNetPort}
- connectionUrl, err := url.Parse(fmt.Sprintf("udp://%s", udpAddr))
- if err != nil {
- log.Debug().Err(err).Msg("error parsing url")
- continue
- }
- localAddr := &net.UDPAddr{IP: ipAddr, Port: bacNetPort}
- transportInstance, err :=
- udpTransport.CreateTransportInstanceForLocalAddress(*connectionUrl, nil, localAddr)
+ // Handle undirected
+ unicastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", ipAddr, bacNetPort))
if err != nil {
- return nil, errors.Wrap(err, "error creating transport instance")
- }
- if err := transportInstance.Connect(); err != nil {
- log.Warn().Err(err).Msgf("Can't connect to %v", localAddr)
+ log.Debug().Err(err).Msg("Error building unicast Port")
continue
}
+ _, cidr, _ := net.ParseCIDR(unicastAddress.String())
+ broadcastAddr := netaddr.BroadcastAddr(cidr)
// Handle undirected
- pc, err := reuseport.ListenPacket("udp4", broadcastAddr.String()+":47808")
+ broadcastConnection, err := reuseport.ListenPacket("udp4", fmt.Sprintf("%v:%d", broadcastAddr, bacNetPort))
if err != nil {
- if err := transportInstance.Close(); err != nil {
+ if err := unicastConnection.Close(); err != nil {
log.Debug().Err(err).Msg("Error closing transport instance")
}
- return nil, err
+ log.Debug().Err(err).Msg("Error building broadcast Port")
+ continue
}
communicationChannels = append(communicationChannels, communicationChannel{
networkInterface: networkInterface,
- unicastTransport: transportInstance,
- broadcastConnection: pc,
+ unicastConnection: unicastConnection,
+ broadcastConnection: broadcastConnection,
})
}
}
@@ -286,12 +298,12 @@ type receivedBvlcMessage struct {
type communicationChannel struct {
networkInterface net.Interface
- unicastTransport transports.TransportInstance
+ unicastConnection net.PacketConn
broadcastConnection net.PacketConn
}
func (c communicationChannel) Close() error {
- _ = c.unicastTransport.Close()
+ _ = c.unicastConnection.Close()
_ = c.broadcastConnection.Close()
return nil
}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 635ffc14a..25b47b759 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -58,6 +58,10 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
options["defaultUdpPort"] = []string{"47808"}
+ // Set so_reuse by default
+ if _, ok := options["so-reuse"]; !ok {
+ options["so-reuse"] = []string{"true"}
+ }
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
diff --git a/plc4go/internal/spi/transports/udp/Transport.go b/plc4go/internal/spi/transports/udp/Transport.go
index f83106ddc..94ebe4f6b 100644
--- a/plc4go/internal/spi/transports/udp/Transport.go
+++ b/plc4go/internal/spi/transports/udp/Transport.go
@@ -23,6 +23,7 @@ import (
"bufio"
"github.com/apache/plc4x/plc4go/internal/spi/transports"
"github.com/apache/plc4x/plc4go/internal/spi/utils"
+ "github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"net"
"net/url"
@@ -80,15 +81,21 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
}
var connectTimeout uint32 = 1000
if val, ok := options["connect-timeout"]; ok {
- parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32)
- if err != nil {
+ if parsedConnectTimeout, err := strconv.ParseUint(val[0], 10, 32); err != nil {
return nil, errors.Wrap(err, "error setting connect-timeout")
} else {
connectTimeout = uint32(parsedConnectTimeout)
}
}
- // TODO: get reuse option from options
+ var soReUse bool
+ if val, ok := options["so-reuse"]; ok {
+ if parseBool, err := strconv.ParseBool(val[0]); err != nil {
+ return nil, errors.Wrap(err, "error setting so-reuse")
+ } else {
+ soReUse = parseBool
+ }
+ }
// Potentially resolve the ip address, if a hostname was provided
remoteAddress, err := net.ResolveUDPAddr("udp", remoteAddressString+":"+strconv.Itoa(remotePort))
@@ -96,23 +103,25 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
return nil, errors.Wrap(err, "error resolving typ address")
}
- return NewTransportInstance(localAddress, remoteAddress, connectTimeout, &m), nil
+ return NewTransportInstance(localAddress, remoteAddress, connectTimeout, soReUse, &m), nil
}
type TransportInstance struct {
LocalAddress *net.UDPAddr
RemoteAddress *net.UDPAddr
ConnectTimeout uint32
+ SoReUse bool
transport *Transport
udpConn *net.UDPConn
reader *bufio.Reader
}
-func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
+func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr, connectTimeout uint32, soReUse bool, transport *Transport) *TransportInstance {
return &TransportInstance{
LocalAddress: localAddress,
RemoteAddress: remoteAddress,
ConnectTimeout: connectTimeout,
+ SoReUse: soReUse,
transport: transport,
}
}
@@ -134,8 +143,21 @@ func (m *TransportInstance) Connect() error {
// "connect" to the remote
var err error
- if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
- return errors.Wrap(err, "error connecting to remote address")
+ if m.SoReUse {
+ if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
+ return errors.Wrap(err, "error connecting to remote address")
+ }
+ rawConn, err := m.udpConn.SyscallConn()
+ if err != nil {
+ return errors.Wrap(err, "Error getting syscall connection")
+ }
+ if err := reuseport.Control("", "", rawConn); err != nil {
+ return errors.Wrap(err, "Error setting re-use control")
+ }
+ } else {
+ if m.udpConn, err = net.ListenUDP("udp", m.LocalAddress); err != nil {
+ return errors.Wrap(err, "error connecting to remote address")
+ }
}
// TODO: Start a worker that uses m.udpConn.ReadFromUDP() to fill a buffer