You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/15 09:20:08 UTC
[plc4x] branch develop updated: feat(plc4go): introduced ExecuteWithContext(ctx... calls
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new a21cbc6c6 feat(plc4go): introduced ExecuteWithContext(ctx... calls
a21cbc6c6 is described below
commit a21cbc6c6d0715ad5f2b7b4505aef36af851b92d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 15 11:19:57 2022 +0200
feat(plc4go): introduced ExecuteWithContext(ctx... calls
- This allows long-running jobs to be aborted externally
---
plc4go/internal/simulated/Driver.go | 44 +-----
plc4go/internal/simulated/Driver_test.go | 162 ++--------------------
plc4go/pkg/api/driver.go | 2 +
plc4go/pkg/api/model/plc_browse.go | 7 +-
plc4go/pkg/api/model/plc_read.go | 6 +-
plc4go/pkg/api/model/plc_subscription.go | 2 +
plc4go/pkg/api/model/plc_unsubscription.go | 3 +
plc4go/pkg/api/model/plc_write.go | 6 +-
plc4go/spi/PlcDiscoverer.go | 2 +
plc4go/spi/default/DefaultDriver.go | 5 +
plc4go/spi/model/DefaultPlcBrowseRequest.go | 5 +
plc4go/spi/model/DefaultPlcReadRequest.go | 5 +
plc4go/spi/model/DefaultPlcSubscriptionRequest.go | 5 +
plc4go/spi/model/DefaultPlcWriteRequest.go | 5 +
14 files changed, 66 insertions(+), 193 deletions(-)
diff --git a/plc4go/internal/simulated/Driver.go b/plc4go/internal/simulated/Driver.go
index 1b5dc3e91..0b0fdd19c 100644
--- a/plc4go/internal/simulated/Driver.go
+++ b/plc4go/internal/simulated/Driver.go
@@ -21,59 +21,25 @@ package simulated
import (
"github.com/apache/plc4x/plc4go/pkg/api"
- "github.com/apache/plc4x/plc4go/pkg/api/model"
- "github.com/apache/plc4x/plc4go/spi/options"
+ _default "github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
- "github.com/pkg/errors"
"net/url"
)
type Driver struct {
- fieldHandler FieldHandler
+ _default.DefaultDriver
valueHandler ValueHandler
}
func NewDriver() plc4go.PlcDriver {
return &Driver{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
+ DefaultDriver: _default.NewDefaultDriver("simulated", "Simulated PLC4X Datasource", "none", NewFieldHandler()),
+ valueHandler: NewValueHandler(),
}
}
-// GetProtocolCode Get the short code used to identify this driver (As used in the connection string)
-func (d *Driver) GetProtocolCode() string {
- return "simulated"
-}
-
-// GetProtocolName Get a human-readable name for this driver
-func (d *Driver) GetProtocolName() string {
- return "Simulated PLC4X Datasource"
-}
-
-// GetDefaultTransport If the driver has a default form of transport, provide this and make
-// providing the transport code optional in the connection string
-func (d *Driver) GetDefaultTransport() string {
- return "none"
-}
-
-// CheckQuery Have the driver parse the query string and provide feedback if it's not a valid one
-func (d *Driver) CheckQuery(query string) error {
- _, err := d.fieldHandler.ParseQuery(query)
- return err
-}
-
// GetConnection Establishes a connection to a given PLC using the information in the connectionString
func (d *Driver) GetConnection(_ url.URL, _ map[string]transports.Transport, options map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
- connection := NewConnection(NewDevice("test"), d.fieldHandler, d.valueHandler, options)
+ connection := NewConnection(NewDevice("test"), d.GetPlcFieldHandler(), d.valueHandler, options)
return connection.Connect()
}
-
-// SupportsDiscovery returns true if this driver supports discovery
-// TODO: Actually the connection could support discovery to list up all fields in the Device
-func (d *Driver) SupportsDiscovery() bool {
- return false
-}
-
-func (d *Driver) Discover(_ func(event model.PlcDiscoveryEvent), _ ...options.WithDiscoveryOption) error {
- return errors.New("unsupported operation")
-}
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index 92c042596..ad34db3ec 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -20,8 +20,6 @@
package simulated
import (
- "github.com/apache/plc4x/plc4go/pkg/api/model"
- "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"net/url"
"testing"
@@ -29,25 +27,16 @@ import (
)
func TestDriver_CheckQuery(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
type args struct {
query string
}
tests := []struct {
name string
- fields fields
args args
wantErr bool
}{
{
name: "valid query",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
args: args{
query: "STATE/test:UINT[2]",
},
@@ -56,10 +45,7 @@ func TestDriver_CheckQuery(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
if err := d.CheckQuery(tt.args.query); (err != nil) != tt.wantErr {
t.Errorf("CheckQuery() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -67,53 +53,7 @@ func TestDriver_CheckQuery(t *testing.T) {
}
}
-func TestDriver_Discover(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
- type args struct {
- callback func(event model.PlcDiscoveryEvent)
- discoveryOptions []options.WithDiscoveryOption
- }
- tests := []struct {
- name string
- fields fields
- args args
- wantErr bool
- }{
- {
- name: "discovery fails",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
- args: args{
- // Can all be nil, as the call is expected to fail
- callback: nil,
- discoveryOptions: nil,
- },
- wantErr: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
- if err := d.Discover(tt.args.callback, tt.args.discoveryOptions...); (err != nil) != tt.wantErr {
- t.Errorf("Discover() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
func TestDriver_GetConnection(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
type args struct {
in0 url.URL
in1 map[string]transports.Transport
@@ -121,16 +61,11 @@ func TestDriver_GetConnection(t *testing.T) {
}
tests := []struct {
name string
- fields fields
args args
wantErr bool
}{
{
name: "simple no options",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
// Input doesn't really matter, as the code simply ignores most of it.
args: args{
in0: url.URL{},
@@ -141,10 +76,6 @@ func TestDriver_GetConnection(t *testing.T) {
},
{
name: "simple with options",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
// Input doesn't really matter, as the code simply ignores most of it.
args: args{
in0: url.URL{},
@@ -158,10 +89,7 @@ func TestDriver_GetConnection(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
select {
case connectResult := <-connectionChan:
@@ -178,30 +106,18 @@ func TestDriver_GetConnection(t *testing.T) {
}
func TestDriver_GetDefaultTransport(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
tests := []struct {
- name string
- fields fields
- want string
+ name string
+ want string
}{
{
name: "simple",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
want: "none",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
if got := d.GetDefaultTransport(); got != tt.want {
t.Errorf("GetDefaultTransport() = %v, want %v", got, tt.want)
}
@@ -210,30 +126,18 @@ func TestDriver_GetDefaultTransport(t *testing.T) {
}
func TestDriver_GetProtocolCode(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
tests := []struct {
- name string
- fields fields
- want string
+ name string
+ want string
}{
{
name: "simple",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
want: "simulated",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
if got := d.GetProtocolCode(); got != tt.want {
t.Errorf("GetProtocolCode() = %v, want %v", got, tt.want)
}
@@ -242,30 +146,18 @@ func TestDriver_GetProtocolCode(t *testing.T) {
}
func TestDriver_GetProtocolName(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
tests := []struct {
- name string
- fields fields
- want string
+ name string
+ want string
}{
{
name: "simple",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
want: "Simulated PLC4X Datasource",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
+ d := NewDriver()
if got := d.GetProtocolName(); got != tt.want {
t.Errorf("GetProtocolName() = %v, want %v", got, tt.want)
}
@@ -273,38 +165,6 @@ func TestDriver_GetProtocolName(t *testing.T) {
}
}
-func TestDriver_SupportsDiscovery(t *testing.T) {
- type fields struct {
- fieldHandler FieldHandler
- valueHandler ValueHandler
- }
- tests := []struct {
- name string
- fields fields
- want bool
- }{
- {
- name: "simple",
- fields: fields{
- fieldHandler: NewFieldHandler(),
- valueHandler: NewValueHandler(),
- },
- want: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- d := &Driver{
- fieldHandler: tt.fields.fieldHandler,
- valueHandler: tt.fields.valueHandler,
- }
- if got := d.SupportsDiscovery(); got != tt.want {
- t.Errorf("SupportsDiscovery() = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestNewDriver(t *testing.T) {
tests := []struct {
name string
diff --git a/plc4go/pkg/api/driver.go b/plc4go/pkg/api/driver.go
index c9d343a1b..ffdf79adf 100644
--- a/plc4go/pkg/api/driver.go
+++ b/plc4go/pkg/api/driver.go
@@ -20,6 +20,7 @@
package plc4go
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -49,4 +50,5 @@ type PlcDriver interface {
// Discover TODO: document me
// FIXME: this leaks spi in the signature move to spi driver or create interfaces. Can also be done by moving spi in a proper module
Discover(callback func(event model.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error
+ DiscoverWithContext(ctx context.Context, callback func(event model.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error
}
diff --git a/plc4go/pkg/api/model/plc_browse.go b/plc4go/pkg/api/model/plc_browse.go
index 0012645eb..9211c8aeb 100644
--- a/plc4go/pkg/api/model/plc_browse.go
+++ b/plc4go/pkg/api/model/plc_browse.go
@@ -19,7 +19,10 @@
package model
-import "github.com/apache/plc4x/plc4go/pkg/api/values"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/values"
+)
type PlcBrowseRequestBuilder interface {
AddQuery(name string, query string) PlcBrowseRequestBuilder
@@ -30,6 +33,8 @@ type PlcBrowseRequest interface {
PlcRequest
// Execute Will not return until a potential scan is finished and will return all results in one block
Execute() <-chan PlcBrowseRequestResult
+ // ExecuteWithContext is the same as Execute but handles the Context if implemented for Driver
+ ExecuteWithContext(ctx context.Context) <-chan PlcBrowseRequestResult
// ExecuteWithInterceptor Will call the given callback for every found resource
ExecuteWithInterceptor(interceptor func(result PlcBrowseEvent) bool) <-chan PlcBrowseRequestResult
GetFieldNames() []string
diff --git a/plc4go/pkg/api/model/plc_read.go b/plc4go/pkg/api/model/plc_read.go
index 7dd6f9fc3..4d4eef150 100644
--- a/plc4go/pkg/api/model/plc_read.go
+++ b/plc4go/pkg/api/model/plc_read.go
@@ -19,7 +19,10 @@
package model
-import "github.com/apache/plc4x/plc4go/pkg/api/values"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/values"
+)
type PlcReadRequestBuilder interface {
AddQuery(name string, query string) PlcReadRequestBuilder
@@ -36,6 +39,7 @@ type PlcReadRequestResult interface {
type PlcReadRequest interface {
PlcRequest
Execute() <-chan PlcReadRequestResult
+ ExecuteWithContext(ctx context.Context) <-chan PlcReadRequestResult
GetFieldNames() []string
GetField(name string) PlcField
}
diff --git a/plc4go/pkg/api/model/plc_subscription.go b/plc4go/pkg/api/model/plc_subscription.go
index 68267d39e..89080ca6e 100644
--- a/plc4go/pkg/api/model/plc_subscription.go
+++ b/plc4go/pkg/api/model/plc_subscription.go
@@ -20,6 +20,7 @@
package model
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"time"
)
@@ -54,6 +55,7 @@ type PlcSubscriptionRequestResult interface {
type PlcSubscriptionRequest interface {
Execute() <-chan PlcSubscriptionRequestResult
+ ExecuteWithContext(ctx context.Context) <-chan PlcSubscriptionRequestResult
GetFieldNames() []string
GetField(name string) PlcField
GetEventHandler() PlcSubscriptionEventHandler
diff --git a/plc4go/pkg/api/model/plc_unsubscription.go b/plc4go/pkg/api/model/plc_unsubscription.go
index d996bd130..13a337769 100644
--- a/plc4go/pkg/api/model/plc_unsubscription.go
+++ b/plc4go/pkg/api/model/plc_unsubscription.go
@@ -19,6 +19,8 @@
package model
+import "context"
+
type PlcUnsubscriptionRequestBuilder interface {
// TODO: Implement
}
@@ -31,6 +33,7 @@ type PlcUnsubscriptionRequestResult interface {
type PlcUnsubscriptionRequest interface {
Execute() <-chan PlcUnsubscriptionRequestResult
+ ExecuteWithContext(ctx context.Context) <-chan PlcUnsubscriptionRequestResult
PlcRequest
}
diff --git a/plc4go/pkg/api/model/plc_write.go b/plc4go/pkg/api/model/plc_write.go
index d78d2d0ff..6ed5a6a11 100644
--- a/plc4go/pkg/api/model/plc_write.go
+++ b/plc4go/pkg/api/model/plc_write.go
@@ -19,7 +19,10 @@
package model
-import "github.com/apache/plc4x/plc4go/pkg/api/values"
+import (
+ "context"
+ "github.com/apache/plc4x/plc4go/pkg/api/values"
+)
type PlcWriteRequestBuilder interface {
AddQuery(name string, query string, value interface{}) PlcWriteRequestBuilder
@@ -36,6 +39,7 @@ type PlcWriteRequestResult interface {
type PlcWriteRequest interface {
PlcRequest
Execute() <-chan PlcWriteRequestResult
+ ExecuteWithContext(ctx context.Context) <-chan PlcWriteRequestResult
GetFieldNames() []string
GetField(name string) PlcField
GetValue(name string) values.PlcValue
diff --git a/plc4go/spi/PlcDiscoverer.go b/plc4go/spi/PlcDiscoverer.go
index 6a2ca7334..33c8af325 100644
--- a/plc4go/spi/PlcDiscoverer.go
+++ b/plc4go/spi/PlcDiscoverer.go
@@ -20,10 +20,12 @@
package spi
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi/options"
)
type PlcDiscoverer interface {
Discover(callback func(event model.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error
+ DiscoverWithContext(ctx context.Context, callback func(event model.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error
}
diff --git a/plc4go/spi/default/DefaultDriver.go b/plc4go/spi/default/DefaultDriver.go
index cb19a3430..cc6bf8ff1 100644
--- a/plc4go/spi/default/DefaultDriver.go
+++ b/plc4go/spi/default/DefaultDriver.go
@@ -20,6 +20,7 @@
package _default
import (
+ "context"
"fmt"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -93,6 +94,10 @@ func (d *defaultDriver) Discover(_ func(event apiModel.PlcDiscoveryEvent), _ ...
panic("not available")
}
+func (d *defaultDriver) DiscoverWithContext(_ context.Context, callback func(event apiModel.PlcDiscoveryEvent), discoveryOptions ...options.WithDiscoveryOption) error {
+ return d.Discover(callback, discoveryOptions...)
+}
+
func (d *defaultDriver) GetPlcFieldHandler() spi.PlcFieldHandler {
return d.plcFieldHandler
}
diff --git a/plc4go/spi/model/DefaultPlcBrowseRequest.go b/plc4go/spi/model/DefaultPlcBrowseRequest.go
index 979abb2fe..33084df64 100644
--- a/plc4go/spi/model/DefaultPlcBrowseRequest.go
+++ b/plc4go/spi/model/DefaultPlcBrowseRequest.go
@@ -20,6 +20,7 @@
package model
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
@@ -84,6 +85,10 @@ func (d DefaultPlcBrowseRequest) Execute() <-chan model.PlcBrowseRequestResult {
return d.browser.Browse(d)
}
+func (d DefaultPlcBrowseRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcBrowseRequestResult {
+ return d.Execute()
+}
+
func (d DefaultPlcBrowseRequest) ExecuteWithInterceptor(interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult {
return d.browser.BrowseWithInterceptor(d, interceptor)
}
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go b/plc4go/spi/model/DefaultPlcReadRequest.go
index 4203df9d4..5fb613ab2 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -20,6 +20,7 @@
package model
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/interceptors"
@@ -140,6 +141,10 @@ func (m DefaultPlcReadRequest) Execute() <-chan model.PlcReadRequestResult {
return resultChannel
}
+func (m DefaultPlcReadRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcReadRequestResult {
+ return m.Execute()
+}
+
func (m DefaultPlcReadRequest) Serialize(writeBuffer utils.WriteBuffer) error {
if err := writeBuffer.PushContext("PlcReadRequest"); err != nil {
return err
diff --git a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
index 9cc1e34e0..6731c0e43 100644
--- a/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/spi/model/DefaultPlcSubscriptionRequest.go
@@ -20,6 +20,7 @@
package model
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
@@ -139,6 +140,10 @@ func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionReq
return m.subscriber.Subscribe(m)
}
+func (m DefaultPlcSubscriptionRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcSubscriptionRequestResult {
+ return m.Execute()
+}
+
func (m DefaultPlcSubscriptionRequest) GetEventHandler() model.PlcSubscriptionEventHandler {
return m.eventHandler
}
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go b/plc4go/spi/model/DefaultPlcWriteRequest.go
index f4a191d65..48d32554f 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -20,6 +20,7 @@
package model
import (
+ "context"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi"
@@ -165,6 +166,10 @@ func (m DefaultPlcWriteRequest) Execute() <-chan model.PlcWriteRequestResult {
return resultChannel
}
+func (m DefaultPlcWriteRequest) ExecuteWithContext(_ context.Context) <-chan model.PlcWriteRequestResult {
+ return m.Execute()
+}
+
func (m DefaultPlcWriteRequest) GetWriter() spi.PlcWriter {
return m.writer
}