You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/15 16:06:14 UTC

[plc4x] 01/02: refactor(plc4go): use buffered channels when possible

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit d9155112910be36914c2190b4e2fe3d310708a8f
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon May 15 18:04:59 2023 +0200

    refactor(plc4go): use buffered channels when possible
---
 plc4go/internal/ads/Browser.go                     |  2 +-
 plc4go/internal/ads/Connection.go                  |  2 +-
 plc4go/internal/ads/Driver.go                      |  4 +-
 plc4go/internal/ads/Interactions.go                | 12 +++---
 plc4go/internal/ads/Reader.go                      |  2 +-
 plc4go/internal/ads/Subscriber.go                  |  6 +--
 plc4go/internal/ads/Writer.go                      |  2 +-
 plc4go/internal/bacnetip/Connection.go             |  2 +-
 plc4go/internal/bacnetip/Discoverer.go             |  4 +-
 plc4go/internal/bacnetip/Driver.go                 | 18 +++-----
 plc4go/internal/bacnetip/Reader.go                 |  2 +-
 plc4go/internal/bacnetip/Subscriber.go             |  6 +--
 plc4go/internal/cbus/Reader.go                     |  2 +-
 plc4go/internal/cbus/Subscriber.go                 |  5 ++-
 plc4go/internal/cbus/Writer.go                     |  2 +-
 plc4go/internal/eip/Connection.go                  |  4 +-
 plc4go/internal/eip/Reader.go                      |  2 +-
 plc4go/internal/eip/Writer.go                      |  2 +-
 plc4go/internal/knxnetip/Connection.go             |  6 +--
 .../knxnetip/ConnectionDriverSpecificOperations.go | 14 +++----
 .../knxnetip/ConnectionInternalOperations.go       | 48 +++++++++++-----------
 plc4go/internal/knxnetip/Reader.go                 |  2 +-
 plc4go/internal/knxnetip/Subscriber.go             |  6 +--
 plc4go/internal/knxnetip/Writer.go                 |  2 +-
 plc4go/internal/modbus/Connection.go               |  2 +-
 plc4go/internal/modbus/Reader.go                   |  2 +-
 plc4go/internal/modbus/Writer.go                   |  2 +-
 plc4go/internal/s7/Connection.go                   | 14 +++----
 plc4go/internal/s7/Reader.go                       |  2 +-
 plc4go/internal/s7/Writer.go                       |  2 +-
 plc4go/internal/simulated/Connection.go            |  6 +--
 plc4go/internal/simulated/Reader.go                |  2 +-
 plc4go/internal/simulated/Writer.go                |  2 +-
 plc4go/spi/default/DefaultBrowser.go               |  2 +-
 plc4go/spi/default/DefaultConnection.go            |  4 +-
 plc4go/spi/model/DefaultPlcReadRequest.go          |  2 +-
 plc4go/spi/model/DefaultPlcWriteRequest.go         |  2 +-
 plc4go/spi/utils/Net.go                            |  2 +-
 plc4go/spi/utils/WorkerPool.go                     |  2 +-
 39 files changed, 100 insertions(+), 105 deletions(-)

diff --git a/plc4go/internal/ads/Browser.go b/plc4go/internal/ads/Browser.go
index 1c5862df4b..20e8fdebfd 100644
--- a/plc4go/internal/ads/Browser.go
+++ b/plc4go/internal/ads/Browser.go
@@ -41,7 +41,7 @@ func (m *Connection) Browse(ctx context.Context, browseRequest apiModel.PlcBrows
 }
 
 func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
-	result := make(chan apiModel.PlcBrowseRequestResult)
+	result := make(chan apiModel.PlcBrowseRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go
index c5982d04c1..a03aec00e2 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -97,7 +97,7 @@ func (m *Connection) GetConnection() plc4go.PlcConnection {
 
 func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
 	log.Trace().Msg("Connecting")
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 
 	// Reset the driver context (Actually this should not be required, but just to be on the safe side)
 	m.driverContext.clear()
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index d102ef24f1..c01f4024c9 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -61,7 +61,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 	transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
 	if err != nil {
 		log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
-		ch := make(chan plc4go.PlcConnectionConnectResult)
+		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
 		return ch
 	}
@@ -73,7 +73,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 	configuration, err := model.ParseFromOptions(options)
 	if err != nil {
 		log.Error().Err(err).Msgf("Invalid options")
-		ch := make(chan plc4go.PlcConnectionConnectResult)
+		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "invalid configuration"))
 		return ch
 	}
