You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by cd...@apache.org on 2021/01/29 15:11:55 UTC

[plc4x] 02/02: - Refactored the callback-based operations

This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 17f46561efaffdbc8ad90b4a75e00a8b21e23a5b
Author: Christofer Dutz <ch...@c-ware.de>
AuthorDate: Fri Jan 29 16:11:37 2021 +0100

    - Refactored the callback-based operations
---
 plc4go/cmd/main/drivers/knxnetip_test.go           |  26 ++-
 plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go | 180 +++++++++------------
 plc4go/internal/plc4go/spi/PlcBrowser.go           |  10 +-
 .../plc4go/spi/model/DefaultBrowseRequest.go       |   4 +-
 plc4go/pkg/plc4go/model/plc_browse.go              |   2 +-
 5 files changed, 112 insertions(+), 110 deletions(-)

diff --git a/plc4go/cmd/main/drivers/knxnetip_test.go b/plc4go/cmd/main/drivers/knxnetip_test.go
index 87df161..1122cf8 100644
--- a/plc4go/cmd/main/drivers/knxnetip_test.go
+++ b/plc4go/cmd/main/drivers/knxnetip_test.go
@@ -41,6 +41,8 @@ func Init() {
 func TestKnxNetIpPlc4goBrowse(t *testing.T) {
 	Init()
 
+	startTime := time.Now()
+
 	log.Debug("Initializing PLC4X")
 	driverManager := plc4go.NewPlcDriverManager()
 	log.Debug("Registering KNXnet/IP driver")
@@ -136,11 +138,15 @@ func TestKnxNetIpPlc4goBrowse(t *testing.T) {
 			}
 		}
 	}
+
+	log.Infof("Operation finished in %s", time.Since(startTime))
 }
 
 func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 	Init()
 
+	startTime := time.Now()
+
 	log.Debug("Initializing PLC4X")
 	driverManager := plc4go.NewPlcDriverManager()
 	log.Debug("Registering KNXnet/IP driver")
@@ -174,9 +180,10 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 
 	// Execute the browse-request
 	log.Info("Scanning for KNX devices")
