You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/11/09 15:37:32 UTC
[plc4x] 01/02: - Implemented a first version of the subscription API
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch feature/plc4go
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 2f615be3bee9507fc8bab00f25b192fe361593e2
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Nov 9 16:37:01 2020 +0100
- Implemented a first version of the subscription API
---
plc4go/cmd/main/drivers/knxnetip_test.go | 20 +-
.../plc4go/model/DefaultPlcSubscriptionRequest.go | 210 ++++++++++-----------
plc4go/pkg/plc4go/model/plc_subscription_event.go | 11 ++
.../pkg/plc4go/model/plc_subscription_request.go | 21 ++-
4 files changed, 144 insertions(+), 118 deletions(-)
diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index 09e63be..6104e46 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -20,12 +20,15 @@ package drivers
import (
"encoding/hex"
+ "fmt"
"plc4x.apache.org/plc4go/v0/internal/plc4go/knxnetip"
"plc4x.apache.org/plc4go/v0/internal/plc4go/knxnetip/readwrite/model"
"plc4x.apache.org/plc4go/v0/internal/plc4go/transports/udp"
"plc4x.apache.org/plc4go/v0/internal/plc4go/utils"
"plc4x.apache.org/plc4go/v0/pkg/plc4go"
+ apiModel "plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
"testing"
+ "time"
)
func KnxNetIp(t *testing.T) {
@@ -52,6 +55,7 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
// Get a connection to a remote PLC
crc := driverManager.GetConnection("knxnet-ip://192.168.42.11")
+ //crc := driverManager.GetConnection("knxnet-ip://-discover-")
// Wait for the driver to connect (or not)
connectionResult := <-crc
@@ -75,9 +79,16 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
defer connection.Close()
// Prepare a read-request
+ pollingInterval, err := time.ParseDuration("5s")
+ if err != nil {
+ t.Errorf("invalid format")
+ t.Fail()
+ return
+ }
srb := connection.SubscriptionRequestBuilder()
srb.AddChangeOfStateItem("field1", "*/*/*")
- srb.AddChangeOfStateItem("field2", "holding-register:3:REAL")
+ srb.AddCyclicItem("field2", "holding-register:3:REAL", pollingInterval)
+ srb.AddItemHandler(knxEventHandler)
subscriptionRequest, err := srb.Build()
if err != nil {
t.Errorf("error preparing subscription-request: %s", connectionResult.Err.Error())
@@ -128,3 +139,10 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
fmt.Printf("\n\nResult field2: %d\n", wrr.Response.GetResponseCode("field2"))*/
}
+func knxEventHandler(event apiModel.PlcSubscriptionEvent) {
+ for _, fieldName := range event.GetFieldNames() {
+ if event.GetResponseCode(fieldName) == apiModel.PlcResponseCode_OK {
+ fmt.Printf("Got update for field %s with value %s", fieldName, event.GetValue(fieldName).GetString())
+ }
+ }
+}
\ No newline at end of file
diff --git a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
index 1abb1a4..53dd8b8 100644
--- a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
@@ -19,64 +19,85 @@
package model
import (
- "encoding/xml"
- "errors"
- values2 "plc4x.apache.org/plc4go/v0/internal/plc4go/model/values"
- "plc4x.apache.org/plc4go/v0/internal/plc4go/spi"
- "plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
- "plc4x.apache.org/plc4go/v0/pkg/plc4go/values"
+ "encoding/xml"
+ "errors"
+ "plc4x.apache.org/plc4go/v0/internal/plc4go/spi"
+ "plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
+ "time"
+)
+
+type SubscriptionType uint8
+
+const (
+ SUBSCRIPTION_CYCLIC SubscriptionType = 0x01
+ SUBSCRIPTION_CHANGE_OF_STATE SubscriptionType = 0x02
+ SUBSCRIPTION_EVENT SubscriptionType = 0x03
)
type DefaultPlcSubscriptionRequestBuilder struct {
- subscriber spi.PlcSubscriber
- fieldHandler spi.PlcFieldHandler
- valueHandler spi.PlcValueHandler
- queries map[string]string
- values map[string]interface{}
+ subscriber spi.PlcSubscriber
+ fieldHandler spi.PlcFieldHandler
+ valueHandler spi.PlcValueHandler
+ eventHandler model.PlcSubscriptionEventHandler
+ queries map[string]string
+ types map[string]SubscriptionType
+ intervals map[string]time.Duration
}
func NewDefaultPlcSubscriptionRequestBuilder(fieldHandler spi.PlcFieldHandler, valueHandler spi.PlcValueHandler, subscriber spi.PlcSubscriber) *DefaultPlcSubscriptionRequestBuilder {
- return &DefaultPlcSubscriptionRequestBuilder{
+ return &DefaultPlcSubscriptionRequestBuilder{
subscriber: subscriber,
- fieldHandler: fieldHandler,
- valueHandler: valueHandler,
- queries: map[string]string{},
- values: map[string]interface{}{},
- }
+ fieldHandler: fieldHandler,
+ valueHandler: valueHandler,
+ queries: map[string]string{},
+ }
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddCyclicItem(name string, query string, interval time.Duration) {
+ m.queries[name] = query
+ m.types[name] = SUBSCRIPTION_CYCLIC
+ m.intervals[name] = interval
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddChangeOfStateItem(name string, query string) {
+ m.queries[name] = query
+ m.types[name] = SUBSCRIPTION_CHANGE_OF_STATE
}
-func (m *DefaultPlcSubscriptionRequestBuilder) AddItem(name string, query string, value interface{}) {
- m.queries[name] = query
- m.values[name] = value
+func (m *DefaultPlcSubscriptionRequestBuilder) AddEventItem(name string, query string) {
+ m.queries[name] = query
+ m.types[name] = SUBSCRIPTION_EVENT
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddItemHandler(eventHandler model.PlcSubscriptionEventHandler) {
+ m.eventHandler = eventHandler
}
func (m *DefaultPlcSubscriptionRequestBuilder) Build() (model.PlcSubscriptionRequest, error) {
- fields := make(map[string]model.PlcField)
- values := make(map[string]values.PlcValue)
- for name, query := range m.queries {
- field, err := m.fieldHandler.ParseQuery(query)
- if err != nil {
- return nil, errors.New("Error parsing query: " + query + ". Got error: " + err.Error())
- }
- fields[name] = field
- value, err := m.valueHandler.NewPlcValue(field, m.values[name])
- if err != nil {
- return nil, errors.New("Error parsing value of type: " + field.GetTypeName() + ". Got error: " + err.Error())
- }
- values[name] = value
- }
- return DefaultPlcSubscriptionRequest{
- fields: fields,
- values: values,
- subscriber: m.subscriber,
- }, nil
+ fields := make(map[string]model.PlcField)
+ for name, query := range m.queries {
+ field, err := m.fieldHandler.ParseQuery(query)
+ if err != nil {
+ return nil, errors.New("Error parsing query: " + query + ". Got error: " + err.Error())
+ }
+ fields[name] = field
+ }
+ return DefaultPlcSubscriptionRequest{
+ fields: fields,
+ types: m.types,
+ intervals: m.intervals,
+ subscriber: m.subscriber,
+ eventHandler: m.eventHandler,
+ }, nil
}
type DefaultPlcSubscriptionRequest struct {
- fields map[string]model.PlcField
- values map[string]values.PlcValue
- subscriber spi.PlcSubscriber
- model.PlcSubscriptionRequest
+ fields map[string]model.PlcField
+ types map[string]SubscriptionType
+ intervals map[string]time.Duration
+ eventHandler model.PlcSubscriptionEventHandler
+ subscriber spi.PlcSubscriber
+ model.PlcSubscriptionRequest
}
func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionRequestResult {
@@ -84,77 +105,50 @@ func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionReq
}
func (m DefaultPlcSubscriptionRequest) GetFieldNames() []string {
- var fieldNames []string
- for fieldName, _ := range m.fields {
- fieldNames = append(fieldNames, fieldName)
- }
- return fieldNames
+ var fieldNames []string
+ for fieldName, _ := range m.fields {
+ fieldNames = append(fieldNames, fieldName)
+ }
+ return fieldNames
}
func (m DefaultPlcSubscriptionRequest) GetField(name string) model.PlcField {
- return m.fields[name]
+ return m.fields[name]
+}
+
+func (m DefaultPlcSubscriptionRequest) GetType(name string) SubscriptionType {
+ return m.types[name]
}
-func (m DefaultPlcSubscriptionRequest) GetValue(name string) values.PlcValue {
- return m.values[name]
+func (m DefaultPlcSubscriptionRequest) GetInterval(name string) time.Duration {
+ return m.intervals[name]
}
func (m DefaultPlcSubscriptionRequest) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
- if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
- return err
- }
-
- if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "fields"}}); err != nil {
- return err
- }
- for fieldName, field := range m.fields {
- value := m.values[fieldName]
- if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: fieldName}}); err != nil {
- return err
- }
- if err := e.EncodeElement(field, xml.StartElement{Name: xml.Name{Local: "field"}}); err != nil {
- return err
- }
- switch value.(type) {
- case values2.PlcList:
- listValue, ok := value.(values2.PlcList)
- if !ok {
- return errors.New("couldn't cast PlcValue to PlcList")
- }
- for _, subValue := range listValue.Values {
- if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "value"}}); err != nil {
- return err
- }
- if !subValue.IsString() {
- return errors.New("value not serializable to string")
- }
- e.EncodeToken(xml.CharData(subValue.GetString()))
- if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "value"}}); err != nil {
- return err
- }
- }
- default:
- if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "value"}}); err != nil {
- return err
- }
- if !value.IsString() {
- return errors.New("value not serializable to string")
- }
- e.EncodeToken(xml.CharData(value.GetString()))
- if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "value"}}); err != nil {
- return err
- }
- }
- if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: fieldName}}); err != nil {
- return err
- }
- }
- if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "fields"}}); err != nil {
- return err
- }
-
- if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
- return err
- }
- return nil
+ if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
+ return err
+ }
+
+ if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "fields"}}); err != nil {
+ return err
+ }
+ for fieldName, field := range m.fields {
+ if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: fieldName}}); err != nil {
+ return err
+ }
+ if err := e.EncodeElement(field, xml.StartElement{Name: xml.Name{Local: "field"}}); err != nil {
+ return err
+ }
+ if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: fieldName}}); err != nil {
+ return err
+ }
+ }
+ if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "fields"}}); err != nil {
+ return err
+ }
+
+ if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
+ return err
+ }
+ return nil
}
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_event.go b/plc4go/pkg/plc4go/model/plc_subscription_event.go
index 773175b..50ae9b8 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_event.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_event.go
@@ -17,3 +17,14 @@
// under the License.
//
package model
+
+import "plc4x.apache.org/plc4go/v0/pkg/plc4go/values"
+
+type PlcSubscriptionEvent interface {
+ GetRequest() PlcSubscriptionRequest
+ GetFieldNames() []string
+ GetResponseCode(name string) PlcResponseCode
+ GetValue(name string) values.PlcValue
+ PlcResponse
+}
+
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_request.go b/plc4go/pkg/plc4go/model/plc_subscription_request.go
index 6b3c398..1c5b96c 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_request.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_request.go
@@ -20,20 +20,23 @@ package model
import "time"
+type PlcSubscriptionEventHandler func(event PlcSubscriptionEvent)
+
type PlcSubscriptionRequestBuilder interface {
- AddCyclicItem(name string, query string, interval time.Duration)
- AddChangeOfStateItem(name string, query string)
- AddEventItem(name string, query string)
- Build() (PlcSubscriptionRequest, error)
+ AddCyclicItem(name string, query string, interval time.Duration)
+ AddChangeOfStateItem(name string, query string)
+ AddEventItem(name string, query string)
+ AddItemHandler(handler PlcSubscriptionEventHandler)
+ Build() (PlcSubscriptionRequest, error)
}
type PlcSubscriptionRequestResult struct {
- Request PlcSubscriptionRequest
- Response PlcSubscriptionResponse
- Err error
+ Request PlcSubscriptionRequest
+ Response PlcSubscriptionResponse
+ Err error
}
type PlcSubscriptionRequest interface {
- Execute() <-chan PlcSubscriptionRequestResult
- PlcRequest
+ Execute() <-chan PlcSubscriptionRequestResult
+ PlcRequest
}