diff --git a/plc4go/internal/ads/Interactions.go b/plc4go/internal/ads/Interactions.go
index bf575f343e..00209ae811 100644
--- a/plc4go/internal/ads/Interactions.go
+++ b/plc4go/internal/ads/Interactions.go
@@ -30,7 +30,7 @@ import (
 )
 
 func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model.AdsReadDeviceInfoResponse, error) {
-	responseChannel := make(chan model.AdsReadDeviceInfoResponse)
+	responseChannel := make(chan model.AdsReadDeviceInfoResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -71,7 +71,7 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model
 }
 
 func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32) (model.AdsReadResponse, error) {
-	responseChannel := make(chan model.AdsReadResponse)
+	responseChannel := make(chan model.AdsReadResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -112,7 +112,7 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3
 }
 
 func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, data []byte) (model.AdsWriteResponse, error) {
-	responseChannel := make(chan model.AdsWriteResponse)
+	responseChannel := make(chan model.AdsWriteResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -153,7 +153,7 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint
 }
 
 func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, readLength uint32, items []model.AdsMultiRequestItem, writeData []byte) (model.AdsReadWriteResponse, error) {
-	responseChannel := make(chan model.AdsReadWriteResponse)
+	responseChannel := make(chan model.AdsReadWriteResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -194,7 +194,7 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup
 }
 
 func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32, transmissionMode model.AdsTransMode, maxDelay uint32, cycleTime uint32) (model.AdsAddDeviceNotificationResponse, error) {
-	responseChannel := make(chan model.AdsAddDeviceNotificationResponse)
+	responseChannel := make(chan model.AdsAddDeviceNotificationResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -235,7 +235,7 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
 }
 
 func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Context, notificationHandle uint32) (model.AdsDeleteDeviceNotificationResponse, error) {
-	responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse)
+	responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index 2dfdfa461f..4a820729cb 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -42,7 +42,7 @@ func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
 
 func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
 	log.Trace().Msg("Reading")
-	result := make(chan apiModel.PlcReadRequestResult)
+	result := make(chan apiModel.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 6c899d464d..3a66f23847 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -97,7 +97,7 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
 	}
 
 	// Create a new result-channel, which completes as soon as all sub-result-channels have returned
-	globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult)
+	globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -126,8 +126,8 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
 }
 
 func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
