You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2020/11/09 15:37:32 UTC

[plc4x] 01/02: - Implemented a first version of the subscription API

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch feature/plc4go
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 2f615be3bee9507fc8bab00f25b192fe361593e2
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Mon Nov 9 16:37:01 2020 +0100

    - Implemented a first version of the subscription API
---
 plc4go/cmd/main/drivers/knxnetip_test.go           |  20 +-
 .../plc4go/model/DefaultPlcSubscriptionRequest.go  | 210 ++++++++++-----------
 plc4go/pkg/plc4go/model/plc_subscription_event.go  |  11 ++
 .../pkg/plc4go/model/plc_subscription_request.go   |  21 ++-
 4 files changed, 144 insertions(+), 118 deletions(-)

diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index 09e63be..6104e46 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -20,12 +20,15 @@ package drivers
 
 import (
     "encoding/hex"
+    "fmt"
     "plc4x.apache.org/plc4go/v0/internal/plc4go/knxnetip"
     "plc4x.apache.org/plc4go/v0/internal/plc4go/knxnetip/readwrite/model"
     "plc4x.apache.org/plc4go/v0/internal/plc4go/transports/udp"
     "plc4x.apache.org/plc4go/v0/internal/plc4go/utils"
     "plc4x.apache.org/plc4go/v0/pkg/plc4go"
+    apiModel "plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
     "testing"
+    "time"
 )
 
 func KnxNetIp(t *testing.T) {
@@ -52,6 +55,7 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
 
     // Get a connection to a remote PLC
     crc := driverManager.GetConnection("knxnet-ip://192.168.42.11")
+    //crc := driverManager.GetConnection("knxnet-ip://-discover-")
 
     // Wait for the driver to connect (or not)
     connectionResult := <-crc
@@ -75,9 +79,16 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
     defer connection.Close()
 
     // Prepare a read-request
+    pollingInterval, err := time.ParseDuration("5s")
+    if err != nil {
+        t.Errorf("invalid format")
+        t.Fail()
+        return
+    }
     srb := connection.SubscriptionRequestBuilder()
     srb.AddChangeOfStateItem("field1", "*/*/*")
-    srb.AddChangeOfStateItem("field2", "holding-register:3:REAL")
+    srb.AddCyclicItem("field2", "holding-register:3:REAL", pollingInterval)
+    srb.AddItemHandler(knxEventHandler)
     subscriptionRequest, err := srb.Build()
     if err != nil {
         t.Errorf("error preparing subscription-request: %s", connectionResult.Err.Error())
@@ -128,3 +139,10 @@ func TestKnxNetIpPlc4goDriver(t *testing.T) {
     fmt.Printf("\n\nResult field2: %d\n", wrr.Response.GetResponseCode("field2"))*/
 }
 
+func knxEventHandler(event apiModel.PlcSubscriptionEvent) {
+    for _, fieldName := range event.GetFieldNames() {
+        if event.GetResponseCode(fieldName) == apiModel.PlcResponseCode_OK {
+            fmt.Printf("Got update for field %s with value %s", fieldName, event.GetValue(fieldName).GetString())
+        }
+    }
+}
\ No newline at end of file
diff --git a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
index 1abb1a4..53dd8b8 100644
--- a/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
+++ b/plc4go/internal/plc4go/model/DefaultPlcSubscriptionRequest.go
@@ -19,64 +19,85 @@
 package model
 
 import (
-	"encoding/xml"
-	"errors"
-	values2 "plc4x.apache.org/plc4go/v0/internal/plc4go/model/values"
-	"plc4x.apache.org/plc4go/v0/internal/plc4go/spi"
-	"plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
-	"plc4x.apache.org/plc4go/v0/pkg/plc4go/values"
+    "encoding/xml"
+    "errors"
+    "plc4x.apache.org/plc4go/v0/internal/plc4go/spi"
+    "plc4x.apache.org/plc4go/v0/pkg/plc4go/model"
+    "time"
+)
+
+type SubscriptionType uint8
+
+const (
+    SUBSCRIPTION_CYCLIC          SubscriptionType = 0x01
+    SUBSCRIPTION_CHANGE_OF_STATE SubscriptionType = 0x02
+    SUBSCRIPTION_EVENT           SubscriptionType = 0x03
 )
 
 type DefaultPlcSubscriptionRequestBuilder struct {
-	subscriber   spi.PlcSubscriber
-	fieldHandler spi.PlcFieldHandler
-	valueHandler spi.PlcValueHandler
-	queries      map[string]string
-	values       map[string]interface{}
+    subscriber   spi.PlcSubscriber
+    fieldHandler spi.PlcFieldHandler
+    valueHandler spi.PlcValueHandler
+    eventHandler model.PlcSubscriptionEventHandler
+    queries      map[string]string
+    types        map[string]SubscriptionType
+    intervals    map[string]time.Duration
 }
 
 func NewDefaultPlcSubscriptionRequestBuilder(fieldHandler spi.PlcFieldHandler, valueHandler spi.PlcValueHandler, subscriber spi.PlcSubscriber) *DefaultPlcSubscriptionRequestBuilder {
-	return &DefaultPlcSubscriptionRequestBuilder{
+    return &DefaultPlcSubscriptionRequestBuilder{
         subscriber:   subscriber,
-		fieldHandler: fieldHandler,
-		valueHandler: valueHandler,
-		queries:      map[string]string{},
-		values:       map[string]interface{}{},
-	}
+        fieldHandler: fieldHandler,
+        valueHandler: valueHandler,
+        queries:      map[string]string{},
+    }
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddCyclicItem(name string, query string, interval time.Duration) {
+    m.queries[name] = query
+    m.types[name] = SUBSCRIPTION_CYCLIC
+    m.intervals[name] = interval
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddChangeOfStateItem(name string, query string) {
+    m.queries[name] = query
+    m.types[name] = SUBSCRIPTION_CHANGE_OF_STATE
 }
 
-func (m *DefaultPlcSubscriptionRequestBuilder) AddItem(name string, query string, value interface{}) {
-	m.queries[name] = query
-	m.values[name] = value
+func (m *DefaultPlcSubscriptionRequestBuilder) AddEventItem(name string, query string) {
+    m.queries[name] = query
+    m.types[name] = SUBSCRIPTION_EVENT
+}
+
+func (m *DefaultPlcSubscriptionRequestBuilder) AddItemHandler(eventHandler model.PlcSubscriptionEventHandler) {
+    m.eventHandler = eventHandler
 }
 
 func (m *DefaultPlcSubscriptionRequestBuilder) Build() (model.PlcSubscriptionRequest, error) {
-	fields := make(map[string]model.PlcField)
-	values := make(map[string]values.PlcValue)
-	for name, query := range m.queries {
-		field, err := m.fieldHandler.ParseQuery(query)
-		if err != nil {
-			return nil, errors.New("Error parsing query: " + query + ". Got error: " + err.Error())
-		}
-		fields[name] = field
-		value, err := m.valueHandler.NewPlcValue(field, m.values[name])
-		if err != nil {
-			return nil, errors.New("Error parsing value of type: " + field.GetTypeName() + ". Got error: " + err.Error())
-		}
-		values[name] = value
-	}
-	return DefaultPlcSubscriptionRequest{
-		fields: fields,
-		values: values,
-        subscriber: m.subscriber,
-	}, nil
+    fields := make(map[string]model.PlcField)
+    for name, query := range m.queries {
+        field, err := m.fieldHandler.ParseQuery(query)
+        if err != nil {
+            return nil, errors.New("Error parsing query: " + query + ". Got error: " + err.Error())
+        }
+        fields[name] = field
+    }
+    return DefaultPlcSubscriptionRequest{
+        fields:       fields,
+        types:        m.types,
+        intervals:    m.intervals,
+        subscriber:   m.subscriber,
+        eventHandler: m.eventHandler,
+    }, nil
 }
 
 type DefaultPlcSubscriptionRequest struct {
-	fields map[string]model.PlcField
-	values map[string]values.PlcValue
-    subscriber spi.PlcSubscriber
-	model.PlcSubscriptionRequest
+    fields       map[string]model.PlcField
+    types        map[string]SubscriptionType
+    intervals    map[string]time.Duration
+    eventHandler model.PlcSubscriptionEventHandler
+    subscriber   spi.PlcSubscriber
+    model.PlcSubscriptionRequest
 }
 
 func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionRequestResult {
@@ -84,77 +105,50 @@ func (m DefaultPlcSubscriptionRequest) Execute() <-chan model.PlcSubscriptionReq
 }
 
 func (m DefaultPlcSubscriptionRequest) GetFieldNames() []string {
-	var fieldNames []string
-	for fieldName, _ := range m.fields {
-		fieldNames = append(fieldNames, fieldName)
-	}
-	return fieldNames
+    var fieldNames []string
+    for fieldName, _ := range m.fields {
+        fieldNames = append(fieldNames, fieldName)
+    }
+    return fieldNames
 }
 
 func (m DefaultPlcSubscriptionRequest) GetField(name string) model.PlcField {
-	return m.fields[name]
+    return m.fields[name]
+}
+
+func (m DefaultPlcSubscriptionRequest) GetType(name string) SubscriptionType {
+    return m.types[name]
 }
 
-func (m DefaultPlcSubscriptionRequest) GetValue(name string) values.PlcValue {
-	return m.values[name]
+func (m DefaultPlcSubscriptionRequest) GetInterval(name string) time.Duration {
+    return m.intervals[name]
 }
 
 func (m DefaultPlcSubscriptionRequest) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
-	if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
-		return err
-	}
-
-	if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "fields"}}); err != nil {
-		return err
-	}
-	for fieldName, field := range m.fields {
-		value := m.values[fieldName]
-		if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: fieldName}}); err != nil {
-			return err
-		}
-		if err := e.EncodeElement(field, xml.StartElement{Name: xml.Name{Local: "field"}}); err != nil {
-			return err
-		}
-		switch value.(type) {
-		case values2.PlcList:
-			listValue, ok := value.(values2.PlcList)
-			if !ok {
-				return errors.New("couldn't cast PlcValue to PlcList")
-			}
-			for _, subValue := range listValue.Values {
-				if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "value"}}); err != nil {
-					return err
-				}
-				if !subValue.IsString() {
-					return errors.New("value not serializable to string")
-				}
-				e.EncodeToken(xml.CharData(subValue.GetString()))
-				if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "value"}}); err != nil {
-					return err
-				}
-			}
-		default:
-			if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "value"}}); err != nil {
-				return err
-			}
-			if !value.IsString() {
-				return errors.New("value not serializable to string")
-			}
-			e.EncodeToken(xml.CharData(value.GetString()))
-			if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "value"}}); err != nil {
-				return err
-			}
-		}
-		if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: fieldName}}); err != nil {
-			return err
-		}
-	}
-	if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "fields"}}); err != nil {
-		return err
-	}
-
-	if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
-		return err
-	}
-	return nil
+    if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
+        return err
+    }
+
+    if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: "fields"}}); err != nil {
+        return err
+    }
+    for fieldName, field := range m.fields {
+        if err := e.EncodeToken(xml.StartElement{Name: xml.Name{Local: fieldName}}); err != nil {
+            return err
+        }
+        if err := e.EncodeElement(field, xml.StartElement{Name: xml.Name{Local: "field"}}); err != nil {
+            return err
+        }
+        if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: fieldName}}); err != nil {
+            return err
+        }
+    }
+    if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "fields"}}); err != nil {
+        return err
+    }
+
+    if err := e.EncodeToken(xml.EndElement{Name: xml.Name{Local: "PlcSubscriptionRequest"}}); err != nil {
+        return err
+    }
+    return nil
 }
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_event.go b/plc4go/pkg/plc4go/model/plc_subscription_event.go
index 773175b..50ae9b8 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_event.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_event.go
@@ -17,3 +17,14 @@
 // under the License.
 //
 package model
