You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/15 13:41:32 UTC

[plc4x] 01/02: fix(plc4go/cbus): handle pre registered consumers

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c12edf33d65c293dcfc8dfb5172905520cec6184
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Jun 15 15:11:24 2023 +0200

    fix(plc4go/cbus): handle pre registered consumers
---
 plc4go/internal/ads/Subscriber.go  |  6 ++++--
 plc4go/internal/cbus/Subscriber.go | 13 ++++++++++++-
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 232ce42816..6a0c874859 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -74,12 +74,14 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
 		subscriptionType := defaultSubscriptionRequest.GetType(tagName)
 		interval := defaultSubscriptionRequest.GetInterval(tagName)
 		preRegisteredConsumers := defaultSubscriptionRequest.GetPreRegisteredConsumers(tagName)
-		subSubscriptionRequests[tagName] = spiModel.NewDefaultPlcSubscriptionRequest(m,
+		subSubscriptionRequests[tagName] = spiModel.NewDefaultPlcSubscriptionRequest(
+			m,
 			[]string{tagName},
 			map[string]apiModel.PlcTag{tagName: directTag},
 			map[string]spiModel.SubscriptionType{tagName: subscriptionType},
 			map[string]time.Duration{tagName: interval},
-			map[string][]apiModel.PlcSubscriptionEventConsumer{tagName: preRegisteredConsumers})
+			map[string][]apiModel.PlcSubscriptionEventConsumer{tagName: preRegisteredConsumers},
+		)
 	}
 
 	// If this is a single item request, we can take a shortcut.
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index a9224fd79a..b88d5da690 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -74,7 +74,18 @@ func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 		subscriptionValues := make(map[string]apiModel.PlcSubscriptionHandle)
 		for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
 			responseCodes[tagName] = apiModel.PlcResponseCode_OK
-			subscriptionValues[tagName] = NewSubscriptionHandle(s, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
+			handle := NewSubscriptionHandle(
+				s,
+				tagName,
+				internalPlcSubscriptionRequest.GetTag(tagName),
+				internalPlcSubscriptionRequest.GetType(tagName),
+				internalPlcSubscriptionRequest.GetInterval(tagName),
+			)
+			preRegisteredConsumers := internalPlcSubscriptionRequest.GetPreRegisteredConsumers(tagName)
+			for _, consumer := range preRegisteredConsumers {
+				_ = handle.Register(consumer)
+			}
+			subscriptionValues[tagName] = handle
 		}
 
 		result <- spiModel.NewDefaultPlcSubscriptionRequestResult(