You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 11:02:09 UTC

[plc4x] branch develop updated (12790c1100 -> 6dac19d75e)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 12790c1100 build(deps): bump nifi-nar-maven-plugin from 1.5.0 to 1.5.1 (#1003)
     new 89d210f4dd feat(plc4go/spi): allow overriding of options by always using the last option found.
     new ddda58bb3e fix(plc4go/spi): multierror only returns an error if it has an error
     new 9a21a7e03e feat(plc4go/spi): added env test util option for traceExecutorWorkers
     new 7727bc814b test(plc4go/cbus): fixed possible concurrency issues in test
     new 6dac19d75e test(plc4go/cbus): shutdown workers of discoverer once done

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 152 +++++++++------------
 plc4go/internal/cbus/Discoverer.go                 |  21 ++-
 plc4go/internal/cbus/Discoverer_test.go            |  14 +-
 plc4go/internal/cbus/Driver_test.go                |  35 +++--
 plc4go/internal/cbus/Reader.go                     |   1 +
 plc4go/internal/cbus/Reader_test.go                |  70 +++++++---
 plc4go/spi/options/Option.go                       |  36 ++---
 plc4go/spi/testutils/TestUtils.go                  |  14 +-
 .../spi/transactions/RequestTransactionManager.go  |   1 +
 plc4go/spi/utils/Errors.go                         |   3 +
 plc4go/spi/utils/Errors_test.go                    |   2 +-
 11 files changed, 205 insertions(+), 144 deletions(-)


[plc4x] 02/05: fix(plc4go/spi): multierror only returns an error if it has an error

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ddda58bb3e4b9d0d8a188a1b534d64a17c4a37d0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 11:58:33 2023 +0200

    fix(plc4go/spi): multierror only returns an error if it has an error
---
 plc4go/spi/utils/Errors.go      | 3 +++
 plc4go/spi/utils/Errors_test.go | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/plc4go/spi/utils/Errors.go b/plc4go/spi/utils/Errors.go
index 697dffaeee..ec7ff5e4bd 100644
--- a/plc4go/spi/utils/Errors.go
+++ b/plc4go/spi/utils/Errors.go
@@ -39,6 +39,9 @@ type MultiError struct {
 }
 
 func (m MultiError) Error() string {
+	if m.MainError == nil && len(m.Errors) == 0 {
+		return ""
+	}
 	mainErrorText := "Child errors:\n"
 	if m.MainError != nil {
 		mainErrorText = fmt.Sprintf("Main Error: %v\nChild errors:\n", m.MainError)
diff --git a/plc4go/spi/utils/Errors_test.go b/plc4go/spi/utils/Errors_test.go
index 5fa3d28f0f..cd4a2e7bc8 100644
--- a/plc4go/spi/utils/Errors_test.go
+++ b/plc4go/spi/utils/Errors_test.go
@@ -38,7 +38,7 @@ func TestMultiError_Error(t *testing.T) {
 	}{
 		{
 			name: "empty multi error",
-			want: "Child errors:\nNo errors",
+			want: "",
 		},
 		{
 			name: "some error",


[plc4x] 04/05: test(plc4go/cbus): fixed possible concurrency issues in test

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7727bc814ba86c61c02f30c877c7651589b5f87d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:32 2023 +0200

    test(plc4go/cbus): fixed possible concurrency issues in test
---
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 152 +++++++++------------
 plc4go/internal/cbus/Discoverer_test.go            |  14 +-
 plc4go/internal/cbus/Driver_test.go                |  35 +++--
 plc4go/internal/cbus/Reader.go                     |   1 +
 plc4go/internal/cbus/Reader_test.go                |  70 +++++++---
 .../spi/transactions/RequestTransactionManager.go  |   1 +
 6 files changed, 153 insertions(+), 120 deletions(-)

diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index aa9a8a90a0..1bacc4775b 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1468,11 +1468,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1539,13 +1540,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1614,13 +1614,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1678,13 +1677,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1732,13 +1730,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1789,13 +1786,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1866,13 +1862,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1961,13 +1956,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2019,13 +2013,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2073,13 +2066,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2127,13 +2119,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2181,13 +2172,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2235,13 +2225,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2309,13 +2298,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2363,13 +2351,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2417,13 +2404,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2471,13 +2457,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2525,13 +2510,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2579,13 +2563,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2649,13 +2632,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2703,13 +2685,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2757,13 +2738,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 9e8270929e..5ac2519d9f 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -94,8 +94,11 @@ func TestDiscoverer_Discover(t *testing.T) {
 				})
 				args.ctx = ctx
 
-				fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
-				fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				fields.transportInstanceCreationQueue = executor
+				fields.deviceScanningQueue = executor
 				return nil
 			},
 			wantErr: assert.NoError,
@@ -118,8 +121,11 @@ func TestDiscoverer_Discover(t *testing.T) {
 					time.Sleep(1 * time.Second)
 				})
 				args.ctx = ctx
-				fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
-				fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				fields.transportInstanceCreationQueue = executor
+				fields.deviceScanningQueue = executor
 				oldaddressProviderRetriever := addressProviderRetriever
 				addressProviderRetriever = func(log zerolog.Logger, _ []string) ([]addressProvider, error) {
 					loopbackInterface, err := nettest.LoopbackInterface()
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index ae01bb6f69..5d92be01f8 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -22,6 +22,10 @@ package cbus
 import (
 	"context"
 	"fmt"
+	"net/url"
+	"testing"
+	"time"
+
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
@@ -31,11 +35,9 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
-	"net/url"
-	"testing"
-	"time"
 )
 
 func TestDriver_DiscoverWithContext(t *testing.T) {
@@ -111,6 +113,7 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 		name         string
 		fields       fields
 		args         args
+		setup        func(t *testing.T, fields *fields, args *args)
 		wantVerifier func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool
 	}{
 		{
@@ -153,13 +156,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
+				transports: map[string]transports.Transport{},
 				options: map[string][]string{
 					"failTestTransport": {"yesSir"},
 				},
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
 				defer utils.CleanupTimer(timeout)
@@ -186,13 +190,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
+				transports: map[string]transports.Transport{},
 				options: map[string][]string{
 					"MonitoredApplication1": {"pineapple"},
 				},
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
 				defer utils.CleanupTimer(timeout)
@@ -218,10 +223,11 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
-				options: map[string][]string{},
+				transports: map[string]transports.Transport{},
+				options:    map[string][]string{},
+			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
 			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
@@ -240,6 +246,9 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			m := &Driver{
 				DefaultDriver:           tt.fields.DefaultDriver,
 				awaitSetupComplete:      tt.fields.awaitSetupComplete,
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 70aa83016d..59fa3460e6 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -144,6 +144,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 		ttl = -time.Since(deadline)
 		m.log.Debug().Msgf("setting ttl to %s", ttl)
 	}
+	m.log.Trace().Msgf("sending with ctx %s", ctx)
 	if err := m.messageCodec.SendRequest(
 		ctx,
 		messageToSend,
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 80506ca3ec..faf5822937 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -21,25 +21,29 @@ package cbus
 
 import (
 	"context"
+	"encoding/hex"
+	"net/url"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"net/url"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
 )
 
 func TestNewReader(t *testing.T) {
@@ -180,6 +184,18 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
+				executor := pool.NewFixedSizeExecutor(5, 5)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
+				t.Cleanup(func() {
+					assert.NoError(t, transactionManager.Close())
+				})
+				fields.tm = transactionManager
+
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
@@ -189,11 +205,6 @@ func TestReader_readSync(t *testing.T) {
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
-				fields.messageCodec = codec
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
-				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
-				})
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
@@ -254,11 +265,19 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+				executor := pool.NewFixedSizeExecutor(5, 5, _options...)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
 				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
+					assert.NoError(t, transactionManager.Close())
 				})
-				transport := test.NewTransport()
+				fields.tm = transactionManager
+
+				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
 				require.NoError(t, err)
@@ -271,6 +290,7 @@ func TestReader_readSync(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -333,12 +353,19 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+				executor := pool.NewFixedSizeExecutor(5, 5)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
 				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
+					assert.NoError(t, transactionManager.Close())
 				})
+				fields.tm = transactionManager
 
-				transport := test.NewTransport()
+				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
 				require.NoError(t, err)
@@ -374,6 +401,7 @@ func TestReader_readSync(t *testing.T) {
 				log:            testutils.ProduceTestingLogger(t),
 			}
 			m.readSync(tt.args.ctx, tt.args.readRequest, tt.args.result)
+			t.Log("done read sync")
 			assert.True(t, tt.resultEvaluator(t, tt.args.result))
 		})
 	}
