You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/03/22 10:53:08 UTC

[plc4x] branch develop updated: plc4go: fix concurrency issue in testframework

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0c15bad  plc4go: fix concurrency issue in testframework
0c15bad is described below

commit 0c15bad738c7ddc88497ec34ee0716d263650ab7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Mar 22 11:52:41 2021 +0100

    plc4go: fix concurrency issue in testframework
---
 .../cmd/main/drivers/tests/modbus_driver_test.go   |  7 +---
 plc4go/internal/plc4go/modbus/Reader.go            | 14 ++++----
 plc4go/internal/plc4go/modbus/Writer.go            | 38 +++++++++++-----------
 .../plc4go/spi/testutils/DriverTestRunner.go       |  8 +++--
 plc4go/pkg/plc4go/logging/init.go                  | 26 ++++++++++++---
 5 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/plc4go/cmd/main/drivers/tests/modbus_driver_test.go b/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
index 5ce6a8e..601c388 100644
--- a/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
+++ b/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
@@ -26,10 +26,5 @@ import (
 )
 
 func TestModbusDriver(t *testing.T) {
-	testutils.RunDriverTestsuite(t, modbus.NewDriver(), "assets/testing/protocols/modbus/DriverTestsuite.xml",
-		// TODO: find out why these test fail. It seems like a timing issue because sometimes the work sometimes not
-		"Single element read request",
-		"Multi element read request",
-		"Array element read request",
-	)
+	testutils.RunDriverTestsuite(t, modbus.NewDriver(), "assets/testing/protocols/modbus/DriverTestsuite.xml")
 }
diff --git a/plc4go/internal/plc4go/modbus/Reader.go b/plc4go/internal/plc4go/modbus/Reader.go
index 17cfd41..15a5679 100644
--- a/plc4go/internal/plc4go/modbus/Reader.go
+++ b/plc4go/internal/plc4go/modbus/Reader.go
@@ -19,13 +19,13 @@
 package modbus
 
 import (
-	"errors"
 	readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/modbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	plc4goModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
+	"github.com/pkg/errors"
 	"math"
 	"sync/atomic"
 	"time"
@@ -64,7 +64,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
 			result <- model.PlcReadRequestResult{
 				Request:  readRequest,
 				Response: nil,
-				Err:      errors.New("invalid field item type"),
+				Err:      errors.Wrap(err, "invalid field item type"),
 			}
 			return
 		}
@@ -126,7 +126,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
 				if err != nil {
 					result <- model.PlcReadRequestResult{
 						Request: readRequest,
-						Err:     errors.New("Error decoding response: " + err.Error()),
+						Err:     errors.Wrap(err, "Error decoding response"),
 					}
 					// TODO: should we return the error here?
 					return nil
@@ -140,7 +140,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
 			func(err error) error {
 				result <- model.PlcReadRequestResult{
 					Request: readRequest,
-					Err:     errors.New("got timeout while waiting for response"),
+					Err:     errors.Wrap(err, "got timeout while waiting for response"),
 				}
 				return nil
 			},
@@ -148,7 +148,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
 			result <- model.PlcReadRequestResult{
 				Request:  readRequest,
 				Response: nil,
-				Err:      errors.New("error sending message: " + err.Error()),
+				Err:      errors.Wrap(err, "error sending message"),
 			}
 		}
 	}()
@@ -183,14 +183,14 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
 	fieldName := readRequest.GetFieldNames()[0]
 	field, err := CastToModbusFieldFromPlcField(readRequest.GetField(fieldName))
 	if err != nil {
-		return nil, errors.New("error casting to modbus-field")
+		return nil, errors.Wrap(err, "error casting to modbus-field")
 	}
 
 	// Decode the data according to the information from the request
 	rb := utils.NewReadBuffer(data)
 	value, err := readWriteModel.DataItemParse(rb, field.Datatype, field.Quantity)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "Error parsing data item")
 	}
 	responseCodes := map[string]model.PlcResponseCode{}
 	plcValues := map[string]values.PlcValue{}
