You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 11:23:27 UTC

[plc4x] branch develop updated (a64125789 -> ce2db6b31)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from a64125789 fix(plc4go/cbus): fixed go routine leak
     new de4fab5f1 feat(plc4go/spi): added new ConnectWithContext to transport instance
     new 1f38cc9f8 feat(plc4go/spi): added new ConnectWithContext to message codec
     new 04fa66d53 feat(plc4go/knxnetip): use context for discovery
     new ce2db6b31 fix(plc4go/cbus): avoid channel leak by adding wait groups

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Discoverer.go                 | 207 ++++++++++++---------
 plc4go/internal/knxnetip/Discoverer.go             | 124 ++++++------
 plc4go/spi/MessageCodec.go                         |   2 +
 plc4go/spi/default/DefaultCodec.go                 |   6 +-
 plc4go/spi/testutils/DriverTestRunner.go           |   9 +-
 plc4go/spi/transports/TransportInstance.go         |  67 +++++--
 plc4go/spi/transports/pcap/Transport.go            |  11 +-
 plc4go/spi/transports/serial/Transport.go          |  11 +-
 plc4go/spi/transports/tcp/Transport.go             |  19 +-
 plc4go/spi/transports/test/Transport.go            |   5 +
 plc4go/spi/transports/udp/Transport.go             |   8 +-
 plc4go/spi/utils/net.go                            |  25 ++-
 .../tests/drivers/tests/manual_cbus_driver_test.go |  21 +++
 13 files changed, 325 insertions(+), 190 deletions(-)


[plc4x] 02/04: feat(plc4go/spi): added new ConnectWithContext to message codec

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 1f38cc9f8d0eccbd9f7dcebbcd19f1c300f1b641
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:22:22 2022 +0200

    feat(plc4go/spi): added new ConnectWithContext to message codec
---
 plc4go/spi/MessageCodec.go         | 2 ++
 plc4go/spi/default/DefaultCodec.go | 6 +++++-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go