@@ -502,6 +530,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -574,6 +603,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -662,6 +692,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -743,6 +774,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -824,6 +856,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -905,6 +938,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -986,6 +1020,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -1067,6 +1102,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 4d8105b7f3..83551e0805 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -274,5 +274,6 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	} else {
 		r.log.Warn().Msg("not closing shared instance")
 	}
+	r.log.Debug().Msg("closed")
 	return nil
 }


[plc4x] 03/05: feat(plc4go/spi): added env test util option for traceExecutorWorkers

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 9a21a7e03e8bf2a89e353ddd90c5a7875c688e28
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 12:00:12 2023 +0200

    feat(plc4go/spi): added env test util option for traceExecutorWorkers
---
 plc4go/spi/testutils/TestUtils.go | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index 66c1b6f907..a13ac3100d 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -114,13 +114,20 @@ func TestContext(t *testing.T) context.Context {
 	return ctx
 }
 
-var highLogPrecision bool
+var (
+	highLogPrecision     bool
+	traceExecutorWorkers bool
+)
 
 func init() {
 	highLogPrecision = os.Getenv("PLC4X_TEST_HIGH_TEST_LOG_PRECISION") == "true"
 	if highLogPrecision {
 		zerolog.TimeFieldFormat = time.RFC3339Nano
 	}
+	traceExecutorWorkers = true
+	if traceExecutorWorkersEnv := os.Getenv("PLC4X_TEST_TRACE_EXECUTOR_WORKERS"); traceExecutorWorkersEnv != "" {
+		traceExecutorWorkers = traceExecutorWorkersEnv == "true"
+	}
 }
 
 // ProduceTestingLogger produces a logger which redirects to testing.T