+
+import "plc4x.apache.org/plc4go/v0/pkg/plc4go/values"
+
+type PlcSubscriptionEvent interface {
+    GetRequest() PlcSubscriptionRequest
+    GetFieldNames() []string
+    GetResponseCode(name string) PlcResponseCode
+    GetValue(name string) values.PlcValue
+    PlcResponse
+}
+
diff --git a/plc4go/pkg/plc4go/model/plc_subscription_request.go b/plc4go/pkg/plc4go/model/plc_subscription_request.go
index 6b3c398..1c5b96c 100644
--- a/plc4go/pkg/plc4go/model/plc_subscription_request.go
+++ b/plc4go/pkg/plc4go/model/plc_subscription_request.go
@@ -20,20 +20,23 @@ package model
 
 import "time"
 
+type PlcSubscriptionEventHandler func(event PlcSubscriptionEvent)
+
 type PlcSubscriptionRequestBuilder interface {
-	AddCyclicItem(name string, query string, interval time.Duration)
-	AddChangeOfStateItem(name string, query string)
-	AddEventItem(name string, query string)
-	Build() (PlcSubscriptionRequest, error)
+    AddCyclicItem(name string, query string, interval time.Duration)
+    AddChangeOfStateItem(name string, query string)
+    AddEventItem(name string, query string)
+    AddItemHandler(handler PlcSubscriptionEventHandler)
+    Build() (PlcSubscriptionRequest, error)
 }
 
 type PlcSubscriptionRequestResult struct {
-	Request  PlcSubscriptionRequest
-	Response PlcSubscriptionResponse
-	Err      error
+    Request  PlcSubscriptionRequest
+    Response PlcSubscriptionResponse
+    Err      error
 }
 
 type PlcSubscriptionRequest interface {
-	Execute() <-chan PlcSubscriptionRequestResult
-	PlcRequest
+    Execute() <-chan PlcSubscriptionRequestResult
+    PlcRequest
 }