-	_ = browseRequest.ExecuteAsync(func(result apiModel.PlcBrowseEvent) {
+
+	brr := browseRequest.ExecuteWithInterceptor(func(result apiModel.PlcBrowseEvent) bool {
 		if result.Err != nil {
-			return
+			return false
 		}
 
 		// Create a read-request to read the manufacturer and hardware ids
@@ -188,7 +195,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 		if err != nil {
 			log.Errorf("Got an error creating read-request: %s", err.Error())
 			t.Fail()
-			return
+			return false
 		}
 
 		// Execute the read-requests
@@ -197,7 +204,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 		if readResult.Err != nil {
 			log.Errorf("got an error executing read-request: %s", readResult.Err.Error())
 			t.Fail()
-			return
+			return false
 		}
 
 		// Check the response
@@ -205,7 +212,7 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 		if readResponse.GetResponseCode("manufacturerId") != apiModel.PlcResponseCode_OK {
 			log.Errorf("Got an error response code %d for field 'manufacturerId'", readResponse.GetResponseCode("manufacturerId"))
 			t.Fail()
-			return
+			return false
 		}
 		if readResponse.GetResponseCode("applicationProgramVersion") != apiModel.PlcResponseCode_OK && readResponse.GetResponseCode("interfaceProgramVersion") != apiModel.PlcResponseCode_OK {
 			log.Errorf("Got response code %d for address %s ('programVersion')",
@@ -225,7 +232,16 @@ func TestKnxNetIpPlc4goBlockingBrowseWithCallback(t *testing.T) {
 			programVersionBytes := PlcValueUint8ListToByteArray(programVersion)
 			log.Infof(" - Manufacturer Id: %d, Interface Program Version: %s\n", manufacturerId, hex.EncodeToString(programVersionBytes))
 		}
+		return true
 	})
+	browseRequestResults := <-brr
+	if browseRequestResults.Err != nil {
+		log.Errorf("Got an error scanning for KNX devices: %s", connectionResult.Err.Error())
+		t.Fail()
+		return
+	}
+
+	log.Infof("Operation finished in %s", time.Since(startTime))
 }
 
 func TestKnxNetIpPlc4goGroupAddressRead(t *testing.T) {
diff --git a/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go b/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
index 188f602..56269e6 100644
--- a/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
+++ b/plc4go/internal/plc4go/knxnetip/KnxNetIpBrowser.go
@@ -45,6 +45,12 @@ func NewKnxNetIpBrowser(connection *KnxNetIpConnection, messageCodec spi.Message
 }
 
 func (b KnxNetIpBrowser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan apiModel.PlcBrowseRequestResult {
+	return b.BrowseWithInterceptor(browseRequest, func(result apiModel.PlcBrowseEvent) bool {
+		return true
+	})
+}
+
+func (b KnxNetIpBrowser) BrowseWithInterceptor(browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool) <-chan apiModel.PlcBrowseRequestResult {
 	result := make(chan apiModel.PlcBrowseRequestResult)
 	sendResult := func(browseResponse apiModel.PlcBrowseResponse, err error) {
 		result <- apiModel.PlcBrowseRequestResult{
@@ -55,118 +61,92 @@ func (b KnxNetIpBrowser) Browse(browseRequest apiModel.PlcBrowseRequest) <-chan
 	}
 
 	go func() {
-		browseResults := map[string][]apiModel.PlcBrowseQueryResult{}
-		err := b.BlockingBrowseWithCallback(browseRequest, func(result apiModel.PlcBrowseEvent) {
-			if _, ok := browseResults[result.QueryName]; !ok {
-				browseResults[result.QueryName] = []apiModel.PlcBrowseQueryResult{}
+		results := map[string][]apiModel.PlcBrowseQueryResult{}
+		for _, queryName := range browseRequest.GetQueryNames() {
+			queryString := browseRequest.GetQueryString(queryName)
+			field, err := b.connection.fieldHandler.ParseQuery(queryString)
+			if err != nil {
+				sendResult(nil, err)
 			}
-			if result.Err != nil {
-				// TODO: Find out what to do here ...
-			} else {
-				browseResults[result.QueryName] = append(browseResults[result.QueryName], *result.Result)
+
+			// Create a list of address strings, which doesn't contain any ranges, lists or wildcards
+			options, err := b.calculateAddresses(field)
+			if err != nil {
+				sendResult(nil, err)
 			}
-		})
-		sendResult(model.NewDefaultPlcBrowseResponse(browseRequest, browseResults), err)
-	}()
 
-	return result
-}
+			var queryResults []apiModel.PlcBrowseQueryResult
+			// Parse each of these expanded addresses and handle them accordingly.
+			for _, option := range options {
+				field, err = b.connection.fieldHandler.ParseQuery(option)
+				if err != nil {
+					sendResult(nil, err)
+				}
 
-func (b KnxNetIpBrowser) BlockingBrowseWithCallback(browseRequest apiModel.PlcBrowseRequest, callback func(result apiModel.PlcBrowseEvent)) error {
-	for _, queryName := range browseRequest.GetQueryNames() {
-		queryString := browseRequest.GetQueryString(queryName)
-		field, err := b.connection.fieldHandler.ParseQuery(queryString)
-		if err != nil {
-			callback(apiModel.PlcBrowseEvent{
-				Request:   browseRequest,
-				QueryName: queryName,
-				Result:    nil,
-				Err:       err,
-			})
-			continue
-		}
+				// The following browse actions could be required:
+				switch field.(type) {
+				// - A Device Address
+				//   - A Device has to be detected (This is done in every case)
+				//      TODO: Send a Connect to the physical knx address
+				//   - If an object-id is provided, check if this object id exists
+				//   - If a property-id is provided, check if this property exists and try to get more information about it
+				case KnxNetIpDevicePropertyAddressPlcField:
+					targetAddress := FieldToKnxAddress(field.(KnxNetIpDevicePropertyAddressPlcField))
 
-		// Create a list of address strings, which doesn't contain any ranges, lists or wildcards
-		options, err := b.calculateAddresses(field)
-		if err != nil {
-			callback(apiModel.PlcBrowseEvent{
-				Request:   browseRequest,
-				QueryName: queryName,
-				Result:    nil,
-				Err:       err,
-			})
-			continue
-		}
+					// Send a connection request to the device
+					deviceConnections := b.connection.ConnectToDevice(*targetAddress)
+					select {
+					case deviceConnection := <-deviceConnections:
+						if deviceConnection != nil {
+							queryResult := apiModel.PlcBrowseQueryResult{
+								Address: fmt.Sprintf("%d.%d.%d",
+									targetAddress.MainGroup,
+									targetAddress.MiddleGroup,
+									targetAddress.SubGroup),
+								PossibleDataTypes: nil,
+							}
 
-		// Parse each of these expanded addresses and handle them accordingly.
-		for _, option := range options {
-			field, err = b.connection.fieldHandler.ParseQuery(option)
-			if err != nil {
-				callback(apiModel.PlcBrowseEvent{
-					Request:   browseRequest,
-					QueryName: queryName,
-					Result:    nil,
-					Err:       err,
-				})
-				continue
-			}
+							// Pass it to the callback
+							add := interceptor(apiModel.PlcBrowseEvent{
+								Request:   browseRequest,
+								QueryName: queryName,
+								Result:    &queryResult,
+								Err:       nil,
+							})
 
-			// The following browse actions could be required:
-			switch field.(type) {
-			// - A Device Address
-			//   - A Device has to be detected (This is done in every case)
-			//      TODO: Send a Connect to the physical knx address
-			//   - If an object-id is provided, check if this object id exists
-			//   - If a property-id is provided, check if this property exists and try to get more information about it
-			case KnxNetIpDevicePropertyAddressPlcField:
-				targetAddress := FieldToKnxAddress(field.(KnxNetIpDevicePropertyAddressPlcField))
-
-				// Send a connection request to the device
-				deviceConnections := b.connection.ConnectToDevice(*targetAddress)
-				select {
-				case deviceConnection := <-deviceConnections:
-					if deviceConnection != nil {
-						// Prepare the query result
-						queryResult := apiModel.PlcBrowseQueryResult{
-							Address: fmt.Sprintf("%d.%d.%d",
-								targetAddress.MainGroup,
-								targetAddress.MiddleGroup,
-								targetAddress.SubGroup),
-							PossibleDataTypes: nil,
-						}
-						// Pass it to the callback
-						callback(apiModel.PlcBrowseEvent{
-							Request:   browseRequest,
-							QueryName: queryName,
-							Result:    &queryResult,
-							Err:       nil,
-						})
-						// Disconnect from the device
-						deviceDisconnections := b.connection.DisconnectFromDevice(*targetAddress)
-						select {
-						case _ = <-deviceDisconnections:
-						case <-time.After(b.connection.defaultTtl * 10):
-							fmt.Printf("Timedout")
+							// If the interceptor opted for adding it to the result, do so
+							if add {
+								queryResults = append(queryResults, queryResult)
+							}
+
+							deviceDisconnections := b.connection.DisconnectFromDevice(*targetAddress)
+							select {
+							case _ = <-deviceDisconnections:
+							case <-time.After(b.connection.defaultTtl * 10):
+								fmt.Printf("Timedout")
+							}
 						}
+					case <-time.After(b.connection.defaultTtl):
+						// In this case the remote was just not responding.
 					}
-				case <-time.After(b.connection.defaultTtl):
-					// In this case the remote was just not responding.
+					// Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
+					//time.Sleep(time.Millisecond * 20)
+				// - A Group Address
+				//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+				case KnxNetIpGroupAddress3LevelPlcField:
+				// - A Group Address
+				//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+				case KnxNetIpGroupAddress2LevelPlcField:
+				// - A Group Address
+				//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
+				case KnxNetIpGroupAddress1LevelPlcField:
 				}
-				// Just to slow things down a bit (This way we can't exceed the max number of requests per minute)
-				//time.Sleep(time.Millisecond * 20)
-			// - A Group Address
-			//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
-			case KnxNetIpGroupAddress3LevelPlcField:
-			// - A Group Address
-			//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
-			case KnxNetIpGroupAddress2LevelPlcField:
-			// - A Group Address
-			//   - Check the cache of known group addresses. If there is data available from that group-id, it's returned
-			case KnxNetIpGroupAddress1LevelPlcField:
 			}
+			results[queryName] = queryResults
 		}
-	}
-	return nil
+		sendResult(model.NewDefaultPlcBrowseResponse(browseRequest, results), nil)
+	}()
+	return result
 }
 
 func (b KnxNetIpBrowser) calculateAddresses(field apiModel.PlcField) ([]string, error) {
diff --git a/plc4go/internal/plc4go/spi/PlcBrowser.go b/plc4go/internal/plc4go/spi/PlcBrowser.go
index 2debdcc..d7b772c 100644
--- a/plc4go/internal/plc4go/spi/PlcBrowser.go
+++ b/plc4go/internal/plc4go/spi/PlcBrowser.go
@@ -23,6 +23,12 @@ import "github.com/apache/plc4x/plc4go/pkg/plc4go/model"
 type PlcBrowser interface {
 	// Non-Blocking request, which will return a full result as soon as the operation is finished
 	Browse(browseRequest model.PlcBrowseRequest) <-chan model.PlcBrowseRequestResult
-	// Blocking request, which calls a callback function on every found resource
-	BlockingBrowseWithCallback(browseRequest model.PlcBrowseRequest, callback func(result model.PlcBrowseEvent)) error
+
+	// Variant of the Browser, which allows immediately intercepting found resources
+	// This is ideal, if additional information has to be queried on such found resources
+	// and especially for connection-based protocols can reduce the stress on the system
+	// and increase throughput. It can also be used for simple filtering.
+	// If the interceptor function returns 'true' the result is added to the overall result
+	// if it's 'false' is is not.
+	BrowseWithInterceptor(browseRequest model.PlcBrowseRequest, interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult
 }
diff --git a/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go b/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
index 7a23090..e438300 100644
--- a/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
+++ b/plc4go/internal/plc4go/spi/model/DefaultBrowseRequest.go
@@ -69,8 +69,8 @@ func (d DefaultPlcBrowseRequest) Execute() <-chan model.PlcBrowseRequestResult {
 	return d.browser.Browse(d)
 }
 
-func (d DefaultPlcBrowseRequest) ExecuteAsync(callback func(result model.PlcBrowseEvent)) error {
-	return d.browser.BlockingBrowseWithCallback(d, callback)
+func (d DefaultPlcBrowseRequest) ExecuteWithInterceptor(interceptor func(result model.PlcBrowseEvent) bool) <-chan model.PlcBrowseRequestResult {
+	return d.browser.BrowseWithInterceptor(d, interceptor)
 }
 
 type DefaultPlcBrowseResponse struct {
diff --git a/plc4go/pkg/plc4go/model/plc_browse.go b/plc4go/pkg/plc4go/model/plc_browse.go
index 277f658..23f1d0f 100644
--- a/plc4go/pkg/plc4go/model/plc_browse.go
+++ b/plc4go/pkg/plc4go/model/plc_browse.go
@@ -32,7 +32,7 @@ type PlcBrowseRequest interface {
 	// Will not return until a potential scan is finished and will return all results in one block
 	Execute() <-chan PlcBrowseRequestResult
 	// Will call the given callback for every found resource
-	ExecuteAsync(callback func(result PlcBrowseEvent)) error
+	ExecuteWithInterceptor(interceptor func(result PlcBrowseEvent) bool) <-chan PlcBrowseRequestResult
 	GetQueryNames() []string
 	GetQueryString(name string) string
 	PlcRequest