@@ -153,15 +160,14 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
 
 // EnrichOptionsWithOptionsForTesting appends options useful for testing to config.WithOption s
 func EnrichOptionsWithOptionsForTesting(t *testing.T, _options ...options.WithOption) []options.WithOption {
-	traceWorkers := true
 	if extractedTraceWorkers, found := options.ExtractTracerWorkers(_options...); found {
-		traceWorkers = extractedTraceWorkers
+		traceExecutorWorkers = extractedTraceWorkers
 	}
 	// TODO: apply to other options like above
 	return append(_options,
 		options.WithCustomLogger(ProduceTestingLogger(t)),
 		options.WithPassLoggerToModel(true),
-		options.WithExecutorOptionTracerWorkers(traceWorkers),
+		options.WithExecutorOptionTracerWorkers(traceExecutorWorkers),
 	)
 }
 


[plc4x] 01/05: feat(plc4go/spi): allow overriding of options by always using the last option found.

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 89d210f4dda0ba557e8deeb19efe477a85a4eaed
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 11:23:53 2023 +0200

    feat(plc4go/spi): allow overriding of options by always using the last option found.
---
 plc4go/spi/options/Option.go | 36 ++++++++++++++++++------------------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index e3dbc9fc48..6dede787b0 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -49,7 +49,6 @@ func ExtractCustomLogger(options ...WithOption) (customLogger zerolog.Logger) {
 		switch option := option.(type) {
 		case withCustomLogger:
 			customLogger = option.logger
-			return
 		}
 	}
 	return
@@ -66,25 +65,26 @@ func WithReceiveTimeout(timeout time.Duration) WithOption {
 }
 
 // ExtractReceiveTimeout to extract the receive-timeout for reading operations. Defaults to 10 seconds