-	responseChan := make(chan apiModel.PlcSubscriptionRequestResult)
-	go func(respChan chan apiModel.PlcSubscriptionRequestResult) {
+	responseChan := make(chan apiModel.PlcSubscriptionRequestResult, 1)
+	go func() {
 		defer func() {
 			if err := recover(); err != nil {
 				responseChan <- internalModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v", err))
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index 423c4e61e1..d2330be112 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -41,7 +41,7 @@ func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
 
 func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
 	log.Trace().Msg("Writing")
-	result := make(chan apiModel.PlcWriteRequestResult)
+	result := make(chan apiModel.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 5967ca1136..2d1cb5c5db 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -78,7 +78,7 @@ func (c *Connection) GetTracer() *spi.Tracer {
 
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
 	log.Trace().Msg("Connecting")
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/bacnetip/Discoverer.go b/plc4go/internal/bacnetip/Discoverer.go
index 81c0dba93d..a5f3deb9a1 100644
--- a/plc4go/internal/bacnetip/Discoverer.go
+++ b/plc4go/internal/bacnetip/Discoverer.go
@@ -84,7 +84,7 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
 }
 
 func broadcastAndDiscover(ctx context.Context, communicationChannels []communicationChannel, specificOptions *protocolSpecificOptions) (chan receivedBvlcMessage, error) {
-	incomingBVLCChannel := make(chan receivedBvlcMessage, 0)
+	incomingBVLCChannel := make(chan receivedBvlcMessage)
 	for _, communicationChannelInstance := range communicationChannels {
 		// Prepare the discovery packet data
 		{
@@ -192,7 +192,7 @@ func broadcastAndDiscover(ctx context.Context, communicationChannels []communica
 
 		go func(communicationChannelInstance communicationChannel) {
 			for {
-				blockingReadChan := make(chan bool, 0)
+				blockingReadChan := make(chan bool)
 				go func() {
 					buf := make([]byte, 4096)
 					n, addr, err := communicationChannelInstance.broadcastConnection.ReadFrom(buf)
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index afa7fbe606..1315e400c1 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -67,10 +67,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 	transport, ok := transports[transportUrl.Scheme]
 	if !ok {
 		log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
-		ch := make(chan plc4go.PlcConnectionConnectResult)
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
-		}()
+		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
 		return ch
 	}
 	// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
@@ -85,19 +83,15 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
 		udpTransport = transport
 	default:
 		log.Error().Stringer("transportUrl", &transportUrl).Msg("Only udp supported at the moment")
-		ch := make(chan plc4go.PlcConnectionConnectResult)
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
-		}()
+		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
 		return ch
 	}
 
 	codec, err := m.applicationManager.getApplicationLayerMessageCodec(udpTransport, transportUrl, options)
 	if err != nil {
-		ch := make(chan plc4go.PlcConnectionConnectResult)
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "error getting application layer message codec"))
-		}()
+		ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "error getting application layer message codec"))
 		return ch
 	}
 	log.Debug().Msgf("working with codec %#v", codec)
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index e67149466d..a1c1e7985a 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -57,7 +57,7 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
 func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
 	// TODO: handle ctx
 	log.Trace().Msg("Reading")
-	result := make(chan apiModel.PlcReadRequestResult)
+	result := make(chan apiModel.PlcReadRequestResult, 1)
 	go func() {
 		if len(readRequest.GetTagNames()) == 0 {
 			result <- &spiModel.DefaultPlcReadRequestResult{
diff --git a/plc4go/internal/bacnetip/Subscriber.go b/plc4go/internal/bacnetip/Subscriber.go
index 4fae04d0fd..b60ea0b60a 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -40,8 +40,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
 }
 
 func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
-	// TODO: handle ctx
-	result := make(chan apiModel.PlcSubscriptionRequestResult)
+	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
 
@@ -67,7 +66,8 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
 
 func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
 	// TODO: handle ctx
-	result := make(chan apiModel.PlcUnsubscriptionRequestResult)
+	result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
+	result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("not implemented"))
 
 	// TODO: As soon as we establish a connection, we start getting data...
 	// subscriptions are more an internal handling of which values to pass where.
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index fdb8a4c2ee..c6df55f7cf 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -49,7 +49,7 @@ func NewReader(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm spi
 
 func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
 	log.Trace().Msg("Reading")
-	result := make(chan apiModel.PlcReadRequestResult)
+	result := make(chan apiModel.PlcReadRequestResult, 1)
 	go m.readSync(ctx, readRequest, result)
 	return result
 }
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index aa1794f472..f1c72fa747 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -47,7 +47,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
 }
 
 func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
-	result := make(chan apiModel.PlcSubscriptionRequestResult)
+	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -78,7 +78,8 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 
 func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
 	// TODO: handle context
-	result := make(chan apiModel.PlcUnsubscriptionRequestResult)
+	result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
+	result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
 
 	// TODO: As soon as we establish a connection, we start getting data...
 	// subscriptions are more a internal handling of which values to pass where.
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 47d15cc258..1808d03cf7 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -48,7 +48,7 @@ func NewWriter(tpduGenerator *AlphaGenerator, messageCodec *MessageCodec, tm spi
 
 func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest) <-chan apiModel.PlcWriteRequestResult {
 	log.Trace().Msg("Writing")
-	result := make(chan apiModel.PlcWriteRequestResult)
+	result := make(chan apiModel.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 9bdd960ca8..242d143a83 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -104,7 +104,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
 
 func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
 	log.Trace().Msg("Connecting")
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -135,7 +135,7 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
 func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 	// TODO: use proper context
 	ctx := context.TODO()
-	result := make(chan plc4go.PlcConnectionCloseResult)
+	result := make(chan plc4go.PlcConnectionCloseResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 1ce0e914b4..b6f5576310 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -58,7 +58,7 @@ func NewReader(messageCodec spi.MessageCodec, tm spi.RequestTransactionManager,
 func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
 	// TODO: handle ctx
 	log.Trace().Msg("Reading")
-	result := make(chan apiModel.PlcReadRequestResult)
+	result := make(chan apiModel.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index b37457d6b1..8636a8d1d4 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -55,7 +55,7 @@ func NewWriter(messageCodec spi.MessageCodec, tm spi.RequestTransactionManager,
 
 func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
 	// TODO: handle context
-	result := make(chan model.PlcWriteRequestResult)
+	result := make(chan model.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index 5191371c83..51c04f6b09 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -221,7 +221,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 }
 
 func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
-	result := make(chan plc4go.PlcConnectionConnectResult)
+	result := make(chan plc4go.PlcConnectionConnectResult, 1)
 	sendResult := func(connection plc4go.PlcConnection, err error) {
 		result <- _default.NewDefaultPlcConnectionConnectResult(connection, err)
 	}
@@ -396,7 +396,7 @@ func (m *Connection) BlockingClose() {
 func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 	// TODO: use proper context
 	ctx := context.TODO()
-	result := make(chan plc4go.PlcConnectionCloseResult)
+	result := make(chan plc4go.PlcConnectionCloseResult, 1)
 
 	go func() {
 		defer func() {
@@ -459,7 +459,7 @@ func (m *Connection) IsConnected() bool {
 func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
 	// TODO: use proper context
 	ctx := context.TODO()
-	result := make(chan plc4go.PlcConnectionPingResult)
+	result := make(chan plc4go.PlcConnectionPingResult, 1)
 
 	go func() {
 		defer func() {
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 1b7ac49b50..5e551225eb 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -45,7 +45,7 @@ import (
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
 
 func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
-	result := make(chan KnxReadResult)
+	result := make(chan KnxReadResult, 1)
 
 	sendResponse := func(value values.PlcValue, numItems uint8, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -106,7 +106,7 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte,
 }
 
 func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceConnectResult {
-	result := make(chan KnxDeviceConnectResult)
+	result := make(chan KnxDeviceConnectResult, 1)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -201,7 +201,7 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
 }
 
 func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceDisconnectResult {
-	result := make(chan KnxDeviceDisconnectResult)
+	result := make(chan KnxDeviceDisconnectResult, 1)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -240,7 +240,7 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverM
 }
 
 func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress driverModel.KnxAddress, buildingKey []byte) <-chan KnxDeviceAuthenticateResult {
-	result := make(chan KnxDeviceAuthenticateResult)
+	result := make(chan KnxDeviceAuthenticateResult, 1)
 
 	sendResponse := func(err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -298,7 +298,7 @@ func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress drive
 }
 
 func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) <-chan KnxReadResult {
-	result := make(chan KnxReadResult)
+	result := make(chan KnxReadResult, 1)
 
 	sendResponse := func(value values.PlcValue, numItems uint8, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -382,7 +382,7 @@ func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress drive
 }
 
 func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) <-chan KnxReadResult {
-	result := make(chan KnxReadResult)
+	result := make(chan KnxReadResult, 1)
 
 	sendResponse := func(value values.PlcValue, numItems uint8, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
@@ -447,7 +447,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAdd
 }
 
 func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
-	result := make(chan KnxReadResult)
+	result := make(chan KnxReadResult, 1)
 
 	sendResponse := func(value values.PlcValue, numItems uint8, err error) {
 		timeout := time.NewTimer(time.Millisecond * 10)
diff --git a/plc4go/internal/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
index 027b42505b..1419603638 100644
--- a/plc4go/internal/knxnetip/ConnectionInternalOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
@@ -54,8 +54,8 @@ func (m *Connection) sendGatewaySearchRequest(ctx context.Context) (driverModel.
 	)
 	searchRequest := driverModel.NewSearchRequest(discoveryEndpoint)
 
-	result := make(chan driverModel.SearchResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.SearchResponse, 1)
+	errorResult := make(chan error, 1)
 	err = m.messageCodec.SendRequest(ctx, searchRequest,
 		func(message spi.Message) bool {
 			_, ok := message.(driverModel.SearchResponseExactly)
@@ -112,8 +112,8 @@ func (m *Connection) sendGatewayConnectionRequest(ctx context.Context) (driverMo
 		driverModel.NewConnectionRequestInformationTunnelConnection(driverModel.KnxLayer_TUNNEL_LINK_LAYER),
 	)
 
-	result := make(chan driverModel.ConnectionResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ConnectionResponse, 1)
+	errorResult := make(chan error, 1)
 	err = m.messageCodec.SendRequest(ctx, connectionRequest,
 		func(message spi.Message) bool {
 			_, ok := message.(driverModel.ConnectionResponseExactly)
@@ -162,8 +162,8 @@ func (m *Connection) sendGatewayDisconnectionRequest(ctx context.Context) (drive
 		),
 	)
 
-	result := make(chan driverModel.DisconnectResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.DisconnectResponse, 1)
+	errorResult := make(chan error, 1)
 	err = m.messageCodec.SendRequest(ctx, disconnectRequest,
 		func(message spi.Message) bool {
 			_, ok := message.(driverModel.DisconnectResponseExactly)
@@ -210,8 +210,8 @@ func (m *Connection) sendConnectionStateRequest(ctx context.Context) (driverMode
 			driverModel.HostProtocolCode_IPV4_UDP,
 			localAddr, uint16(localAddress.Port)))
 
-	result := make(chan driverModel.ConnectionStateResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ConnectionStateResponse, 1)
+	errorResult := make(chan error, 1)
 	err = m.messageCodec.SendRequest(ctx, connectionStateRequest,
 		func(message spi.Message) bool {
 			_, ok := message.(driverModel.ConnectionStateResponseExactly)
@@ -269,8 +269,8 @@ func (m *Connection) sendGroupAddressReadRequest(ctx context.Context, groupAddre
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataGroupValueResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataGroupValueResponse, 1)
+	errorResult := make(chan error, 1)
 	err := m.messageCodec.SendRequest(ctx, groupAddressReadRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -353,8 +353,8 @@ func (m *Connection) sendDeviceConnectionRequest(ctx context.Context, targetAddr
 		0,
 	)
 
-	result := make(chan driverModel.ApduControlConnect)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduControlConnect, 1)
+	errorResult := make(chan error, 1)
 	err := m.messageCodec.SendRequest(ctx, deviceConnectionRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -443,8 +443,8 @@ func (m *Connection) sendDeviceDisconnectionRequest(ctx context.Context, targetA
 		0,
 	)
 
-	result := make(chan driverModel.ApduControlDisconnect)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduControlDisconnect, 1)
+	errorResult := make(chan error, 1)
 	if err := m.messageCodec.SendRequest(ctx, deviceDisconnectionRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -549,8 +549,8 @@ func (m *Connection) sendDeviceAuthentication(ctx context.Context, targetAddress
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataExtAuthorizeResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataExtAuthorizeResponse, 1)
+	errorResult := make(chan error, 1)
 	if err := m.messageCodec.SendRequest(ctx, deviceAuthenticationRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -660,8 +660,8 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(ctx context.Context,
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataDeviceDescriptorResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataDeviceDescriptorResponse, 1)
+	errorResult := make(chan error, 1)
 	err := m.messageCodec.SendRequest(ctx, deviceDescriptorReadRequest, func(message spi.Message) bool {
 		tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
 		if !ok || tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
@@ -768,8 +768,8 @@ func (m *Connection) sendDevicePropertyReadRequest(ctx context.Context, targetAd
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataExtPropertyValueResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataExtPropertyValueResponse, 1)
+	errorResult := make(chan error, 1)
 	if err := m.messageCodec.SendRequest(ctx, propertyReadRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -884,8 +884,8 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(ctx context.Contex
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataExtPropertyDescriptionResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataExtPropertyDescriptionResponse, 1)
+	errorResult := make(chan error, 1)
 	err := m.messageCodec.SendRequest(ctx, propertyReadRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
@@ -997,8 +997,8 @@ func (m *Connection) sendDeviceMemoryReadRequest(ctx context.Context, targetAddr
 		0,
 	)
 
-	result := make(chan driverModel.ApduDataMemoryResponse)
-	errorResult := make(chan error)
+	result := make(chan driverModel.ApduDataMemoryResponse, 1)
+	errorResult := make(chan error, 1)
 	if err := m.messageCodec.SendRequest(ctx, propertyReadRequest,
 		func(message spi.Message) bool {
 			tunnelingRequest, ok := message.(driverModel.TunnelingRequestExactly)
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index 27c635a004..e3f22f7b2d 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -48,7 +48,7 @@ func NewReader(connection *Connection) *Reader {
 
 func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
 	// TODO: handle ctx
-	resultChan := make(chan apiModel.PlcReadRequestResult)
+	resultChan := make(chan apiModel.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index 33fc2c762f..7355e7e544 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -46,7 +46,7 @@ func NewSubscriber(connection *Connection) *Subscriber {
 
 func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
 	// TODO: handle context
-	result := make(chan apiModel.PlcSubscriptionRequestResult)
+	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -78,8 +78,8 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
 
 func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
 	// TODO: handle context
-	result := make(chan apiModel.PlcUnsubscriptionRequestResult)
-
+	result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
+	result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
 	// TODO: As soon as we establish a connection, we start getting data...
 	// subscriptions are more an internal handling of which values to pass where.
 
diff --git a/plc4go/internal/knxnetip/Writer.go b/plc4go/internal/knxnetip/Writer.go
index cdd10eec53..2ee56238e2 100644
--- a/plc4go/internal/knxnetip/Writer.go
+++ b/plc4go/internal/knxnetip/Writer.go
@@ -41,7 +41,7 @@ func NewWriter(messageCodec spi.MessageCodec) Writer {
 
 func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
 	// TODO: handle context
-	result := make(chan model.PlcWriteRequestResult)
+	result := make(chan model.PlcWriteRequestResult, 1)
 	// If we are requesting only one tag, use a
 	if len(writeRequest.GetTagNames()) == 1 {
 		tagName := writeRequest.GetTagNames()[0]
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 476ec7cf56..97afeaa7d0 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -96,7 +96,7 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
 	// TODO: use proper context
 	ctx := context.TODO()
 	log.Trace().Msg("Pinging")
-	result := make(chan plc4go.PlcConnectionPingResult)
+	result := make(chan plc4go.PlcConnectionPingResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index d0914a4730..cac63f0b7e 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -51,7 +51,7 @@ func NewReader(unitIdentifier uint8, messageCodec spi.MessageCodec) *Reader {
 func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
 	// TODO: handle ctx
 	log.Trace().Msg("Reading")
-	result := make(chan model.PlcReadRequestResult)
+	result := make(chan model.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index d13bcf38a8..73251d2187 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -49,7 +49,7 @@ func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec) Writer {
 
 func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
 	// TODO: handle context
-	result := make(chan model.PlcWriteRequestResult)
+	result := make(chan model.PlcWriteRequestResult, 1)
 	go func() {
 		// If we are requesting only one tag, use a
 		if len(writeRequest.GetTagNames()) != 1 {
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 8e1ab6870d..e980e7bbed 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -108,7 +108,7 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
 
 func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
 	log.Trace().Msg("Connecting")
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -149,8 +149,8 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
 func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
 	log.Debug().Msg("Sending COTP Connection Request")
 	// Open the session on ISO Transport Protocol first.
-	cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse)
-	cotpConnectionErrorChan := make(chan error)
+	cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse, 1)
+	cotpConnectionErrorChan := make(chan error, 1)
 	if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()), func(message spi.Message) bool {
 		tpktPacket := message.(readWriteModel.TPKTPacket)
 		if tpktPacket == nil {
@@ -180,8 +180,8 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 		log.Debug().Msg("Sending S7 Connection Request")
 
 		// Send an S7 login message.
-		s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication)
-		s7ConnectionErrorChan := make(chan error)
+		s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication, 1)
+		s7ConnectionErrorChan := make(chan error, 1)
 		if err := m.messageCodec.SendRequest(ctx, m.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool {
 			tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
 			if !ok {
@@ -241,8 +241,8 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 
 			// Prepare a message to request the remote to identify itself.
 			log.Debug().Msg("Sending S7 Identification Request")
-			s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData)
-			s7IdentificationErrorChan := make(chan error)
+			s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData, 1)
+			s7IdentificationErrorChan := make(chan error, 1)
 			if err := m.messageCodec.SendRequest(ctx, m.createIdentifyRemoteMessage(), func(message spi.Message) bool {
 				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
 				if !ok {
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 7b6226cdb9..8b327f973e 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -50,7 +50,7 @@ func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm s
 func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
 	// TODO: handle ctx
 	log.Trace().Msg("Reading")
-	result := make(chan model.PlcReadRequestResult)
+	result := make(chan model.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 7053daa9b5..b9a1773c48 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -48,7 +48,7 @@ func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm s
 
 func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
 	// TODO: handle context
-	result := make(chan model.PlcWriteRequestResult)
+	result := make(chan model.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/simulated/Connection.go b/plc4go/internal/simulated/Connection.go
index 95e4c35015..371303a1c5 100644
--- a/plc4go/internal/simulated/Connection.go
+++ b/plc4go/internal/simulated/Connection.go
@@ -78,7 +78,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 }
 
 func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -134,7 +134,7 @@ func (c *Connection) BlockingClose() {
 }
 
 func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
-	ch := make(chan plc4go.PlcConnectionCloseResult)
+	ch := make(chan plc4go.PlcConnectionCloseResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -180,7 +180,7 @@ func (c *Connection) IsConnected() bool {
 }
 
 func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
-	ch := make(chan plc4go.PlcConnectionPingResult)
+	ch := make(chan plc4go.PlcConnectionPingResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index ea3e2fcda9..cdb77f7350 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -46,7 +46,7 @@ func NewReader(device *Device, options map[string][]string, tracer *spi.Tracer)
 }
 
 func (r *Reader) Read(_ context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
-	ch := make(chan model.PlcReadRequestResult)
+	ch := make(chan model.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index b8cae999ad..0c2cd25447 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -45,7 +45,7 @@ func NewWriter(device *Device, options map[string][]string, tracer *spi.Tracer)
 }
 
 func (w *Writer) Write(_ context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
-	ch := make(chan model.PlcWriteRequestResult)
+	ch := make(chan model.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/spi/default/DefaultBrowser.go b/plc4go/spi/default/DefaultBrowser.go
index ab7a628f48..4688d0aaf1 100644
--- a/plc4go/spi/default/DefaultBrowser.go
+++ b/plc4go/spi/default/DefaultBrowser.go
@@ -66,7 +66,7 @@ func (m *defaultBrowser) Browse(ctx context.Context, browseRequest apiModel.PlcB
 }
 
 func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
-	result := make(chan apiModel.PlcBrowseRequestResult)
+	result := make(chan apiModel.PlcBrowseRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index c0505bcea4..2a8c58656a 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -234,7 +234,7 @@ func (d *defaultConnection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 
 func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
 	log.Trace().Msg("Connecting")
-	ch := make(chan plc4go.PlcConnectionConnectResult)
+	ch := make(chan plc4go.PlcConnectionConnectResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
@@ -284,7 +284,7 @@ func (d *defaultConnection) IsConnected() bool {
 }
 
 func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
-	ch := make(chan plc4go.PlcConnectionPingResult)
+	ch := make(chan plc4go.PlcConnectionPingResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go b/plc4go/spi/model/DefaultPlcReadRequest.go
index 67775e8c9f..23653e5acb 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -136,7 +136,7 @@ func (d *DefaultPlcReadRequest) ExecuteWithContext(ctx context.Context) <-chan a
 	}
 
 	// Create a new result-channel, which completes as soon as all sub-result-channels have returned
-	resultChannel := make(chan apiModel.PlcReadRequestResult)
+	resultChannel := make(chan apiModel.PlcReadRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go b/plc4go/spi/model/DefaultPlcWriteRequest.go
index 9e846e10d0..c0146e0e83 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -155,7 +155,7 @@ func (d *DefaultPlcWriteRequest) ExecuteWithContext(ctx context.Context) <-chan
 	}
 
 	// Create a new result-channel, which completes as soon as all sub-result-channels have returned
-	resultChannel := make(chan apiModel.PlcWriteRequestResult)
+	resultChannel := make(chan apiModel.PlcWriteRequestResult, 1)
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
diff --git a/plc4go/spi/utils/Net.go b/plc4go/spi/utils/Net.go
index 59b41e1ad0..a2a6e539be 100644
--- a/plc4go/spi/utils/Net.go
+++ b/plc4go/spi/utils/Net.go
@@ -120,7 +120,7 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 	}
 
 	// Start up a goroutine to read in packet data.
-	stop := make(chan struct{})
+	stop := make(chan struct{}, 1)
 	// As we don't know how much the handler will find we use a value of 1 and set that to done after the 10 sec in the cleanup function directly after
 	wg.Add(1)
 	// Handler for processing incoming ARP responses.
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index a2247065e9..a1d8202a7c 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -46,7 +46,7 @@ type worker struct {
 func (w *worker) initialize() {
 	w.shutdown.Store(false)
 	w.interrupted.Store(false)
-	w.interrupter = make(chan struct{})
+	w.interrupter = make(chan struct{}, 1)
 	w.hasEnded.Store(false)
 	w.lastReceived = time.Now()
 }