index 8507dae74..2e373ed4a 100644
--- a/plc4go/spi/MessageCodec.go
+++ b/plc4go/spi/MessageCodec.go
@@ -46,6 +46,8 @@ type HandleError func(err error) error
 type MessageCodec interface {
 	// Connect connects this codec
 	Connect() error
+	// ConnectWithContext connects this codec with the supplied context
+	ConnectWithContext(ctx context.Context) error
 	// Disconnect disconnects this codec
 	Disconnect() error
 	// IsRunning returns true if the codec (workers are running)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 6fee263e8..25937d8a5 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -146,9 +146,13 @@ func (m *defaultCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
 }
 
 func (m *defaultCodec) Connect() error {
+	return m.ConnectWithContext(context.Background())
+}
+
+func (m *defaultCodec) ConnectWithContext(ctx context.Context) error {
 	log.Trace().Msg("Connecting")
 	if !m.transportInstance.IsConnected() {
-		if err := m.transportInstance.Connect(); err != nil {
+		if err := m.transportInstance.ConnectWithContext(ctx); err != nil {
 			return err
 		}
 	} else {


[plc4x] 01/04: feat(plc4go/spi): added new ConnectWithContext to transport instance

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit de4fab5f1fe2dfcf2b7a432081c47e80c5d190fe
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:18:29 2022 +0200

    feat(plc4go/spi): added new ConnectWithContext to transport instance
---
 plc4go/spi/testutils/DriverTestRunner.go   |  9 +++-
 plc4go/spi/transports/TransportInstance.go | 67 +++++++++++++++++++++---------
 plc4go/spi/transports/pcap/Transport.go    | 11 ++++-
 plc4go/spi/transports/serial/Transport.go  | 11 ++++-
 plc4go/spi/transports/tcp/Transport.go     | 19 +++++++--
 plc4go/spi/transports/test/Transport.go    |  5 +++
 plc4go/spi/transports/udp/Transport.go     |  8 +++-
 plc4go/spi/utils/net.go                    | 25 ++++++++---
 8 files changed, 120 insertions(+), 35 deletions(-)

diff --git a/plc4go/spi/testutils/DriverTestRunner.go b/plc4go/spi/testutils/DriverTestRunner.go
index e257b110f..7604c3adb 100644
--- a/plc4go/spi/testutils/DriverTestRunner.go
+++ b/plc4go/spi/testutils/DriverTestRunner.go
@@ -74,6 +74,13 @@ func WithRootTypeParser(rootTypeParser func(utils.ReadBufferByteBased) (interfac
 	return withRootTypeParser{rootTypeParser: rootTypeParser}
 }
 
+type TestTransportInstance interface {
+	transports.TransportInstance
+	FillReadBuffer(data []uint8) error
+	GetNumDrainableBytes() uint32
+	DrainWriteBuffer(numBytes uint32) ([]uint8, error)
+}
+
 type withRootTypeParser struct {
 	option
 	rootTypeParser func(utils.ReadBufferByteBased) (interface{}, error)
@@ -136,7 +143,7 @@ func (m DriverTestsuite) ExecuteStep(connection plc4go.PlcConnection, testcase *
 	if !ok {
 		return errors.New("couldn't access connections transport instance")
 	}
-	testTransportInstance, ok := mc.GetTransportInstance().(transports.TestTransportInstance)
+	testTransportInstance, ok := mc.GetTransportInstance().(TestTransportInstance)
 	if !ok {
 		return errors.New("transport must be of type TestTransport")
 	}
diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/TransportInstance.go
index 061f0330f..973557883 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/TransportInstance.go
@@ -21,11 +21,13 @@ package transports
 
 import (
 	"bufio"
+	"context"
 	"github.com/pkg/errors"
 )
 
 type TransportInstance interface {
 	Connect() error
+	ConnectWithContext(ctx context.Context) error
 	Close() error
 
 	IsConnected() bool
@@ -40,27 +42,52 @@ type TransportInstance interface {
 	Write(data []uint8) error
 }
 
-type TestTransportInstance interface {
-	TransportInstance
-	FillReadBuffer(data []uint8) error
-	GetNumDrainableBytes() uint32
-	DrainWriteBuffer(numBytes uint32) ([]uint8, error)
+type DefaultBufferedTransportInstanceRequirements interface {
+	GetReader() *bufio.Reader
+	Connect() error
+}
+
+type DefaultBufferedTransportInstance interface {
+	ConnectWithContext(ctx context.Context) error
+	GetNumBytesAvailableInBuffer() (uint32, error)
+	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
+	PeekReadableBytes(numBytes uint32) ([]uint8, error)
+	Read(numBytes uint32) ([]uint8, error)
+}
+
+func NewDefaultBufferedTransportInstance(defaultBufferedTransportInstanceRequirements DefaultBufferedTransportInstanceRequirements) DefaultBufferedTransportInstance {
+	return &defaultBufferedTransportInstance{defaultBufferedTransportInstanceRequirements}
 }
 
-type DefaultBufferedTransportInstance struct {
-	*bufio.Reader
+type defaultBufferedTransportInstance struct {
+	DefaultBufferedTransportInstanceRequirements
+}
+
+// ConnectWithContext is a compatibility implementation for those transports not implementing this function
+func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Context) error {
+	ch := make(chan error, 1)
+	go func() {
+		ch <- m.Connect()
+		close(ch)
+	}()
+	select {
+	case err := <-ch:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	}
 }
 
-func (m *DefaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
-	if m.Reader == nil {
+func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+	if m.GetReader() == nil {
 		return 0, nil
 	}
-	_, _ = m.Peek(1)
-	return uint32(m.Buffered()), nil
+	_, _ = m.GetReader().Peek(1)
+	return uint32(m.GetReader().Buffered()), nil
 }
 
-func (m *DefaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
-	if m.Reader == nil {
+func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+	if m.GetReader() == nil {
 		return nil
 	}
 	nBytes := uint32(1)
@@ -69,27 +96,27 @@ func (m *DefaultBufferedTransportInstance) FillBuffer(until func(pos uint, curre
 		if err != nil {
 			return errors.Wrap(err, "Error while peeking")
 		}
-		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.Reader); !keepGoing {
+		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.GetReader()); !keepGoing {
 			return nil
 		}
 		nBytes++
 	}
 }
 
-func (m *DefaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
-	if m.Reader == nil {
+func (m *defaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
+	if m.GetReader() == nil {
 		return nil, errors.New("error peeking from transport. No reader available")
 	}
-	return m.Peek(int(numBytes))
+	return m.GetReader().Peek(int(numBytes))
 }
 
-func (m *DefaultBufferedTransportInstance) Read(numBytes uint32) ([]uint8, error) {
-	if m.Reader == nil {
+func (m *defaultBufferedTransportInstance) Read(numBytes uint32) ([]uint8, error) {
+	if m.GetReader() == nil {
 		return nil, errors.New("error reading from transport. No reader available")
 	}
 	data := make([]uint8, numBytes)
 	for i := uint32(0); i < numBytes; i++ {
-		val, err := m.ReadByte()
+		val, err := m.GetReader().ReadByte()
 		if err != nil {
 			return nil, errors.Wrap(err, "error reading")
 		}
diff --git a/plc4go/spi/transports/pcap/Transport.go b/plc4go/spi/transports/pcap/Transport.go
index 2c5181b84..bdeea2d7d 100644
--- a/plc4go/spi/transports/pcap/Transport.go
+++ b/plc4go/spi/transports/pcap/Transport.go
@@ -88,16 +88,19 @@ type TransportInstance struct {
 	transport     *Transport
 	handle        *pcap.Handle
 	mutex         sync.Mutex
+	reader        *bufio.Reader
 }
 
 func NewPcapTransportInstance(transportFile string, transportType TransportType, portRange string, speedFactor float32, transport *Transport) *TransportInstance {
-	return &TransportInstance{
+	transportInstance := &TransportInstance{
 		transportFile: transportFile,
 		transportType: transportType,
 		portRange:     portRange,
 		speedFactor:   speedFactor,
 		transport:     transport,
 	}
+	transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
+	return transportInstance
 }
 
 func (m *TransportInstance) Connect() error {
@@ -118,7 +121,7 @@ func (m *TransportInstance) Connect() error {
 	m.handle = handle
 	m.connected = true
 	buffer := new(bytes.Buffer)
-	m.Reader = bufio.NewReader(buffer)
+	m.reader = bufio.NewReader(buffer)
 
 	go func(m *TransportInstance, buffer *bytes.Buffer) {
 		packageCount := 0
@@ -186,3 +189,7 @@ func (m *TransportInstance) IsConnected() bool {
 func (m *TransportInstance) Write(_ []uint8) error {
 	panic("Write to pcap not supported")
 }
+
+func (m *TransportInstance) GetReader() *bufio.Reader {
+	return m.reader
+}
diff --git a/plc4go/spi/transports/serial/Transport.go b/plc4go/spi/transports/serial/Transport.go
index f7c046e57..1ccd68939 100644
--- a/plc4go/spi/transports/serial/Transport.go
+++ b/plc4go/spi/transports/serial/Transport.go
@@ -82,15 +82,18 @@ type TransportInstance struct {
 	ConnectTimeout uint32
 	transport      *Transport
 	serialPort     io.ReadWriteCloser
+	reader         *bufio.Reader
 }
 
 func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout uint32, transport *Transport) *TransportInstance {
-	return &TransportInstance{
+	transportInstance := &TransportInstance{
 		SerialPortName: serialPortName,
 		BaudRate:       baudRate,
 		ConnectTimeout: connectTimeout,
 		transport:      transport,
 	}
+	transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
+	return transportInstance
 }
 
 func (m *TransportInstance) Connect() error {
@@ -109,7 +112,7 @@ func (m *TransportInstance) Connect() error {
 		m.serialPort = utils.NewTransportLogger(m.serialPort, utils.WithLogger(fileLogger))
 		log.Trace().Msgf("Logging Transport to file %s", logFile.Name())
 	}*/
-	m.Reader = bufio.NewReader(m.serialPort)
+	m.reader = bufio.NewReader(m.serialPort)
 
 	return nil
 }
@@ -143,3 +146,7 @@ func (m *TransportInstance) Write(data []uint8) error {
 	}
 	return nil
 }
+
+func (m *TransportInstance) GetReader() *bufio.Reader {
+	return m.reader
+}
diff --git a/plc4go/spi/transports/tcp/Transport.go b/plc4go/spi/transports/tcp/Transport.go
index c8b0e8773..caf5cf147 100644
--- a/plc4go/spi/transports/tcp/Transport.go
+++ b/plc4go/spi/transports/tcp/Transport.go
@@ -21,6 +21,7 @@ package tcp
 
 import (
 	"bufio"
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/utils"
@@ -100,26 +101,34 @@ type TransportInstance struct {
 	ConnectTimeout uint32
 	transport      *Transport
 	tcpConn        net.Conn
+	reader         *bufio.Reader
 }
 
 func NewTcpTransportInstance(remoteAddress *net.TCPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
-	return &TransportInstance{
+	transportInstance := &TransportInstance{
 		RemoteAddress:  remoteAddress,
 		ConnectTimeout: connectTimeout,
 		transport:      transport,
 	}
+	transportInstance.DefaultBufferedTransportInstance = transports.NewDefaultBufferedTransportInstance(transportInstance)
+	return transportInstance
 }
 
 func (m *TransportInstance) Connect() error {
+	return m.ConnectWithContext(context.Background())
+}
+
+func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
 	var err error
-	m.tcpConn, err = net.Dial("tcp", m.RemoteAddress.String())
+	var d net.Dialer
+	m.tcpConn, err = d.DialContext(ctx, "tcp", m.RemoteAddress.String())
 	if err != nil {
 		return errors.Wrap(err, "error connecting to remote address")
 	}
 
 	m.LocalAddress = m.tcpConn.LocalAddr().(*net.TCPAddr)
 
-	m.Reader = bufio.NewReader(m.tcpConn)
+	m.reader = bufio.NewReader(m.tcpConn)
 
 	return nil
 }
@@ -153,3 +162,7 @@ func (m *TransportInstance) Write(data []uint8) error {
 	}
 	return nil
 }
+
+func (m *TransportInstance) GetReader() *bufio.Reader {
+	return m.reader
+}
diff --git a/plc4go/spi/transports/test/Transport.go b/plc4go/spi/transports/test/Transport.go
index 65b041752..ab326b819 100644
--- a/plc4go/spi/transports/test/Transport.go
+++ b/plc4go/spi/transports/test/Transport.go
@@ -22,6 +22,7 @@ package test
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
@@ -70,6 +71,10 @@ func (m *TransportInstance) Connect() error {
 	return nil
 }
 
+func (m *TransportInstance) ConnectWithContext(_ context.Context) error {
+	return m.Connect()
+}
+
 func (m *TransportInstance) Close() error {
 	log.Trace().Msg("Close")
 	m.connected = false
diff --git a/plc4go/spi/transports/udp/Transport.go b/plc4go/spi/transports/udp/Transport.go
index 001fa381e..337f9f16a 100644
--- a/plc4go/spi/transports/udp/Transport.go
+++ b/plc4go/spi/transports/udp/Transport.go
@@ -21,6 +21,7 @@ package udp
 
 import (
 	"bufio"
+	"context"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/libp2p/go-reuseport"
@@ -127,10 +128,15 @@ func NewTransportInstance(localAddress *net.UDPAddr, remoteAddress *net.UDPAddr,
 }
 
 func (m *TransportInstance) Connect() error {
+	return m.ConnectWithContext(context.Background())
+}
+
+func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
 	// If we haven't provided a local address, have the system figure it out by dialing
 	// the remote address and then using that connections local address as local address.
 	if m.LocalAddress == nil {
-		udpTest, err := net.Dial("udp", m.RemoteAddress.String())
+		var d net.Dialer
+		udpTest, err := d.DialContext(ctx, "udp", m.RemoteAddress.String())
 		if err != nil {
 			return errors.Wrap(err, "error connecting to remote address")
 		}
diff --git a/plc4go/spi/utils/net.go b/plc4go/spi/utils/net.go
index f05a9dc4c..476d08bb4 100644
--- a/plc4go/spi/utils/net.go
+++ b/plc4go/spi/utils/net.go
@@ -23,6 +23,7 @@ import (
 	"bytes"
 	"context"
 	"net"
+	"sync"
 	"time"
 
 	"github.com/google/gopacket"
@@ -32,13 +33,14 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
-func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBasedScan bool) (chan net.IP, error) {
-	foundIps := make(chan net.IP, 65536)
+func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBasedScan bool) (foundIps chan net.IP, err error) {
+	foundIps = make(chan net.IP, 65536)
 	addrs, err := netInterface.Addrs()
 	if err != nil {
 		return nil, errors.Wrap(err, "Error getting addresses")
 	}
 	go func() {
+		wg := &sync.WaitGroup{}
 		for _, address := range addrs {
 			// Check if context has been cancelled before continuing
 			select {
@@ -64,17 +66,20 @@ func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBased
 
 			log.Debug().Stringer("IP", ipnet.IP).Stringer("Mask", ipnet.Mask).Msg("Expanding local subnet")
 			if useArpBasedScan {
-				if err := lockupIpsUsingArp(ctx, netInterface, ipnet, foundIps); err != nil {
+				if err := lockupIpsUsingArp(ctx, netInterface, ipnet, foundIps, wg); err != nil {
 					log.Error().Err(err).Msg("failing to resolve using arp scan. Falling back to ip based scan")
 					useArpBasedScan = false
 				}
 			}
 			if !useArpBasedScan {
-				if err := lookupIps(ctx, ipnet, foundIps); err != nil {
+				if err := lookupIps(ctx, ipnet, foundIps, wg); err != nil {
 					log.Error().Err(err).Msg("error looking up ips")
 				}
 			}
 		}
+		wg.Wait()
+		log.Trace().Msg("Closing found ips channel")
+		close(foundIps)
 	}()
 	return foundIps, nil
 }
@@ -82,7 +87,10 @@ func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBased
 // As PING operations might be blocked by a firewall, responding to ARP packets is mandatory for IP based
 // systems. So we are using an ARP scan to resolve the ethernet hardware addresses of each possible ip in range
 // Only for devices that respond will we schedule a discovery.
-func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *net.IPNet, foundIps chan net.IP) error {
+func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
+	// We add one signal for error handling
+	wg.Add(1)
+	go func() { wg.Done() }()
 	log.Debug().Msgf("Scanning for alive IP addresses for interface '%s' and net: %s", netInterface.Name, ipNet)
 	// First find the pcap device name for the given interface.
 	allDevs, _ := pcap.FindAllDevs()
@@ -108,6 +116,8 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 
 	// Start up a goroutine to read in packet data.
 	stop := make(chan struct{})
+	// As we don't know how much the handler will find we use a value of 1 and set that to done after the 10 sec in the cleanup function directly after
+	wg.Add(1)
 	// Handler for processing incoming ARP responses.
 	go func(handle *pcap.Handle, iface net.Interface, stop chan struct{}) {
 		src := gopacket.NewPacketSource(handle, layers.LayerTypeEthernet)
@@ -146,6 +156,7 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 	// Make sure we clean up after 10 seconds.
 	defer func() {
 		go func() {
+			wg.Done()
 			time.Sleep(10 * time.Second)
 			handle.Close()
 			close(stop)
@@ -202,7 +213,7 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 }
 
 // Simply takes the IP address and the netmask and schedules one discovery task for every possible IP
-func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP) error {
+func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *sync.WaitGroup) error {
 	log.Debug().Msgf("Scanning all IP addresses for network: %s", ipnet)
 	// expand CIDR-block into one target for each IP
 	// Remark: The last IP address a network contains is a special broadcast address. We don't want to check that one.
@@ -214,7 +225,9 @@ func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP) erro
 		default:
 		}
 
+		wg.Add(1)
 		go func(ip net.IP) {
+			defer func() { wg.Done() }()
 			select {
 			case <-ctx.Done():
 			case foundIps <- ip:


[plc4x] 03/04: feat(plc4go/knxnetip): use context for discovery

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 04fa66d532cedf17c6a9dc518c5ef2b4bd1a38f9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:22:49 2022 +0200

    feat(plc4go/knxnetip): use context for discovery
---
 plc4go/internal/knxnetip/Discoverer.go | 124 +++++++++++++++++----------------
 1 file changed, 63 insertions(+), 61 deletions(-)

diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index 4e7f776cc..7c435bca2 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -111,7 +111,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 				if err != nil {
 					return err
 				}
-				err = transportInstance.Connect()
+				err = transportInstance.ConnectWithContext(ctx)
 				if err != nil {
 					continue
 				}
@@ -121,72 +121,74 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 		}
 	}
 
-	if len(tranportInstances) > 0 {
-		for _, transportInstance := range tranportInstances {
-			// Create a codec for sending and receiving messages.
-			codec := NewMessageCodec(transportInstance, nil)
-			// Explicitly start the worker
-			if err := codec.Connect(); err != nil {
-				return errors.Wrap(err, "Error connecting")
-			}
+	if len(tranportInstances) <= 0 {
+		return nil
+	}
 
-			// Cast to the UDP transport instance so we can access information on the local port.
-			udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
-			if !ok {
-				return errors.New("couldn't cast transport instance to UDP transport instance")
-			}
-			localAddress := udpTransportInstance.LocalAddress
-			localAddr := driverModel.NewIPAddress(localAddress.IP)
-
-			// Prepare the discovery packet data
-			discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
-				driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
-			searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
-			// Send the search request.
-			err = codec.Send(searchRequestMessage)
-			go func() {
-				// Keep on reading responses till the timeout is done.
-				// TODO: Make this configurable
-				timeout := time.NewTimer(time.Second * 1)
-				timeout.Stop()
-				for start := time.Now(); time.Since(start) < time.Second*5; {
-					timeout.Reset(time.Second * 1)
-					select {
-					case message := <-codec.GetDefaultIncomingMessageChannel():
-						{
-							if !timeout.Stop() {
-								<-timeout.C
+	for _, transportInstance := range tranportInstances {
+		// Create a codec for sending and receiving messages.
+		codec := NewMessageCodec(transportInstance, nil)
+		// Explicitly start the worker
+		if err := codec.Connect(); err != nil {
+			return errors.Wrap(err, "Error connecting")
+		}
+
+		// Cast to the UDP transport instance so we can access information on the local port.
+		udpTransportInstance, ok := transportInstance.(*udp.TransportInstance)
+		if !ok {
+			return errors.New("couldn't cast transport instance to UDP transport instance")
+		}
+		localAddress := udpTransportInstance.LocalAddress
+		localAddr := driverModel.NewIPAddress(localAddress.IP)
+
+		// Prepare the discovery packet data
+		discoveryEndpoint := driverModel.NewHPAIDiscoveryEndpoint(
+			driverModel.HostProtocolCode_IPV4_UDP, localAddr, uint16(localAddress.Port))
+		searchRequestMessage := driverModel.NewSearchRequest(discoveryEndpoint)
+		// Send the search request.
+		err = codec.Send(searchRequestMessage)
+		go func() {
+			// Keep on reading responses till the timeout is done.
+			// TODO: Make this configurable
+			timeout := time.NewTimer(time.Second * 1)
+			timeout.Stop()
+			for start := time.Now(); time.Since(start) < time.Second*5; {
+				timeout.Reset(time.Second * 1)
+				select {
+				case message := <-codec.GetDefaultIncomingMessageChannel():
+					{
+						if !timeout.Stop() {
+							<-timeout.C
+						}
+						searchResponse := message.(driverModel.SearchResponse)
+						if searchResponse != nil {
+							addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
+							remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
+								uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
+							if err != nil {
+								continue
 							}
-							searchResponse := message.(driverModel.SearchResponse)
-							if searchResponse != nil {
-								addr := searchResponse.GetHpaiControlEndpoint().GetIpAddress().GetAddr()
-								remoteUrl, err := url.Parse(fmt.Sprintf("udp://%d.%d.%d.%d:%d",
-									uint8(addr[0]), uint8(addr[1]), uint8(addr[2]), uint8(addr[3]), searchResponse.GetHpaiControlEndpoint().GetIpPort()))
-								if err != nil {
-									continue
-								}
-								deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
-								discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
-									ProtocolCode:  "knxnet-ip",
-									TransportCode: "udp",
-									TransportUrl:  *remoteUrl,
-									Options:       nil,
-									Name:          deviceName,
-								}
-								// Pass the event back to the callback
-								callback(discoveryEvent)
+							deviceName := string(bytes.Trim(searchResponse.GetDibDeviceInfo().GetDeviceFriendlyName(), "\x00"))
+							discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+								ProtocolCode:  "knxnet-ip",
+								TransportCode: "udp",
+								TransportUrl:  *remoteUrl,
+								Options:       nil,
+								Name:          deviceName,
 							}
-							continue
-						}
-					case <-timeout.C:
-						{
-							timeout.Stop()
-							continue
+							// Pass the event back to the callback
+							callback(discoveryEvent)
 						}
+						continue
+					}
+				case <-timeout.C:
+					{
+						timeout.Stop()
+						continue
 					}
 				}
-			}()
-		}
+			}
+		}()
 	}
 	return nil
 }


[plc4x] 04/04: fix(plc4go/cbus): avoid channel leak by adding wait groups

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ce2db6b314c397ffa092aa4046941a6813d3a798
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 13:23:19 2022 +0200

    fix(plc4go/cbus): avoid channel leak by adding wait groups
---
 plc4go/internal/cbus/Discoverer.go                 | 207 ++++++++++++---------
 .../tests/drivers/tests/manual_cbus_driver_test.go |  21 +++
 2 files changed, 135 insertions(+), 93 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 731c2b4a9..3ef576304 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/transports/tcp"
 	"net"
 	"net/url"
+	"sync"
 	"time"
 
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -73,13 +74,16 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 	}
 
 	transportInstances := make(chan transports.TransportInstance)
+	wg := &sync.WaitGroup{}
 	// Iterate over all network devices of this system.
 	for _, netInterface := range interfaces {
 		addrs, err := netInterface.Addrs()
 		if err != nil {
 			return err
 		}
+		wg.Add(1)
 		go func(netInterface net.Interface) {
+			defer func() { wg.Done() }()
 			// Iterate over all addresses the current interface has configured
 			// For KNX we're only interested in IPv4 addresses, as it doesn't
 			// seem to work with IPv6.
@@ -106,24 +110,34 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 					log.Warn().Err(err).Msgf("Can't get addresses for %v", netInterface)
 					continue
 				}
+				wg.Add(1)
 				go func() {
+					defer func() { wg.Done() }()
 					for ip := range addresses {
+						log.Trace().Msgf("Handling found ip %v", ip)
+						wg.Add(1)
 						go func(ip net.IP) {
+							defer func() { wg.Done() }()
 							// Create a new "connection" (Actually open a local udp socket and target outgoing packets to that address)
-							connectionUrl, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
-							if err != nil {
-								log.Error().Err(err).Msgf("Error parsing url for lookup")
-								return
+							var connectionUrl url.URL
+							{
+								connectionUrlParsed, err := url.Parse(fmt.Sprintf("tcp://%s:%d", ip, readWriteModel.CBusConstants_CBUSTCPDEFAULTPORT))
+								if err != nil {
+									log.Error().Err(err).Msgf("Error parsing url for lookup")
+									return
+								}
+								connectionUrl = *connectionUrlParsed
 							}
-							transportInstance, err := tcpTransport.CreateTransportInstance(*connectionUrl, nil)
+
+							transportInstance, err := tcpTransport.CreateTransportInstance(connectionUrl, nil)
 							if err != nil {
 								log.Error().Err(err).Msgf("Error creating transport instance")
 								return
 							}
-							log.Trace().Msgf("trying %s", connectionUrl)
-							err = transportInstance.Connect()
+							log.Trace().Msgf("trying %v", connectionUrl)
+							err = transportInstance.ConnectWithContext(ctx)
 							if err != nil {
-								secondErr := transportInstance.Connect()
+								secondErr := transportInstance.ConnectWithContext(ctx)
 								if secondErr != nil {
 									log.Trace().Err(err).Msgf("Error connecting transport instance")
 									return
@@ -137,99 +151,106 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 			}
 		}(netInterface)
 	}
-
 	go func() {
-		for transportInstance := range transportInstances {
-			tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
-			// Create a codec for sending and receiving messages.
-			codec := NewMessageCodec(transportInstance)
-			// Explicitly start the worker
-			if err := codec.Connect(); err != nil {
-				log.Debug().Err(err).Msg("Error connecting")
-				continue
-			}
+		wg.Wait()
+		log.Trace().Msg("Closing transport instance channel")
+		close(transportInstances)
+	}()
 
-			// Prepare the discovery packet data
-			cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
-			requestContext := readWriteModel.NewRequestContext(false)
-			calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
-			alpha := readWriteModel.NewAlpha('x')
-			request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
-			cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
-			// Send the search request.
-			err = codec.Send(cBusMessageToServer)
-			go func() {
-				// Keep on reading responses till the timeout is done.
-				// TODO: Make this configurable
-				timeout := time.NewTimer(time.Second * 1)
-				timeout.Stop()
-				for start := time.Now(); time.Since(start) < time.Second*5; {
-					timeout.Reset(time.Second * 1)
-					select {
-					case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
-						if !timeout.Stop() {
-							<-timeout.C
-						}
-						cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
-						if !ok {
-							continue
-						}
-						messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
-						if !ok {
-							continue
-						}
-						replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-						if !ok {
-							continue
-						}
-						if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
-							continue
-						}
-						embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-						if !ok {
-							continue
-						}
-						encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
-						if !ok {
-							continue
-						}
-						encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
-						if !ok {
-							continue
-						}
-						calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
-						if !ok {
-							continue
-						}
-						identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
-						if !ok {
-							continue
-						}
+	for transportInstance := range transportInstances {
+		tcpTransportInstance := transportInstance.(*tcp.TransportInstance)
+		// Create a codec for sending and receiving messages.
+		codec := NewMessageCodec(transportInstance)
+		// Explicitly start the worker
+		if err := codec.Connect(); err != nil {
+			log.Debug().Err(err).Msg("Error connecting")
+			continue
+		}
+
+		// Prepare the discovery packet data
+		cBusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, true)
+		requestContext := readWriteModel.NewRequestContext(false)
+		calData := readWriteModel.NewCALDataIdentify(readWriteModel.Attribute_Manufacturer, readWriteModel.CALCommandTypeContainer_CALCommandIdentify, nil, requestContext)
+		alpha := readWriteModel.NewAlpha('x')
+		request := readWriteModel.NewRequestDirectCommandAccess(calData, alpha, 0x0, nil, nil, readWriteModel.RequestType_DIRECT_COMMAND, readWriteModel.NewRequestTermination(), cBusOptions)
+		cBusMessageToServer := readWriteModel.NewCBusMessageToServer(request, requestContext, cBusOptions)
+		// Send the search request.
+		err = codec.Send(cBusMessageToServer)
+		go func() {
+			// Keep on reading responses till the timeout is done.
+			// TODO: Make this configurable
+			timeout := time.NewTimer(time.Second * 1)
+			timeout.Stop()
+			for start := time.Now(); time.Since(start) < time.Second*5; {
+				timeout.Reset(time.Second * 1)
+				select {
+				case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
+					if !timeout.Stop() {
+						<-timeout.C
+					}
+					cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
+					if !ok {
+						continue
+					}
+					messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClient)
+					if !ok {
+						continue
+					}
+					replyOrConfirmationConfirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+					if !ok {
+						continue
+					}
+					if receivedAlpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha(); receivedAlpha != nil && alpha.GetCharacter() != receivedAlpha.GetCharacter() {
+						continue
+					}
+					embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+					if !ok {
+						continue
+					}
+					encodedReply, ok := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReplyExactly)
+					if !ok {
+						continue
+					}
+					encodedReplyCALReply, ok := encodedReply.GetEncodedReply().(readWriteModel.EncodedReplyCALReplyExactly)
+					if !ok {
+						continue
+					}
+					calDataIdentifyReply, ok := encodedReplyCALReply.GetCalReply().GetCalData().(readWriteModel.CALDataIdentifyReplyExactly)
+					if !ok {
+						continue
+					}
+					identifyReplyCommand, ok := calDataIdentifyReply.GetIdentifyReplyCommand().(readWriteModel.IdentifyReplyCommandManufacturerExactly)
+					if !ok {
+						continue
+					}
+					var remoteUrl url.URL
+					{
 						// TODO: we could check for the exact reponse
-						remoteUrl, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
+						remoteUrlParse, err := url.Parse(fmt.Sprintf("tcp://%s", tcpTransportInstance.RemoteAddress))
 						if err != nil {
 							log.Error().Err(err).Msg("Error creating url")
 							continue
 						}
-						// TODO: manufaturer + type would be good but this means two requests then
-						deviceName := identifyReplyCommand.GetManufacturerName()
-						discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
-							ProtocolCode:  "c-bus",
-							TransportCode: "tcp",
-							TransportUrl:  *remoteUrl,
-							Options:       nil,
-							Name:          deviceName,
-						}
-						// Pass the event back to the callback
-						callback(discoveryEvent)
-						continue
-					case <-timeout.C:
-						timeout.Stop()
-						continue
+						remoteUrl = *remoteUrlParse
 					}
+					// TODO: manufacturer + type would be good but this means two requests then
+					deviceName := identifyReplyCommand.GetManufacturerName()
+					discoveryEvent := &internalModel.DefaultPlcDiscoveryEvent{
+						ProtocolCode:  "c-bus",
+						TransportCode: "tcp",
+						TransportUrl:  remoteUrl,
+						Options:       nil,
+						Name:          deviceName,
+					}
+					// Pass the event back to the callback
+					callback(discoveryEvent)
+					continue
+				case <-timeout.C:
+					timeout.Stop()
+					continue
 				}
-			}()
-		}
-	}()
+			}
+		}()
+	}
 	return nil
 }
diff --git a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
index 98fcadc37..798111bf7 100644
--- a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
+++ b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
@@ -158,3 +158,24 @@ func TestManualCBusRead(t *testing.T) {
 	readRequestResult := <-readRequest.Execute()
 	fmt.Printf("%s", readRequestResult.GetResponse())
 }
+
+func TestManualDiscovery(t *testing.T) { // Manual C-Bus discovery run; always skipped by the t.Skip() call below.
+	log.Logger = log.
+		With().Caller().Logger().
+		Output(zerolog.ConsoleWriter{Out: os.Stderr}).
+		Level(zerolog.TraceLevel) // Global logger: caller info, console output on stderr, trace level.
+	config.TraceTransactionManagerWorkers = false
+	config.TraceTransactionManagerTransactions = false
+	config.TraceDefaultMessageCodecWorker = false // All three trace flags are switched off for this run.
+	t.Skip() // Stops the test here; the global logger/config assignments above have already run. Remove to run manually.
+
+	driverManager := plc4go.NewPlcDriverManager()
+	driver := cbus.NewDriver()
+	driverManager.RegisterDriver(driver)
+	transports.RegisterTcpTransport(driverManager)
+	err := driver.Discover(func(event model.PlcDiscoveryEvent) { // The callback receives each discovery event.
+		println(event.(fmt.Stringer).String()) // Unchecked assertion: panics if the event does not implement fmt.Stringer.
+	})
+	require.NoError(t, err) // Only checks the error returned by Discover itself.
+
+}