You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/01/29 15:11:55 UTC
[plc4x] 02/02: - Refactored the callback-based operations
This is an automated email from the ASF dual-hosted git repository.
cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 17f46561efaffdbc8ad90b4a75e00a8b21e23a5b
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Jan 29 16:11:37 2021 +0100
- Refactored the callback-based operations
---
plc4go/cmd/main/drivers/knxnetip_test.go | 26 ++-
plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go | 180 +++++++++------------
plc4go/internal/plc4go/spi/PlcBrowser.go | 10 +-
.../plc4go/spi/model/DefaultBrowseRequest.go | 4 +-
plc4go/pkg/plc4go/model/plc_browse.go | 2 +-
5 files changed, 112 insertions(+), 110 deletions(-)
diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index 87df161..1122cf8 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -41,6 +41,8 @@ func Init() {
func TestKnxNetIpPlc4goBrowse(t *testing.T) {
Init()
+ startTime := time.Now()
+
log.Debug("Initializing PLC4X")
driverManager := plc4go.NewPlcDriverManager()
log.Debug("Registering KNXnet/IP driver")
@@ -136,11 +138,15 @@ func TestKnxNetIpPlc4goBrowse(t *testing.T) {
}
}
}
+
+ log.Infof("Operation finished in %s", time.Since(startTime))
}
func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
Init()
+ startTime := time.Now()
+
log.Debug("Initializing PLC4X")
driverManager := plc4go.NewPlcDriverManager()
log.Debug("Registering KNXnet/IP driver")
@@ -174,9 +180,10 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
// Execute the browse-request
log.Info("Scanning for KNX devices")
- _ = browseRequest.ExecuteAsync(func(result apiModel.PlcBrowseEvent) {
+
+ brr := browseRequest.ExecuteWithInterceptor(func(result apiModel.PlcBrowseEvent) bool {
if result.Err != nil {
- return
+ return false
}
// Create a read-request to read the manufacturer and hardware ids
@@ -188,7 +195,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
if err != nil {
log.Errorf("Got an error creating read-request: %s", err.Error())
t.Fail()
- return
+ return false
}
// Execute the read-requests
@@ -197,7 +204,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
if readResult.Err != nil {
log.Errorf("got an error executing read-request: %s", readResult.Err.Error())
t.Fail()
- return
+ return false
}
// Check the response
@@ -205,7 +212,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
if readResponse.GetResponseCode("manufacturerId") != apiModel.PlcResponseCode_OK {
log.Errorf("Got an error response code %d for field 'manufacturerId'", readResponse.GetResponseCode("manufacturerId"))
t.Fail()
- return
+ return false
}
if readResponse.GetResponseCode("applicationProgramVersion") != apiModel.PlcResponseCode_OK && readResponse.GetResponseCode("interfaceProgramVersion") != apiModel.PlcResponseCode_OK {
log.Errorf("Got response code %d for address %s ('programVersion')",
@@ -225,7 +232,16 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
programVersionBytes := PlcValueUint8ListToByteArray(programVersion)
log.Infof(" - Manufacturer Id: %d, Interface Program Version: %s\n", manufacturerId, hex.EncodeToString(programVersionBytes))
}
+ return true
})
+ browseRequestResults := <-brr
+ if browseRequestResults.Err != nil {
+ log.Errorf("Got an error scanning for KNX devices: %s", connectionResult.Err.Error())
+ t.Fail()
+ return
+ }
+
+ log.Infof("Operation finished in %s", time.Since(startTime))
}
func TestKnxNetIpPlc4goGroupAddressRead(t *testing.T) {
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
index 188f602..56269e6 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
@@ -45,6 +45,12 @@ func NewKnxNetIpBrowser(connection *KnxNetIpConnection, messageCodec spi.Message
}
func (b KnxNetIpBrowser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
+ return b.BrowseWithInterceptor(browseRequest, func(result apiModel.PlcBrowseEvent) bool {
+ return true
+ })
+}
+
+func (b KnxNetIpBrowser) BrowseWithInterceptor(browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool) <-chan apiModel.PlcBrowseRequestResult {
result := make(chan apiModel.PlcBrowseRequestResult)
sendResult := func(browseResponse apiModel.PlcBrowseResponse, err error) {
result <- apiModel.PlcBrowseRequestResult{
@@ -55,118 +61,92 @@ func (b KnxNetIpBrowser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan
}
go func() {
- browseResults := map[string][]apiModel.PlcBrowseQueryResult{}
- err := b.BlockingBrowseWithCallback(browseRequest, func(result apiModel.PlcBrowseEvent) {
- if _, ok := browseResults[result.QueryName]; !ok {
- browseResults[result.QueryName] = []apiModel.PlcBrowseQueryResult{}
+ results := map[string][]apiModel.PlcBrowseQueryResult{}
+ for _, queryName := range browseRequest.GetQueryNames() {
+ queryString := browseRequest.GetQueryString(queryName)
+ field, err := b.connection.fieldHandler.ParseQuery(queryString)
+ if err != nil {
+ sendResult(nil, err)
}
- if result.Err != nil {
- // TODO: Find out what to do here ...
- } else {
- browseResults[result.QueryName] = append(browseResults[result.QueryName], *result.Result)
+
+ // Create a list of address strings, which doesn't contain any ranges, lists or wildcards
+ options, err := b.calculateAddresses(field)
+ if err != nil {
+ sendResult(nil, err)
}
- })
- sendResult(model.NewDefaultPlcBrowseResponse(browseRequest, browseResults), err)
- }()
- return result
-}
+ var queryResults []apiModel.PlcBrowseQueryResult
+ // Parse each of these expanded addresses and handle them accordingly.
+ for _, option := range options {
+ field, err = b.connection.fieldHandler.ParseQuery(option)
+ if err != nil {
+ sendResult(nil, err)
+ }
-func (b KnxNetIpBrowser) BlockingBrowseWithCallback(browseRequest apiModel.PlcBrowseRequest, callback func(result apiModel.PlcBrowseEvent)) error {
- for _, queryName := range browseRequest.GetQueryNames() {
- queryString := browseRequest.GetQueryString(queryName)
- field, err := b.connection.fieldHandler.ParseQuery(queryString)
- if err != nil {
- callback(apiModel.PlcBrowseEvent{
- Request: browseRequest,
- QueryName: queryName,
- Result: nil,
- Err: err,
- })
- continue
- }
+ // The following browse actions could be required:
+ switch field.(type) {
+ // - A Device Address
+ // - A Device has to be detected (This is done in every case)
+ // TODO: Send a Connect to the physical knx address
+ // - If an object-id is provided, check if this object id exists
+ // - If a property-id is provided, check if this property exists and try to get more information about it
+ case KnxNetIpDevicePropertyAddressPlcField:
+ targetAddress := FieldToKnxAddress(field.(KnxNetIpDevicePropertyAddressPlcField))
- // Create a list of address strings, which doesn't contain any ranges, lists or wildcards
- options, err := b.calculateAddresses(field)
- if err != nil {
- callback(apiModel.PlcBrowseEvent{
- Request: browseRequest,
- QueryName: queryName,
- Result: nil,
- Err: err,
- })
- continue
- }
+ // Send a connection request to the device
+ deviceConnections := b.connection.ConnectToDevice(*targetAddress)
+ select {
+ case deviceConnection := <-deviceConnections:
+ if deviceConnection != nil {
+ queryResult := apiModel.PlcBrowseQueryResult{
+ Address: fmt.Sprintf("%d.%d.%d",
+ targetAddress.MainGroup,
+ targetAddress.MiddleGroup,
+ targetAddress.SubGroup),
+ PossibleDataTypes: nil,
+ }
- // Parse each of these expanded addresses and handle them accordingly.
- for _, option := range options {
- field, err = b.connection.fieldHandler.ParseQuery(option)
- if err != nil {
- callback(apiModel.PlcBrowseEvent{
- Request: browseRequest,
- QueryName: queryName,
- Result: nil,
- Err: err,
- })
- continue
- }
+ // Pass it to the callback
+ add := interceptor(apiModel.PlcBrowseEvent{
+ Request: browseRequest,
+ QueryName: queryName,
+ Result: &queryResult,
+ Err: nil,
+ })
- // The following browse actions could be required:
- switch field.(type) {
- // - A Device Address
- // - A Device has to be detected (This is done in every case)
- // TODO: Send a Connect to the physical knx address
- // - If an object-id is provided, check if this object id exists
- // - If a property-id is provided, check if this property exists and try to get more information about it
- case KnxNetIpDevicePropertyAddressPlcField:
- targetAddress := FieldToKnxAddress(field.(KnxNetIpDevicePropertyAddressPlcField))
-
- // Send a connection request to the device
- deviceConnections := b.connection.ConnectToDevice(*targetAddress)
- select {
- case deviceConnection := <-deviceConnections:
- if deviceConnection != nil {
- // Prepare the query result
- queryResult := apiModel.PlcBrowseQueryResult{
- Address: fmt.Sprintf("%d.%d.%d",
- targetAddress.MainGroup,
- targetAddress.MiddleGroup,
- targetAddress.SubGroup),
- PossibleDataTypes: nil,
- }
- // Pass it to the callback
- callback(apiModel.PlcBrowseEvent{
- Request: browseRequest,
- QueryName: queryName,
- Result: &queryResult,
- Err: nil,
- })
- // Disconnect from the device
- deviceDisconnections := b.connection.DisconnectFromDevice(*targetAddress)
- select {
- case _ = <-deviceDisconnections:
- case <-time.After(b.connection.defaultTtl * 10):
- fmt.Printf("Timedout")
+ // If the interceptor opted for adding it to the result, do so
+ if add {
+ queryResults = append(queryResults, queryResult)
+ }
+
+ deviceDisconnections := b.connection.DisconnectFromDevice(*targetAddress)
+ select {
+ case _ = <-deviceDisconnections:
+ case <-time.After(b.connection.defaultTtl * 10):
+ fmt.Printf("Timedout")
+ }
}
+ case <-time.After(b.connection.defaultTtl):
+ // In this case the remote was just not responding.
}
- case <-time.After(b.connection.defaultTtl):
- // In this case the remote was just not responding.
+ // Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
+ //time.Sleep(time.Millisecond * 20)
+ // - A Group Address
+ // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+ case KnxNetIpGroupAddress3LevelPlcField:
+ // - A Group Address
+ // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+ case KnxNetIpGroupAddress2LevelPlcField:
+ // - A Group Address
+ // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+ case KnxNetIpGroupAddress1LevelPlcField:
}
- // Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
- //time.Sleep(time.Millisecond * 20)
- // - A Group Address
- // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
- case KnxNetIpGroupAddress3LevelPlcField:
- // - A Group Address
- // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
- case KnxNetIpGroupAddress2LevelPlcField:
- // - A Group Address
- // - Check the cache of known group addresses. If there is data available from that group-id, it's returned
- case KnxNetIpGroupAddress1LevelPlcField:
}
+ results[queryName] = queryResults
}
- }
- return nil
+ sendResult(model.NewDefaultPlcBrowseResponse(browseRequest, results), nil)
+ }()
+ return result
}
func (b KnxNetIpBrowser) calculateAddresses(field apiModel.PlcField) ([]string, error) {
diff --git a/plc4go/internal/plc4go/spi/PlcBrowser.go b/plc4go/internal/plc4go/spi/PlcBrowser.go
index 2debdcc..d7b772c 100644
--- a/plc4go/internal/plc4go/spi/PlcBrowser.go
+++ b/plc4go/internal/plc4go/spi/PlcBrowser.go
@@ -23,6 +23,12 @@ import "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
type PlcBrowser interface {
// Non-Blocking request, which will return a full result as soon as the operation is finished
Browse(browseRequest model.PlcBrowseRequest) <-chan model.PlcBrowseRequestResult
- // Blocking request, which calls a callback function on every found resource
- BlockingBrowseWithCallback(browseRequest model.PlcBrowseRequest, callback func(result model.PlcBrowseEvent)) error
+
+ // Variant of the Browser, which allows immediately intercepting found resources
+ // This is ideal, if additional information has to be queried on such found resources
+ // and especially for connection-based protocols can reduce the stress on the system
+ // and increase throughput. It can also be used for simple filtering.
+ // If the interceptor function returns 'true' the result is added to the overall result
+ // if it's 'false' is is not.
+ BrowseWithInterceptor(browseRequest model.PlcBrowseRequest, interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult
}
diff --git a/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go b/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
index 7a23090..e438300 100644
--- a/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
+++ b/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
@@ -69,8 +69,8 @@ func (d DefaultPlcBrowseRequest) Execute() <-chan model.PlcBrowseRequestResult {
return d.browser.Browse(d)
}
-func (d DefaultPlcBrowseRequest) ExecuteAsync(callback func(result model.PlcBrowseEvent)) error {
- return d.browser.BlockingBrowseWithCallback(d, callback)
+func (d DefaultPlcBrowseRequest) ExecuteWithInterceptor(interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult {
+ return d.browser.BrowseWithInterceptor(d, interceptor)
}
type DefaultPlcBrowseResponse struct {
diff --git a/plc4go/pkg/plc4go/model/plc_browse.go b/plc4go/pkg/plc4go/model/plc_browse.go
index 277f658..23f1d0f 100644
--- a/plc4go/pkg/plc4go/model/plc_browse.go
+++ b/plc4go/pkg/plc4go/model/plc_browse.go
@@ -32,7 +32,7 @@ type PlcBrowseRequest interface {
// Will not return until a potential scan is finished and will return all results in one block
Execute() <-chan PlcBrowseRequestResult
// Will call the given callback for every found resource
- ExecuteAsync(callback func(result PlcBrowseEvent)) error
+ ExecuteWithInterceptor(interceptor func(result PlcBrowseEvent) bool) <-chan PlcBrowseRequestResult
GetQueryNames() []string
GetQueryString(name string) string
PlcRequest