diff --git a/plc4go/internal/plc4go/modbus/Writer.go b/plc4go/internal/plc4go/modbus/Writer.go
index d7df184..6d3d8f0 100644
--- a/plc4go/internal/plc4go/modbus/Writer.go
+++ b/plc4go/internal/plc4go/modbus/Writer.go
@@ -19,13 +19,12 @@
 package modbus
 
 import (
-	"errors"
-	"fmt"
 	readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/modbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
 	plc4goModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
 	"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+	"github.com/pkg/errors"
 	"math"
 	"sync/atomic"
 	"time"
@@ -47,8 +46,16 @@ func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec) Writer {
 
 func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
 	result := make(chan model.PlcWriteRequestResult)
-	// If we are requesting only one field, use a
-	if len(writeRequest.GetFieldNames()) == 1 {
+	go func() {
+		// Modbus only supports single-field requests, so reject anything else up front
+		if len(writeRequest.GetFieldNames()) != 1 {
+			result <- model.PlcWriteRequestResult{
+				Request:  writeRequest,
+				Response: nil,
+				Err:      errors.New("modbus only supports single-item requests"),
+			}
+			return
+		}
 		fieldName := writeRequest.GetFieldNames()[0]
 
 		// Get the modbus field instance from the request
@@ -58,9 +65,9 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
 			result <- model.PlcWriteRequestResult{
 				Request:  writeRequest,
 				Response: nil,
-				Err:      errors.New("invalid field item type"),
+				Err:      errors.Wrap(err, "invalid field item type"),
 			}
-			return result
+			return
 		}
 
 		// Get the value from the request and serialize it to a byte array
@@ -70,9 +77,9 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
 			result <- model.PlcWriteRequestResult{
 				Request:  writeRequest,
 				Response: nil,
-				Err:      errors.New("error serializing value: " + err.Error()),
+				Err:      errors.Wrap(err, "error serializing value"),
 			}
-			return result
+			return
 		}
 		data := utils.Uint8ArrayToInt8Array(io.GetBytes())
 
@@ -97,14 +104,14 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
 				Response: nil,
 				Err:      errors.New("modbus currently doesn't support extended register requests"),
 			}
-			return result
+			return
 		default:
 			result <- model.PlcWriteRequestResult{
 				Request:  writeRequest,
 				Response: nil,
 				Err:      errors.New("unsupported field type"),
 			}
-			return result
+			return
 		}
 
 		// Calculate a new unit identifier
@@ -138,7 +145,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
 				if err != nil {
 					result <- model.PlcWriteRequestResult{
 						Request: writeRequest,
-						Err:     errors.New("Error decoding response: " + err.Error()),
+						Err:     errors.Wrap(err, "Error decoding response"),
 					}
 				} else {
 					result <- model.PlcWriteRequestResult{
@@ -156,14 +163,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
 				return nil
 			},
 			time.Second*1)
-	} else {
-		result <- model.PlcWriteRequestResult{
-			Request:  writeRequest,
-			Response: nil,
-			Err:      errors.New("modbus only supports single-item requests"),
-		}
-	}
-	fmt.Printf("Write Request %s", writeRequest)
+	}()
 	return result
 }
 
diff --git a/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go b/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
index ce6c95c..88e709c 100644
--- a/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
+++ b/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
@@ -213,9 +213,13 @@ func (m DriverTestsuite) ExecuteStep(connection plc4go.PlcConnection, testcase *
 		expectedRawOutput := wb.GetBytes()
 		expectedRawOutputLength := uint32(len(expectedRawOutput))
 
+		now := time.Now()
 		// Read exactly this amount of bytes from the transport
-		if testTransportInstance.GetNumDrainableBytes() < expectedRawOutputLength {
-			return errors.Errorf("error getting bytes from transport. Not enough data available: actual(%d)<expected(%d)", testTransportInstance.GetNumDrainableBytes(), expectedRawOutputLength)
+		for testTransportInstance.GetNumDrainableBytes() < expectedRawOutputLength {
+			if time.Now().Sub(now) > 2*time.Second {
+				return errors.Errorf("error getting bytes from transport. Not enough data available: actual(%d)<expected(%d)", testTransportInstance.GetNumDrainableBytes(), expectedRawOutputLength)
+			}
+			time.Sleep(10 * time.Millisecond)
 		}
 		rawOutput, err := testTransportInstance.DrainWriteBuffer(expectedRawOutputLength)
 		if err != nil {
diff --git a/plc4go/pkg/plc4go/logging/init.go b/plc4go/pkg/plc4go/logging/init.go
index e110e63..185e0a6 100644
--- a/plc4go/pkg/plc4go/logging/init.go
+++ b/plc4go/pkg/plc4go/logging/init.go
@@ -23,17 +23,35 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+var oldLogger zerolog.Logger
+
 // init is used for _ imports for easy log config
 func init() {
+	oldLogger = log.Logger
+	log.Logger.Level(zerolog.ErrorLevel)
+}
+
+// ErrorLevel configures zerolog to ErrorLevel
+func ErrorLevel() {
 	log.Logger.Level(zerolog.ErrorLevel)
 }
 
-// Info configures zerolog to InfoLevel
-func Info() {
+// WarnLevel configures zerolog to WarnLevel
+func WarnLevel() {
+	log.Logger.Level(zerolog.WarnLevel)
+}
+
+// InfoLevel configures zerolog to InfoLevel
+func InfoLevel() {
 	log.Logger.Level(zerolog.InfoLevel)
 }
 
-// Debug configures zerolog to InfoLevel
-func Debug() {
+// DebugLevel configures zerolog to DebugLevel
+func DebugLevel() {
 	log.Logger.Level(zerolog.DebugLevel)
 }
+
+// ResetLogging can be used to reset to the old log settings
+func ResetLogging() {
+	log.Logger = oldLogger
+}