You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/15 10:18:30 UTC
[plc4x] branch develop updated: feat(plc4go/spi): propagate ctx down into implementations
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new fbe964bfa feat(plc4go/spi): propagate ctx down into implementations
fbe964bfa is described below
commit fbe964bfaa0284d7a95b133e6cb2e84971f58bae
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 15 12:18:23 2022 +0200
feat(plc4go/spi): propagate ctx down into implementations
---
plc4go/internal/ads/Reader.go | 4 +-
plc4go/internal/ads/Writer.go | 4 +-
plc4go/internal/bacnetip/Reader.go | 4 +-
plc4go/internal/bacnetip/Subscriber.go | 7 +-
plc4go/internal/cbus/Browser.go | 183 ++++++++++-----------
plc4go/internal/cbus/Reader.go | 4 +-
plc4go/internal/cbus/Subscriber.go | 7 +-
plc4go/internal/cbus/Writer.go | 4 +-
plc4go/internal/eip/Reader.go | 4 +-
plc4go/internal/eip/Writer.go | 4 +-
plc4go/internal/knxnetip/Browser.go | 67 +++-----
plc4go/internal/knxnetip/Reader.go | 4 +-
plc4go/internal/knxnetip/Subscriber.go | 7 +-
plc4go/internal/knxnetip/Writer.go | 4 +-
plc4go/internal/modbus/Reader.go | 4 +-
plc4go/internal/modbus/Writer.go | 4 +-
plc4go/internal/s7/Reader.go | 4 +-
plc4go/internal/s7/Writer.go | 4 +-
plc4go/internal/simulated/Reader.go | 4 +-
plc4go/internal/simulated/Reader_test.go | 2 +-
plc4go/internal/simulated/Writer.go | 4 +-
plc4go/internal/simulated/Writer_test.go | 2 +-
plc4go/pkg/api/model/plc_browse.go | 2 +
plc4go/spi/PlcBrowser.go | 9 +-
plc4go/spi/PlcReader.go | 7 +-
plc4go/spi/PlcSubscriber.go | 9 +-
plc4go/spi/PlcWriter.go | 7 +-
plc4go/spi/default/DefaultBrowser.go | 84 ++++++++++
plc4go/spi/interceptors/RequestInterceptor.go | 9 +-
.../interceptors/SingleItemRequestInterceptor.go | 13 +-
plc4go/spi/model/DefaultPlcBrowseRequest.go | 12 +-
plc4go/spi/model/DefaultPlcReadRequest.go | 28 ++--
plc4go/spi/model/DefaultPlcSubscriptionRequest.go | 6 +-
plc4go/spi/model/DefaultPlcWriteRequest.go | 27 +--
34 files changed, 337 insertions(+), 211 deletions(-)
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index 83d08d35f..a574b948c 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -20,6 +20,7 @@
package ads
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
@@ -57,7 +58,8 @@ func NewReader(messageCodec spi.MessageCodec, targetAmsNetId readWriteModel.AmsN
}
}
-func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index f86990f58..fa41fbdae 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -20,6 +20,7 @@
package ads
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
@@ -54,7 +55,8 @@ func NewWriter(messageCodec spi.MessageCodec, targetAmsNetId readWriteModel.AmsN
}
}
-func (m *Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
// If we are requesting only one field, use a
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index 364680aa9..13e3c17cc 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -20,6 +20,7 @@
package bacnetip
import (
+ "context"
"fmt"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
@@ -66,7 +67,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
}
}
-func (m *Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/bacnetip/Subscriber.go b/plc4go/internal/bacnetip/Subscriber.go
index 043c2f97f..383efd190 100644
--- a/plc4go/internal/bacnetip/Subscriber.go
+++ b/plc4go/internal/bacnetip/Subscriber.go
@@ -20,6 +20,7 @@
package bacnetip
import (
+ "context"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
internalModel "github.com/apache/plc4x/plc4go/spi/model"
plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -37,7 +38,8 @@ func NewSubscriber(connection *Connection) *Subscriber {
}
}
-func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+ // TODO: handle ctx
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
// Add this subscriber to the connection.
@@ -61,7 +63,8 @@ func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionReque
return result
}
-func (m *Subscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+ // TODO: handle ctx
result := make(chan apiModel.PlcUnsubscriptionRequestResult)
// TODO: As soon as we establish a connection, we start getting data...
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 4a4fbd17c..782897aea 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -20,134 +20,119 @@
package cbus
import (
+ "context"
"fmt"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
+ _default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/model"
"github.com/rs/zerolog/log"
)
type Browser struct {
+ _default.DefaultBrowser
connection *Connection
messageCodec spi.MessageCodec
sequenceCounter uint8
}
func NewBrowser(connection *Connection, messageCodec spi.MessageCodec) *Browser {
- return &Browser{
+ browser := Browser{
connection: connection,
messageCodec: messageCodec,
sequenceCounter: 0,
}
+ browser.DefaultBrowser = _default.NewDefaultBrowser(browser)
+ return &browser
}
-func (m Browser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
- return m.BrowseWithInterceptor(browseRequest, nil)
-}
-
-func (m Browser) BrowseWithInterceptor(browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool) <-chan apiModel.PlcBrowseRequestResult {
- result := make(chan apiModel.PlcBrowseRequestResult)
-
- go func() {
- responseCodes := map[string]apiModel.PlcResponseCode{}
- results := map[string][]apiModel.PlcBrowseFoundField{}
- for _, fieldName := range browseRequest.GetFieldNames() {
- field := browseRequest.GetField(fieldName)
+func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool, fieldName string, field apiModel.PlcField) (apiModel.PlcResponseCode, []apiModel.PlcBrowseFoundField) {
+ // TODO: handle ctx
+ var queryResults []apiModel.PlcBrowseFoundField
+ switch field := field.(type) {
+ case *unitInfoField:
+ allUnits := false
+ var units []readWriteModel.UnitAddress
+ allAttributes := false
+ var attributes []readWriteModel.Attribute
+ if unitAddress := field.unitAddress; unitAddress != nil {
+ units = append(units, *unitAddress)
+ } else {
+ allUnits = true
+ for i := 0; i <= 0xFF; i++ {
+ units = append(units, readWriteModel.NewUnitAddress(byte(i)))
+ }
+ }
+ if attribute := field.attribute; attribute != nil {
+ attributes = append(attributes, *attribute)
+ } else {
+ allAttributes = true
+ for _, attribute := range readWriteModel.AttributeValues {
+ attributes = append(attributes, attribute)
+ }
+ }
- var queryResults []apiModel.PlcBrowseFoundField
- switch field := field.(type) {
- case *unitInfoField:
- allUnits := false
- var units []readWriteModel.UnitAddress
- allAttributes := false
- var attributes []readWriteModel.Attribute
- if unitAddress := field.unitAddress; unitAddress != nil {
- units = append(units, *unitAddress)
+ if allUnits {
+ log.Info().Msg("Querying all units")
+ }
+ unitLoop:
+ for _, unit := range units {
+ unitAddress := unit.GetAddress()
+ if !allUnits && allAttributes {
+ log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
+ }
+ event := log.Info()
+ if allUnits {
+ event = log.Debug()
+ }
+ event.Msgf("Query unit %d", unitAddress)
+ for _, attribute := range attributes {
+ if !allUnits && !allAttributes {
+ log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
} else {
- allUnits = true
- for i := 0; i <= 0xFF; i++ {
- units = append(units, readWriteModel.NewUnitAddress(byte(i)))
- }
+ event.Msgf("unit %d: Query %s", unitAddress, attribute)
}
- if attribute := field.attribute; attribute != nil {
- attributes = append(attributes, *attribute)
- } else {
- allAttributes = true
- for _, attribute := range readWriteModel.AttributeValues {
- attributes = append(attributes, attribute)
+ readFieldName := fmt.Sprintf("%s/%d/%s", fieldName, unitAddress, attribute)
+ readRequest, _ := m.connection.ReadRequestBuilder().
+ AddField(readFieldName, NewCALIdentifyField(unit, attribute, 1)).
+ Build()
+ requestResult := <-readRequest.Execute()
+ if err := requestResult.GetErr(); err != nil {
+ if !allUnits && !allAttributes {
+ event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
}
+ continue unitLoop
}
-
- if allUnits {
- log.Info().Msg("Querying all units")
+ response := requestResult.GetResponse()
+ if code := response.GetResponseCode(readFieldName); code != apiModel.PlcResponseCode_OK {
+ event.Msgf("unit %d: error reading field %s. Code %s", unitAddress, attribute, code)
+ continue unitLoop
}
- unitLoop:
- for _, unit := range units {
- unitAddress := unit.GetAddress()
- if !allUnits && allAttributes {
- log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
- }
- event := log.Info()
- if allUnits {
- event = log.Debug()
- }
- event.Msgf("Query unit %d", unitAddress)
- for _, attribute := range attributes {
- if !allUnits && !allAttributes {
- log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
- } else {
- event.Msgf("unit %d: Query %s", unitAddress, attribute)
- }
- readFieldName := fmt.Sprintf("%s/%d/%s", fieldName, unitAddress, attribute)
- readRequest, _ := m.connection.ReadRequestBuilder().
- AddField(readFieldName, NewCALIdentifyField(unit, attribute, 1)).
- Build()
- requestResult := <-readRequest.Execute()
- if err := requestResult.GetErr(); err != nil {
- if !allUnits && !allAttributes {
- event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
- }
- continue unitLoop
- }
- response := requestResult.GetResponse()
- if code := response.GetResponseCode(readFieldName); code != apiModel.PlcResponseCode_OK {
- event.Msgf("unit %d: error reading field %s. Code %s", unitAddress, attribute, code)
- continue unitLoop
- }
- queryResult := &model.DefaultPlcBrowseQueryResult{
- Field: NewCALIdentifyField(unit, attribute, 1),
- Name: fieldName,
- Readable: true,
- Writable: false,
- Subscribable: false,
- Attributes: map[string]values.PlcValue{
- "CurrentValue": response.GetValue(readFieldName),
- },
- }
- if interceptor != nil {
- interceptor(&model.DefaultPlcBrowseEvent{
- Request: browseRequest,
- FieldName: readFieldName,
- Result: queryResult,
- Err: nil,
- })
- }
- queryResults = append(queryResults, queryResult)
- }
+ queryResult := &model.DefaultPlcBrowseQueryResult{
+ Field: NewCALIdentifyField(unit, attribute, 1),
+ Name: fieldName,
+ Readable: true,
+ Writable: false,
+ Subscribable: false,
+ Attributes: map[string]values.PlcValue{
+ "CurrentValue": response.GetValue(readFieldName),
+ },
+ }
+ if interceptor != nil {
+ interceptor(&model.DefaultPlcBrowseEvent{
+ Request: browseRequest,
+ FieldName: readFieldName,
+ Result: queryResult,
+ Err: nil,
+ })
}
- responseCodes[fieldName] = apiModel.PlcResponseCode_OK
- default:
- responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
+ queryResults = append(queryResults, queryResult)
}
- results[fieldName] = queryResults
}
- result <- &model.DefaultPlcBrowseRequestResult{
- Request: browseRequest,
- Response: model.NewDefaultPlcBrowseResponse(browseRequest, results, responseCodes),
- Err: nil,
- }
- }()
- return result
+ default:
+ return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+ }
+ return apiModel.PlcResponseCode_OK, queryResults
}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index b34382df9..cd5840f93 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "context"
"fmt"
"sync"
"time"
@@ -49,7 +50,8 @@ func NewReader(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, tm
}
}
-func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 18f9e2aae..a61406a1e 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "context"
"fmt"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
@@ -42,7 +43,8 @@ func NewSubscriber(connection *Connection) *Subscriber {
}
}
-func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+ // TODO: handle context
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
// Add this subscriber to the connection.
@@ -66,7 +68,8 @@ func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionReque
return result
}
-func (m *Subscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+ // TODO: handle context
result := make(chan apiModel.PlcUnsubscriptionRequestResult)
// TODO: As soon as we establish a connection, we start getting data...
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 1db987273..b901f8493 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
@@ -40,7 +41,8 @@ func NewWriter(tpduGenerator *AlphaGenerator, messageCodec spi.MessageCodec, tm
}
}
-func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
result <- &plc4goModel.DefaultPlcWriteRequestResult{
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 1779b3d07..65caf6b68 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -20,6 +20,7 @@
package eip
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/eip/readwrite/model"
@@ -50,7 +51,8 @@ func NewReader(messageCodec spi.MessageCodec, tm *spi.RequestTransactionManager,
}
}
-func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index bf208e03d..7343b07af 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -20,6 +20,7 @@
package eip
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/eip/readwrite/model"
@@ -50,7 +51,8 @@ func NewWriter(messageCodec spi.MessageCodec, tm *spi.RequestTransactionManager,
}
}
-func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
items := make([]readWriteModel.CipService, len(writeRequest.GetFieldNames()))
diff --git a/plc4go/internal/knxnetip/Browser.go b/plc4go/internal/knxnetip/Browser.go
index dfd31f8bf..74640234b 100644
--- a/plc4go/internal/knxnetip/Browser.go
+++ b/plc4go/internal/knxnetip/Browser.go
@@ -20,8 +20,10 @@
package knxnetip
import (
+ "context"
"encoding/hex"
"fmt"
+ _default "github.com/apache/plc4x/plc4go/spi/default"
"strconv"
"strings"
"time"
@@ -37,61 +39,44 @@ import (
)
type Browser struct {
+ _default.DefaultBrowser
connection *Connection
messageCodec spi.MessageCodec
sequenceCounter uint8
}
func NewBrowser(connection *Connection, messageCodec spi.MessageCodec) *Browser {
- return &Browser{
+ browser := Browser{
connection: connection,
messageCodec: messageCodec,
sequenceCounter: 0,
}
+ browser.DefaultBrowser = _default.NewDefaultBrowser(browser)
+ return &browser
}
-func (m Browser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
- return m.BrowseWithInterceptor(browseRequest, func(result apiModel.PlcBrowseEvent) bool {
- return true
- })
-}
-
-func (m Browser) BrowseWithInterceptor(browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool) <-chan apiModel.PlcBrowseRequestResult {
- result := make(chan apiModel.PlcBrowseRequestResult)
- go func() {
- responseCodes := map[string]apiModel.PlcResponseCode{}
- results := map[string][]apiModel.PlcBrowseFoundField{}
- for _, fieldName := range browseRequest.GetFieldNames() {
- field := browseRequest.GetField(fieldName)
-
- switch field.(type) {
- case DeviceQueryField:
- queryResults, err := m.executeDeviceQuery(field.(DeviceQueryField), browseRequest, fieldName, interceptor)
- if err != nil {
- log.Warn().Err(err).Msg("Error executing device query")
- responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
- } else {
- results[fieldName] = queryResults
- }
- case CommunicationObjectQueryField:
- queryResults, err := m.executeCommunicationObjectQuery(field.(CommunicationObjectQueryField))
- if err != nil {
- log.Warn().Err(err).Msg("Error executing device query")
- responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
- } else {
- results[fieldName] = queryResults
- }
- default:
- responseCodes[fieldName] = apiModel.PlcResponseCode_INTERNAL_ERROR
- }
+func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool, fieldName string, field apiModel.PlcField) (apiModel.PlcResponseCode, []apiModel.PlcBrowseFoundField) {
+ // TODO: handle ctx
+ switch field.(type) {
+ case DeviceQueryField:
+ queryResults, err := m.executeDeviceQuery(field.(DeviceQueryField), browseRequest, fieldName, interceptor)
+ if err != nil {
+ log.Warn().Err(err).Msg("Error executing device query")
+ return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+ } else {
+ return apiModel.PlcResponseCode_OK, queryResults
}
- result <- &model.DefaultPlcBrowseRequestResult{
- Request: browseRequest,
- Response: model.NewDefaultPlcBrowseResponse(browseRequest, results, responseCodes),
- Err: nil,
+ case CommunicationObjectQueryField:
+ queryResults, err := m.executeCommunicationObjectQuery(field.(CommunicationObjectQueryField))
+ if err != nil {
+ log.Warn().Err(err).Msg("Error executing device query")
+ return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+ } else {
+ return apiModel.PlcResponseCode_OK, queryResults
}
- }()
- return result
+ default:
+ return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+ }
}
func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiModel.PlcBrowseRequest, fieldName string, interceptor func(result apiModel.PlcBrowseEvent) bool) ([]apiModel.PlcBrowseFoundField, error) {
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index 756716728..c1983d414 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
"errors"
"strconv"
"strings"
@@ -43,7 +44,8 @@ func NewReader(connection *Connection) *Reader {
}
}
-func (m Reader) Read(readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
+func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
+ // TODO: handle ctx
resultChan := make(chan apiModel.PlcReadRequestResult)
go func() {
responseCodes := map[string]apiModel.PlcResponseCode{}
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index 901f71c98..7a84bcf95 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
@@ -41,7 +42,8 @@ func NewSubscriber(connection *Connection) *Subscriber {
}
}
-func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+ // TODO: handle context
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
// Add this subscriber to the connection.
@@ -65,7 +67,8 @@ func (m *Subscriber) Subscribe(subscriptionRequest apiModel.PlcSubscriptionReque
return result
}
-func (m *Subscriber) Unsubscribe(unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+ // TODO: handle context
result := make(chan apiModel.PlcUnsubscriptionRequestResult)
// TODO: As soon as we establish a connection, we start getting data...
diff --git a/plc4go/internal/knxnetip/Writer.go b/plc4go/internal/knxnetip/Writer.go
index cef21e120..9ff879b68 100644
--- a/plc4go/internal/knxnetip/Writer.go
+++ b/plc4go/internal/knxnetip/Writer.go
@@ -20,6 +20,7 @@
package knxnetip
import (
+ "context"
"errors"
"github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
@@ -38,7 +39,8 @@ func NewWriter(messageCodec spi.MessageCodec) Writer {
}
}
-func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
// If we are requesting only one field, use a
if len(writeRequest.GetFieldNames()) == 1 {
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index 48469854b..d85d1ed4e 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
@@ -47,7 +48,8 @@ func NewReader(unitIdentifier uint8, messageCodec spi.MessageCodec) *Reader {
}
}
-func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index 46c5e6545..d0ff99d9d 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -20,6 +20,7 @@
package modbus
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
@@ -46,7 +47,8 @@ func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec) Writer {
}
}
-func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
// If we are requesting only one field, use a
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 418a18174..68094ce47 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -20,6 +20,7 @@
package s7
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
@@ -46,7 +47,8 @@ func NewReader(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm *
}
}
-func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 45dc0cac1..f86369b1e 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -20,6 +20,7 @@
package s7
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
@@ -45,7 +46,8 @@ func NewWriter(tpduGenerator *TpduGenerator, messageCodec spi.MessageCodec, tm *
}
}
-func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
parameterItems := make([]readWriteModel.S7VarRequestParameterItem, len(writeRequest.GetFieldNames()))
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index 2496bee0d..9520cefa0 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -20,6 +20,7 @@
package simulated
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi"
@@ -42,7 +43,8 @@ func NewReader(device *Device, options map[string][]string, tracer *spi.Tracer)
}
}
-func (r Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (r Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+ // TODO: handle ctx
ch := make(chan model.PlcReadRequestResult)
go func() {
var txId string
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index af84cfc07..3af1cf90d 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -161,7 +161,7 @@ func TestReader_Read(t *testing.T) {
r := NewReader(tt.fields.device, tt.fields.options, nil)
readRequest := model3.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
timeBeforeReadRequest := time.Now()
- readResponseChannel := r.Read(readRequest)
+ readResponseChannel := r.Read(nil, readRequest)
select {
case readResponse := <-readResponseChannel:
timeAfterReadRequest := time.Now()
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index 9ec28b32f..0317f48c5 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -20,6 +20,7 @@
package simulated
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
model2 "github.com/apache/plc4x/plc4go/spi/model"
@@ -41,7 +42,8 @@ func NewWriter(device *Device, options map[string][]string, tracer *spi.Tracer)
}
}
-func (w Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+func (w Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
+ // TODO: handle context
ch := make(chan model.PlcWriteRequestResult)
go func() {
var txId string
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index 4baab6935..55494c2dc 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -173,7 +173,7 @@ func TestWriter_Write(t *testing.T) {
w := NewWriter(tt.fields.device, tt.fields.options, nil)
writeRequest := model3.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
timeBeforeWriteRequest := time.Now()
- writeResponseChannel := w.Write(writeRequest)
+ writeResponseChannel := w.Write(nil, writeRequest)
select {
case writeResponse := <-writeResponseChannel:
timeAfterWriteRequest := time.Now()
diff --git a/plc4go/pkg/api/model/plc_browse.go b/plc4go/pkg/api/model/plc_browse.go
index 9211c8aeb..2f785d572 100644
--- a/plc4go/pkg/api/model/plc_browse.go
+++ b/plc4go/pkg/api/model/plc_browse.go
@@ -37,6 +37,8 @@ type PlcBrowseRequest interface {
ExecuteWithContext(ctx context.Context) <-chan PlcBrowseRequestResult
// ExecuteWithInterceptor Will call the given callback for every found resource
ExecuteWithInterceptor(interceptor func(result PlcBrowseEvent) bool) <-chan PlcBrowseRequestResult
+ // ExecuteWithInterceptorWithContext Will call the given callback for every found resource
+ ExecuteWithInterceptorWithContext(ctx context.Context, interceptor func(result PlcBrowseEvent) bool) <-chan PlcBrowseRequestResult
GetFieldNames() []string
GetField(name string) PlcField
}
diff --git a/plc4go/spi/PlcBrowser.go b/plc4go/spi/PlcBrowser.go
index c9280b3a4..7b6488301 100644
--- a/plc4go/spi/PlcBrowser.go
+++ b/plc4go/spi/PlcBrowser.go
@@ -19,11 +19,14 @@
package spi
-import "github.com/apache/plc4x/plc4go/pkg/api/model"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/model"
+)
type PlcBrowser interface {
// Browse Non-Blocking request, which will return a full result as soon as the operation is finished
- Browse(browseRequest model.PlcBrowseRequest) <-chan model.PlcBrowseRequestResult
+ Browse(ctx context.Context, browseRequest model.PlcBrowseRequest) <-chan model.PlcBrowseRequestResult
// BrowseWithInterceptor Variant of the Browser, which allows immediately intercepting found resources
// This is ideal, if additional information has to be queried on such found resources
@@ -31,5 +34,5 @@ type PlcBrowser interface {
// and increase throughput. It can also be used for simple filtering.
// If the interceptor function returns 'true' the result is added to the overall result
// if it's 'false' is is not.
- BrowseWithInterceptor(browseRequest model.PlcBrowseRequest, interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult
+ BrowseWithInterceptor(ctx context.Context, browseRequest model.PlcBrowseRequest, interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult
}
diff --git a/plc4go/spi/PlcReader.go b/plc4go/spi/PlcReader.go
index 8eb0dcd77..59439c934 100644
--- a/plc4go/spi/PlcReader.go
+++ b/plc4go/spi/PlcReader.go
@@ -19,8 +19,11 @@
package spi
-import "github.com/apache/plc4x/plc4go/pkg/api/model"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/model"
+)
type PlcReader interface {
- Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult
+ Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult
}
diff --git a/plc4go/spi/PlcSubscriber.go b/plc4go/spi/PlcSubscriber.go
index f5f7bef4e..ceb771b95 100644
--- a/plc4go/spi/PlcSubscriber.go
+++ b/plc4go/spi/PlcSubscriber.go
@@ -19,9 +19,12 @@
package spi
-import "github.com/apache/plc4x/plc4go/pkg/api/model"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/model"
+)
type PlcSubscriber interface {
- Subscribe(subscriptionRequest model.PlcSubscriptionRequest) <-chan model.PlcSubscriptionRequestResult
- Unsubscribe(unsubscriptionRequest model.PlcUnsubscriptionRequest) <-chan model.PlcUnsubscriptionRequestResult
+ Subscribe(ctx context.Context, subscriptionRequest model.PlcSubscriptionRequest) <-chan model.PlcSubscriptionRequestResult
+ Unsubscribe(ctx context.Context, unsubscriptionRequest model.PlcUnsubscriptionRequest) <-chan model.PlcUnsubscriptionRequestResult
}
diff --git a/plc4go/spi/PlcWriter.go b/plc4go/spi/PlcWriter.go
index 39103c95e..6178c3be8 100644
--- a/plc4go/spi/PlcWriter.go
+++ b/plc4go/spi/PlcWriter.go
@@ -19,8 +19,11 @@
package spi
-import "github.com/apache/plc4x/plc4go/pkg/api/model"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/model"
+)
type PlcWriter interface {
- Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult
+ Write(ctx context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult
}
diff --git a/plc4go/spi/default/DefaultBrowser.go b/plc4go/spi/default/DefaultBrowser.go
new file mode 100644
index 000000000..30bd911fe
--- /dev/null
+++ b/plc4go/spi/default/DefaultBrowser.go
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package _default
+
+import (
+ "context"
+ apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ "github.com/apache/plc4x/plc4go/spi"
+ "github.com/apache/plc4x/plc4go/spi/model"
+)
+
+// DefaultBrowserRequirements adds required methods to Browser that are needed when using DefaultBrowser
+type DefaultBrowserRequirements interface {
+ BrowseField(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool, fieldName string, field apiModel.PlcField) (apiModel.PlcResponseCode, []apiModel.PlcBrowseFoundField)
+}
+
+type DefaultBrowser interface {
+ spi.PlcBrowser
+}
+
+func NewDefaultBrowser(defaultBrowserRequirements DefaultBrowserRequirements) DefaultBrowser {
+ return &defaultBrowser{defaultBrowserRequirements}
+}
+
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type defaultBrowser struct {
+ DefaultBrowserRequirements
+}
+
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
+func (m *defaultBrowser) Browse(ctx context.Context, browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
+ return m.BrowseWithContext(ctx, browseRequest)
+}
+
+func (m *defaultBrowser) BrowseWithContext(ctx context.Context, browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
+ return m.BrowseWithInterceptor(ctx, browseRequest, func(result apiModel.PlcBrowseEvent) bool {
+ return true
+ })
+}
+
+func (m *defaultBrowser) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool) <-chan apiModel.PlcBrowseRequestResult {
+ result := make(chan apiModel.PlcBrowseRequestResult)
+ go func() {
+ responseCodes := map[string]apiModel.PlcResponseCode{}
+ results := map[string][]apiModel.PlcBrowseFoundField{}
+ for _, fieldName := range browseRequest.GetFieldNames() {
+ field := browseRequest.GetField(fieldName)
+ responseCodes[fieldName], results[fieldName] = m.BrowseField(ctx, browseRequest, interceptor, fieldName, field)
+ }
+ result <- &model.DefaultPlcBrowseRequestResult{
+ Request: browseRequest,
+ Response: model.NewDefaultPlcBrowseResponse(browseRequest, results, responseCodes),
+ Err: nil,
+ }
+ }()
+ return result
+}
diff --git a/plc4go/spi/interceptors/RequestInterceptor.go b/plc4go/spi/interceptors/RequestInterceptor.go
index 57c029744..30593b1c9 100644
--- a/plc4go/spi/interceptors/RequestInterceptor.go
+++ b/plc4go/spi/interceptors/RequestInterceptor.go
@@ -20,17 +20,18 @@
package interceptors
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
)
type ReadRequestInterceptor interface {
- InterceptReadRequest(readRequest model.PlcReadRequest) []model.PlcReadRequest
- ProcessReadResponses(readRequest model.PlcReadRequest, readResults []model.PlcReadRequestResult) model.PlcReadRequestResult
+ InterceptReadRequest(ctx context.Context, readRequest model.PlcReadRequest) []model.PlcReadRequest
+ ProcessReadResponses(ctx context.Context, readRequest model.PlcReadRequest, readResults []model.PlcReadRequestResult) model.PlcReadRequestResult
}
type WriteRequestInterceptor interface {
- InterceptWriteRequest(writeRequest model.PlcWriteRequest) []model.PlcWriteRequest
- ProcessWriteResponses(writeRequest model.PlcWriteRequest, writeResults []model.PlcWriteRequestResult) model.PlcWriteRequestResult
+ InterceptWriteRequest(ctx context.Context, writeRequest model.PlcWriteRequest) []model.PlcWriteRequest
+ ProcessWriteResponses(ctx context.Context, writeRequest model.PlcWriteRequest, writeResults []model.PlcWriteRequestResult) model.PlcWriteRequestResult
}
type RequestInterceptor interface {
diff --git a/plc4go/spi/interceptors/SingleItemRequestInterceptor.go b/plc4go/spi/interceptors/SingleItemRequestInterceptor.go
index f74fbb26b..0b88c42da 100644
--- a/plc4go/spi/interceptors/SingleItemRequestInterceptor.go
+++ b/plc4go/spi/interceptors/SingleItemRequestInterceptor.go
@@ -20,6 +20,7 @@
package interceptors
import (
+ "context"
"errors"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
@@ -109,7 +110,8 @@ func (d *interceptedPlcWriteRequestResult) GetErr() error {
///////////////////////////////////////
///////////////////////////////////////
-func (m SingleItemRequestInterceptor) InterceptReadRequest(readRequest model.PlcReadRequest) []model.PlcReadRequest {
+func (m SingleItemRequestInterceptor) InterceptReadRequest(ctx context.Context, readRequest model.PlcReadRequest) []model.PlcReadRequest {
+ // TODO: handle ctx
// If this request just has one field, go the shortcut
if len(readRequest.GetFieldNames()) == 1 {
log.Debug().Msg("We got only one request, no splitting required")
@@ -132,7 +134,8 @@ func (m SingleItemRequestInterceptor) InterceptReadRequest(readRequest model.Plc
return readRequests
}
-func (m SingleItemRequestInterceptor) ProcessReadResponses(readRequest model.PlcReadRequest, readResults []model.PlcReadRequestResult) model.PlcReadRequestResult {
+func (m SingleItemRequestInterceptor) ProcessReadResponses(ctx context.Context, readRequest model.PlcReadRequest, readResults []model.PlcReadRequestResult) model.PlcReadRequestResult {
+ // TODO: handle ctx
if len(readResults) == 1 {
log.Debug().Msg("We got only one response, no merging required")
return readResults[0]
@@ -168,7 +171,8 @@ func (m SingleItemRequestInterceptor) ProcessReadResponses(readRequest model.Plc
}
}
-func (m SingleItemRequestInterceptor) InterceptWriteRequest(writeRequest model.PlcWriteRequest) []model.PlcWriteRequest {
+func (m SingleItemRequestInterceptor) InterceptWriteRequest(ctx context.Context, writeRequest model.PlcWriteRequest) []model.PlcWriteRequest {
+ // TODO: handle ctx
// If this request just has one field, go the shortcut
if len(writeRequest.GetFieldNames()) == 1 {
log.Debug().Msg("We got only one request, no splitting required")
@@ -192,7 +196,8 @@ func (m SingleItemRequestInterceptor) InterceptWriteRequest(writeRequest model.P
return writeRequests
}
-func (m SingleItemRequestInterceptor) ProcessWriteResponses(writeRequest model.PlcWriteRequest, writeResults []model.PlcWriteRequestResult) model.PlcWriteRequestResult {
+func (m SingleItemRequestInterceptor) ProcessWriteResponses(ctx context.Context, writeRequest model.PlcWriteRequest, writeResults []model.PlcWriteRequestResult) model.PlcWriteRequestResult {
+ // TODO: handle ctx
if len(writeResults) == 1 {
log.Debug().Msg("We got only one response, no merging required")
return writeResults[0]
diff --git a/plc4go/spi/model/DefaultPlcBrowseRequest.go b/plc4go/spi/model/DefaultPlcBrowseRequest.go
index 33084df64..bae157086 100644
--- a/plc4go/spi/model/DefaultPlcBrowseRequest.go
+++ b/plc4go/spi/model/DefaultPlcBrowseRequest.go
@@ -82,15 +82,19 @@ func NewDefaultPlcBrowseRequest(fields map[string]model.PlcField, fieldNames []s
}
func (d DefaultPlcBrowseRequest) Execute() <-chan model.PlcBrowseRequestResult {
- return d.browser.Browse(d)
+ return d.browser.Browse(context.TODO(), d)
}
-func (d DefaultPlcBrowseRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcBrowseRequestResult {
- return d.Execute()
+func (d DefaultPlcBrowseRequest) ExecuteWithContext(ctx context.Context) <-chan model.PlcBrowseRequestResult {
+ return d.browser.Browse(ctx, d)
}
func (d DefaultPlcBrowseRequest) ExecuteWithInterceptor(interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult {
- return d.browser.BrowseWithInterceptor(d, interceptor)
+ return d.ExecuteWithInterceptorWithContext(context.TODO(), interceptor)
+}
+
+func (d DefaultPlcBrowseRequest) ExecuteWithInterceptorWithContext(ctx context.Context, interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult {
+ return d.browser.BrowseWithInterceptor(ctx, d, interceptor)
}
func (d DefaultPlcBrowseRequest) Serialize(writeBuffer utils.WriteBuffer) error {
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go b/plc4go/spi/model/DefaultPlcReadRequest.go
index 5fb613ab2..a486bb892 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -100,25 +100,28 @@ func (m DefaultPlcReadRequest) GetReader() spi.PlcReader {
func (m DefaultPlcReadRequest) GetReadRequestInterceptor() interceptors.ReadRequestInterceptor {
return m.readRequestInterceptor
}
-
func (m DefaultPlcReadRequest) Execute() <-chan model.PlcReadRequestResult {
+ return m.ExecuteWithContext(context.TODO())
+}
+
+func (m DefaultPlcReadRequest) ExecuteWithContext(ctx context.Context) <-chan model.PlcReadRequestResult {
// Shortcut, if no interceptor is defined
if m.readRequestInterceptor == nil {
- return m.reader.Read(m)
+ return m.reader.Read(ctx, m)
}
// Split the requests up into multiple ones.
- readRequests := m.readRequestInterceptor.InterceptReadRequest(m)
+ readRequests := m.readRequestInterceptor.InterceptReadRequest(ctx, m)
// Shortcut for single-request-requests
if len(readRequests) == 1 {
- return m.reader.Read(readRequests[0])
+ return m.reader.Read(nil, readRequests[0])
}
// Create a sub-result-channel slice
var subResultChannels []<-chan model.PlcReadRequestResult
// Iterate over all requests and add the result-channels to the list
for _, subRequest := range readRequests {
- subResultChannels = append(subResultChannels, m.reader.Read(subRequest))
+ subResultChannels = append(subResultChannels, m.reader.Read(ctx, subRequest))
// TODO: Replace this with a real queueing of requests. Later on we need throttling. At the moment this avoids race condition as the read above writes to fast on the line which is a problem for the test
time.Sleep(time.Millisecond * 4)
}
@@ -129,11 +132,16 @@ func (m DefaultPlcReadRequest) Execute() <-chan model.PlcReadRequestResult {
var subResults []model.PlcReadRequestResult
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
- subResult := <-subResultChannel
- subResults = append(subResults, subResult)
+ select {
+ case <-ctx.Done():
+ resultChannel <- &DefaultPlcReadRequestResult{Request: m, Err: ctx.Err()}
+ return
+ case subResult := <-subResultChannel:
+ subResults = append(subResults, subResult)
+ }
}
// As soon as all are done, process the results
- result := m.readRequestInterceptor.ProcessReadResponses(m, subResults)
+ result := m.readRequestInterceptor.ProcessReadResponses(ctx, m, subResults)
// Return the final result
resultChannel <- result
}()
@@ -141,10 +149,6 @@ func (m DefaultPlcReadRequest) Execute() <-chan model.PlcReadRequestResult {
return resultChannel
}
-func (m DefaultPlcReadRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcReadRequestResult {
- return m.Execute()
-}
-
func (m DefaultPlcReadRequest) Serialize(writeBuffer utils.WriteBuffer) error {
if err := writeBuffer.PushContext("PlcReadRequest"); err != nil {
return err
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
index 6731c0e43..932dc98f3 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
@@ -137,11 +137,11 @@ func NewDefaultPlcSubscriptionRequest(fields map[string]model.PlcField, fieldNam
}
func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionRequestResult {
- return m.subscriber.Subscribe(m)
+ return m.ExecuteWithContext(context.TODO())
}
-func (m DefaultPlcSubscriptionRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcSubscriptionRequestResult {
- return m.Execute()
+func (m DefaultPlcSubscriptionRequest) ExecuteWithContext(ctx context.Context) <-chan model.PlcSubscriptionRequestResult {
+ return m.subscriber.Subscribe(ctx, m)
}
func (m DefaultPlcSubscriptionRequest) GetEventHandler() model.PlcSubscriptionEventHandler {
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go b/plc4go/spi/model/DefaultPlcWriteRequest.go
index 48d32554f..a4f6b5056 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -127,23 +127,27 @@ func NewDefaultPlcWriteRequest(fields map[string]model.PlcField, fieldNames []st
}
func (m DefaultPlcWriteRequest) Execute() <-chan model.PlcWriteRequestResult {
+ return m.ExecuteWithContext(context.TODO())
+}
+
+func (m DefaultPlcWriteRequest) ExecuteWithContext(ctx context.Context) <-chan model.PlcWriteRequestResult {
// Shortcut, if no interceptor is defined
if m.writeRequestInterceptor == nil {
- return m.writer.Write(m)
+ return m.writer.Write(ctx, m)
}
// Split the requests up into multiple ones.
- writeRequests := m.writeRequestInterceptor.InterceptWriteRequest(m)
+ writeRequests := m.writeRequestInterceptor.InterceptWriteRequest(ctx, m)
// Shortcut for single-request-requests
if len(writeRequests) == 1 {
- return m.writer.Write(writeRequests[0])
+ return m.writer.Write(ctx, writeRequests[0])
}
// Create a sub-result-channel slice
var subResultChannels []<-chan model.PlcWriteRequestResult
// Iterate over all requests and add the result-channels to the list
for _, subRequest := range writeRequests {
- subResultChannels = append(subResultChannels, m.writer.Write(subRequest))
+ subResultChannels = append(subResultChannels, m.writer.Write(ctx, subRequest))
// TODO: Replace this with a real queueing of requests. Later on we need throttling. At the moment this avoids race condition as the read above writes to fast on the line which is a problem for the test
time.Sleep(time.Millisecond * 4)
}
@@ -154,11 +158,16 @@ func (m DefaultPlcWriteRequest) Execute() <-chan model.PlcWriteRequestResult {
var subResults []model.PlcWriteRequestResult
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
- subResult := <-subResultChannel
- subResults = append(subResults, subResult)
+ select {
+ case <-ctx.Done():
+ resultChannel <- &DefaultPlcWriteRequestResult{Request: m, Err: ctx.Err()}
+ return
+ case subResult := <-subResultChannel:
+ subResults = append(subResults, subResult)
+ }
}
// As soon as all are done, process the results
- result := m.writeRequestInterceptor.ProcessWriteResponses(m, subResults)
+ result := m.writeRequestInterceptor.ProcessWriteResponses(ctx, m, subResults)
// Return the final result
resultChannel <- result
}()
@@ -166,10 +175,6 @@ func (m DefaultPlcWriteRequest) Execute() <-chan model.PlcWriteRequestResult {
return resultChannel
}
-func (m DefaultPlcWriteRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcWriteRequestResult {
- return m.Execute()
-}
-
func (m DefaultPlcWriteRequest) GetWriter() spi.PlcWriter {
return m.writer
}