You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 11:02:13 UTC
[plc4x] 04/05: test(plc4go/cbus): fixed possible concurrency issues in test
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 7727bc814ba86c61c02f30c877c7651589b5f87d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:32 2023 +0200
test(plc4go/cbus): fixed possible concurrency issues in test
---
plc4go/internal/cbus/CBusMessageMapper_test.go | 152 +++++++++------------
plc4go/internal/cbus/Discoverer_test.go | 14 +-
plc4go/internal/cbus/Driver_test.go | 35 +++--
plc4go/internal/cbus/Reader.go | 1 +
plc4go/internal/cbus/Reader_test.go | 70 +++++++---
.../spi/transactions/RequestTransactionManager.go | 1 +
6 files changed, 153 insertions(+), 120 deletions(-)
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index aa9a8a90a0..1bacc4775b 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1468,11 +1468,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1539,13 +1540,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1614,13 +1614,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1678,13 +1677,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1732,13 +1730,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1789,13 +1786,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1866,13 +1862,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1961,13 +1956,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2019,13 +2013,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2073,13 +2066,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2127,13 +2119,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2181,13 +2172,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2235,13 +2225,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2309,13 +2298,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2363,13 +2351,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2417,13 +2404,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2471,13 +2457,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2525,13 +2510,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2579,13 +2563,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2649,13 +2632,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2703,13 +2685,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2757,13 +2738,12 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
+ executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- options.WithPassLoggerToModel(true),
- transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
- ),
+ transactions.WithCustomExecutor(executor),
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 9e8270929e..5ac2519d9f 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -94,8 +94,11 @@ func TestDiscoverer_Discover(t *testing.T) {
})
args.ctx = ctx
- fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
- fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
+ t.Cleanup(executor.Stop)
+ fields.transportInstanceCreationQueue = executor
+ fields.deviceScanningQueue = executor
return nil
},
wantErr: assert.NoError,
@@ -118,8 +121,11 @@ func TestDiscoverer_Discover(t *testing.T) {
time.Sleep(1 * time.Second)
})
args.ctx = ctx
- fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
- fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ executor.Start()
+ t.Cleanup(executor.Stop)
+ fields.transportInstanceCreationQueue = executor
+ fields.deviceScanningQueue = executor
oldaddressProviderRetriever := addressProviderRetriever
addressProviderRetriever = func(log zerolog.Logger, _ []string) ([]addressProvider, error) {
loopbackInterface, err := nettest.LoopbackInterface()
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index ae01bb6f69..5d92be01f8 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -22,6 +22,10 @@ package cbus
import (
"context"
"fmt"
+ "net/url"
+ "testing"
+ "time"
+
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
_default "github.com/apache/plc4x/plc4go/spi/default"
@@ -31,11 +35,9 @@ import (
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
- "net/url"
- "testing"
- "time"
)
func TestDriver_DiscoverWithContext(t *testing.T) {
@@ -111,6 +113,7 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
name string
fields fields
args args
+ setup func(t *testing.T, fields *fields, args *args)
wantVerifier func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool
}{
{
@@ -153,13 +156,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
transportUrl: url.URL{
Scheme: "test",
},
- transports: map[string]transports.Transport{
- "test": test.NewTransport(),
- },
+ transports: map[string]transports.Transport{},
options: map[string][]string{
"failTestTransport": {"yesSir"},
},
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+ },
wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
timeout := time.NewTimer(20 * time.Millisecond)
defer utils.CleanupTimer(timeout)
@@ -186,13 +190,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
transportUrl: url.URL{
Scheme: "test",
},
- transports: map[string]transports.Transport{
- "test": test.NewTransport(),
- },
+ transports: map[string]transports.Transport{},
options: map[string][]string{
"MonitoredApplication1": {"pineapple"},
},
},
+ setup: func(t *testing.T, fields *fields, args *args) {
+ args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+ },
wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
timeout := time.NewTimer(20 * time.Millisecond)
defer utils.CleanupTimer(timeout)
@@ -218,10 +223,11 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
transportUrl: url.URL{
Scheme: "test",
},
- transports: map[string]transports.Transport{
- "test": test.NewTransport(),
- },
- options: map[string][]string{},
+ transports: map[string]transports.Transport{},
+ options: map[string][]string{},
+ },
+ setup: func(t *testing.T, fields *fields, args *args) {
+ args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
},
wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
timeout := time.NewTimer(20 * time.Millisecond)
@@ -240,6 +246,9 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
m := &Driver{
DefaultDriver: tt.fields.DefaultDriver,
awaitSetupComplete: tt.fields.awaitSetupComplete,
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 70aa83016d..59fa3460e6 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -144,6 +144,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
ttl = -time.Since(deadline)
m.log.Debug().Msgf("setting ttl to %s", ttl)
}
+ m.log.Trace().Msgf("sending with ctx %s", ctx)
if err := m.messageCodec.SendRequest(
ctx,
messageToSend,
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 80506ca3ec..faf5822937 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -21,25 +21,29 @@ package cbus
import (
"context"
+ "encoding/hex"
+ "net/url"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/transports/test"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
- "net/url"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
)
func TestNewReader(t *testing.T) {
@@ -180,6 +184,18 @@ func TestReader_readSync(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(5, 5)
+ executor.Start()
+ t.Cleanup(executor.Stop)
+ transactionManager := transactions.NewRequestTransactionManager(
+ 10,
+ append(_options, transactions.WithCustomExecutor(executor))...,
+ )
+ t.Cleanup(func() {
+ assert.NoError(t, transactionManager.Close())
+ })
+ fields.tm = transactionManager
+
transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
@@ -189,11 +205,6 @@ func TestReader_readSync(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, codec.Disconnect())
})
- fields.messageCodec = codec
- fields.tm = transactions.NewRequestTransactionManager(10, _options...)
- t.Cleanup(func() {
- assert.NoError(t, fields.tm.Close())
- })
},
resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
@@ -254,11 +265,19 @@ func TestReader_readSync(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+ executor := pool.NewFixedSizeExecutor(5, 5, _options...)
+ executor.Start()
+ t.Cleanup(executor.Stop)
+ transactionManager := transactions.NewRequestTransactionManager(
+ 10,
+ append(_options, transactions.WithCustomExecutor(executor))...,
+ )
t.Cleanup(func() {
- assert.NoError(t, fields.tm.Close())
+ assert.NoError(t, transactionManager.Close())
})
- transport := test.NewTransport()
+ fields.tm = transactionManager
+
+ transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
@@ -271,6 +290,7 @@ func TestReader_readSync(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -333,12 +353,19 @@ func TestReader_readSync(t *testing.T) {
setup: func(t *testing.T, fields *fields) {
_options := testutils.EnrichOptionsWithOptionsForTesting(t)
- fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+ executor := pool.NewFixedSizeExecutor(5, 5)
+ executor.Start()
+ t.Cleanup(executor.Stop)
+ transactionManager := transactions.NewRequestTransactionManager(
+ 10,
+ append(_options, transactions.WithCustomExecutor(executor))...,
+ )
t.Cleanup(func() {
- assert.NoError(t, fields.tm.Close())
+ assert.NoError(t, transactionManager.Close())
})
+ fields.tm = transactionManager
- transport := test.NewTransport()
+ transport := test.NewTransport(_options...)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
require.NoError(t, err)
@@ -374,6 +401,7 @@ func TestReader_readSync(t *testing.T) {
log: testutils.ProduceTestingLogger(t),
}
m.readSync(tt.args.ctx, tt.args.readRequest, tt.args.result)
+ t.Log("done read sync")
assert.True(t, tt.resultEvaluator(t, tt.args.result))
})
}
@@ -502,6 +530,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -574,6 +603,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -662,6 +692,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -743,6 +774,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -824,6 +856,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -905,6 +938,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -986,6 +1020,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
@@ -1067,6 +1102,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
currentState.Store(INITIAL)
stateChangeMutex := sync.Mutex{}
transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ t.Logf("reacting to\n%s", hex.Dump(data))
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
switch currentState.Load().(MockState) {
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 4d8105b7f3..83551e0805 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -274,5 +274,6 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
} else {
r.log.Warn().Msg("not closing shared instance")
}
+ r.log.Debug().Msg("closed")
return nil
}