-func ExtractReceiveTimeout(options ...WithOption) time.Duration {
+func ExtractReceiveTimeout(options ...WithOption) (receiveDuration time.Duration) {
+	receiveDuration = 10 * time.Second
 	for _, option := range options {
 		switch option := option.(type) {
 		case withReceiveTimeout:
-			return option.timeout
+			receiveDuration = option.timeout
 		}
 	}
-	return 10 * time.Second
+	return
 }
 
 // ExtractPassLoggerToModel to extract the flag indicating that model should be passed to Model
-func ExtractPassLoggerToModel(options ...WithOption) bool {
+func ExtractPassLoggerToModel(options ...WithOption) (passLogger bool) {
 	for _, option := range options {
 		switch option := option.(type) {
 		case withPassLoggerToModel:
-			return option.passLogger
+			passLogger = option.passLogger
 		}
 	}
-	return false
+	return
 }
 
 // WithTraceTransactionManagerWorkers enables trace transaction manager workers
@@ -93,14 +93,14 @@ func WithTraceTransactionManagerWorkers(traceWorkers bool) WithOption {
 }
 
 // ExtractTransactionManagerWorkers to extract the flag indicating to trace transaction manager workers
-func ExtractTransactionManagerWorkers(options ...WithOption) bool {
+func ExtractTransactionManagerWorkers(options ...WithOption) (traceWorkers bool) {
 	for _, option := range options {
 		switch option := option.(type) {
 		case withTraceTransactionManagerWorkers:
-			return option.traceWorkers
+			traceWorkers = option.traceWorkers
 		}
 	}
-	return false
+	return
 }
 
 // WithTraceTransactionManagerTransactions enables trace transaction manager transactions
@@ -109,14 +109,14 @@ func WithTraceTransactionManagerTransactions(traceTransactions bool) WithOption
 }
 
 // ExtractTraceTransactionManagerTransactions to extract the flag indicating to trace transaction manager transactions
-func ExtractTraceTransactionManagerTransactions(options ...WithOption) bool {
+func ExtractTraceTransactionManagerTransactions(options ...WithOption) (traceTransactions bool) {
 	for _, option := range options {
 		switch option := option.(type) {
 		case withTraceTransactionManagerTransactions:
-			return option.traceTransactions
+			traceTransactions = option.traceTransactions
 		}
 	}
-	return false
+	return
 }
 
 // WithTraceDefaultMessageCodecWorker enables trace default message codec worker
@@ -125,14 +125,14 @@ func WithTraceDefaultMessageCodecWorker(traceWorkers bool) WithOption {
 }
 
 // ExtractTraceDefaultMessageCodecWorker to extract the flag indicating to trace default message codec workers
-func ExtractTraceDefaultMessageCodecWorker(options ...WithOption) bool {
+func ExtractTraceDefaultMessageCodecWorker(options ...WithOption) (traceTransactions bool) {
 	for _, option := range options {
 		switch option := option.(type) {
 		case withTraceTransactionManagerTransactions:
-			return option.traceTransactions
+			traceTransactions = option.traceTransactions
 		}
 	}
-	return false
+	return
 }
 
 // WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
@@ -145,10 +145,10 @@ func ExtractTracerWorkers(_options ...WithOption) (traceWorkers bool, found bool
 	for _, option := range _options {
 		switch option := option.(type) {
 		case *withTracerExecutorWorkersOption:
-			return option.traceWorkers, true
+			traceWorkers, found = option.traceWorkers, true
 		}
 	}
-	return false, false
+	return
 }
 
 // GetLoggerContextForModel returns a log context if the WithPassLoggerToModel WithOption is set


[plc4x] 05/05: test(plc4go/cbus): shutdown workers of discoverer once done

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6dac19d75e08e8955fdc49c636aa119d5bd77f97
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:57 2023 +0200

    test(plc4go/cbus): shutdown workers of discoverer once done
---
 plc4go/internal/cbus/Discoverer.go | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index 8c3606b64d..f65850f413 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -161,9 +161,22 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 				d.log.Error().Msgf("panic-ed %v. Stack: %s; ", err, debug.Stack())
 			}
 		}()
+		deviceScanWg := sync.WaitGroup{}
 		for transportInstance := range transportInstances {
 			d.log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
-			d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+			completionFuture := d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
+			deviceScanWg.Add(1)
+			go func() {
+				defer deviceScanWg.Done()
+				if err := completionFuture.AwaitCompletion(context.TODO()); err != nil {
+					d.log.Debug().Err(err).Msg("error waiting for completion")
+				}
+			}()
+			deviceScanWg.Wait()
+			d.log.Info().Msg("Discovery done")
+			d.transportInstanceCreationQueue.Stop()
+			d.deviceScanningQueue.Stop()
+			// TODO: do we maybe want a callback for that? As option for example
 		}
 	}()
 	return nil
@@ -320,6 +333,12 @@ func (d *Discoverer) extractDeviceNames(discoveryOptions ...options.WithDiscover
 	return deviceNames
 }
 
+func (d *Discoverer) Close() error {
+	d.transportInstanceCreationQueue.Stop()
+	d.deviceScanningQueue.Stop()
+	return nil
+}
+
 // addressProvider is used to make discover testable
 type addressProvider interface {
 	fmt.Stringer