You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/03/22 10:53:08 UTC
[plc4x] branch develop updated: plc4go: fix concurrency issue in
testframework
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 0c15bad plc4go: fix concurrency issue in testframework
0c15bad is described below
commit 0c15bad738c7ddc88497ec34ee0716d263650ab7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Mar 22 11:52:41 2021 +0100
plc4go: fix concurrency issue in testframework
---
.../cmd/main/drivers/tests/modbus_driver_test.go | 7 +---
plc4go/internal/plc4go/modbus/Reader.go | 14 ++++----
plc4go/internal/plc4go/modbus/Writer.go | 38 +++++++++++-----------
.../plc4go/spi/testutils/DriverTestRunner.go | 8 +++--
plc4go/pkg/plc4go/logging/init.go | 26 ++++++++++++---
5 files changed, 55 insertions(+), 38 deletions(-)
diff --git a/plc4go/cmd/main/drivers/tests/modbus_driver_test.go b/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
index 5ce6a8e..601c388 100644
--- a/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
+++ b/plc4go/cmd/main/drivers/tests/modbus_driver_test.go
@@ -26,10 +26,5 @@ import (
)
func TestModbusDriver(t *testing.T) {
- testutils.RunDriverTestsuite(t, modbus.NewDriver(), "assets/testing/protocols/modbus/DriverTestsuite.xml",
- // TODO: find out why these test fail. It seems like a timing issue because sometimes the work sometimes not
- "Single element read request",
- "Multi element read request",
- "Array element read request",
- )
+ testutils.RunDriverTestsuite(t, modbus.NewDriver(), "assets/testing/protocols/modbus/DriverTestsuite.xml")
}
diff --git a/plc4go/internal/plc4go/modbus/Reader.go b/plc4go/internal/plc4go/modbus/Reader.go
index 17cfd41..15a5679 100644
--- a/plc4go/internal/plc4go/modbus/Reader.go
+++ b/plc4go/internal/plc4go/modbus/Reader.go
@@ -19,13 +19,13 @@
package modbus
import (
- "errors"
readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
plc4goModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
"github.com/apache/plc4x/plc4go/pkg/plc4go/values"
+ "github.com/pkg/errors"
"math"
"sync/atomic"
"time"
@@ -64,7 +64,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
result <- model.PlcReadRequestResult{
Request: readRequest,
Response: nil,
- Err: errors.New("invalid field item type"),
+ Err: errors.Wrap(err, "invalid field item type"),
}
return
}
@@ -126,7 +126,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
if err != nil {
result <- model.PlcReadRequestResult{
Request: readRequest,
- Err: errors.New("Error decoding response: " + err.Error()),
+ Err: errors.Wrap(err, "Error decoding response"),
}
// TODO: should we return the error here?
return nil
@@ -140,7 +140,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
func(err error) error {
result <- model.PlcReadRequestResult{
Request: readRequest,
- Err: errors.New("got timeout while waiting for response"),
+ Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return nil
},
@@ -148,7 +148,7 @@ func (m *Reader) Read(readRequest model.PlcReadRequest) <-chan model.PlcReadRequ
result <- model.PlcReadRequestResult{
Request: readRequest,
Response: nil,
- Err: errors.New("error sending message: " + err.Error()),
+ Err: errors.Wrap(err, "error sending message"),
}
}
}()
@@ -183,14 +183,14 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
fieldName := readRequest.GetFieldNames()[0]
field, err := CastToModbusFieldFromPlcField(readRequest.GetField(fieldName))
if err != nil {
- return nil, errors.New("error casting to modbus-field")
+ return nil, errors.Wrap(err, "error casting to modbus-field")
}
// Decode the data according to the information from the request
rb := utils.NewReadBuffer(data)
value, err := readWriteModel.DataItemParse(rb, field.Datatype, field.Quantity)
if err != nil {
- return nil, err
+ return nil, errors.Wrap(err, "Error parsing data item")
}
responseCodes := map[string]model.PlcResponseCode{}
plcValues := map[string]values.PlcValue{}
diff --git a/plc4go/internal/plc4go/modbus/Writer.go b/plc4go/internal/plc4go/modbus/Writer.go
index d7df184..6d3d8f0 100644
--- a/plc4go/internal/plc4go/modbus/Writer.go
+++ b/plc4go/internal/plc4go/modbus/Writer.go
@@ -19,13 +19,12 @@
package modbus
import (
- "errors"
- "fmt"
readWriteModel "github.com/apache/plc4x/plc4go/internal/plc4go/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi"
plc4goModel "github.com/apache/plc4x/plc4go/internal/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/internal/plc4go/spi/utils"
"github.com/apache/plc4x/plc4go/pkg/plc4go/model"
+ "github.com/pkg/errors"
"math"
"sync/atomic"
"time"
@@ -47,8 +46,16 @@ func NewWriter(unitIdentifier uint8, messageCodec spi.MessageCodec) Writer {
func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
result := make(chan model.PlcWriteRequestResult)
- // If we are requesting only one field, use a
- if len(writeRequest.GetFieldNames()) == 1 {
+ go func() {
+ // If we are requesting only one field, use a
+ if len(writeRequest.GetFieldNames()) != 1 {
+ result <- model.PlcWriteRequestResult{
+ Request: writeRequest,
+ Response: nil,
+ Err: errors.New("modbus only supports single-item requests"),
+ }
+ return
+ }
fieldName := writeRequest.GetFieldNames()[0]
// Get the modbus field instance from the request
@@ -58,9 +65,9 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
result <- model.PlcWriteRequestResult{
Request: writeRequest,
Response: nil,
- Err: errors.New("invalid field item type"),
+ Err: errors.Wrap(err, "invalid field item type"),
}
- return result
+ return
}
// Get the value from the request and serialize it to a byte array
@@ -70,9 +77,9 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
result <- model.PlcWriteRequestResult{
Request: writeRequest,
Response: nil,
- Err: errors.New("error serializing value: " + err.Error()),
+ Err: errors.Wrap(err, "error serializing value"),
}
- return result
+ return
}
data := utils.Uint8ArrayToInt8Array(io.GetBytes())
@@ -97,14 +104,14 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
Response: nil,
Err: errors.New("modbus currently doesn't support extended register requests"),
}
- return result
+ return
default:
result <- model.PlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.New("unsupported field type"),
}
- return result
+ return
}
// Calculate a new unit identifier
@@ -138,7 +145,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
if err != nil {
result <- model.PlcWriteRequestResult{
Request: writeRequest,
- Err: errors.New("Error decoding response: " + err.Error()),
+ Err: errors.Wrap(err, "Error decoding response"),
}
} else {
result <- model.PlcWriteRequestResult{
@@ -156,14 +163,7 @@ func (m Writer) Write(writeRequest model.PlcWriteRequest) <-chan model.PlcWriteR
return nil
},
time.Second*1)
- } else {
- result <- model.PlcWriteRequestResult{
- Request: writeRequest,
- Response: nil,
- Err: errors.New("modbus only supports single-item requests"),
- }
- }
- fmt.Printf("Write Request %s", writeRequest)
+ }()
return result
}
diff --git a/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go b/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
index ce6c95c..88e709c 100644
--- a/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
+++ b/plc4go/internal/plc4go/spi/testutils/DriverTestRunner.go
@@ -213,9 +213,13 @@ func (m DriverTestsuite) ExecuteStep(connection plc4go.PlcConnection, testcase *
expectedRawOutput := wb.GetBytes()
expectedRawOutputLength := uint32(len(expectedRawOutput))
+ now := time.Now()
// Read exactly this amount of bytes from the transport
- if testTransportInstance.GetNumDrainableBytes() < expectedRawOutputLength {
- return errors.Errorf("error getting bytes from transport. Not enough data available: actual(%d)<expected(%d)", testTransportInstance.GetNumDrainableBytes(), expectedRawOutputLength)
+ for testTransportInstance.GetNumDrainableBytes() < expectedRawOutputLength {
+ if time.Now().Sub(now) > 2*time.Second {
+ return errors.Errorf("error getting bytes from transport. Not enough data available: actual(%d)<expected(%d)", testTransportInstance.GetNumDrainableBytes(), expectedRawOutputLength)
+ }
+ time.Sleep(10 * time.Millisecond)
}
rawOutput, err := testTransportInstance.DrainWriteBuffer(expectedRawOutputLength)
if err != nil {
diff --git a/plc4go/pkg/plc4go/logging/init.go b/plc4go/pkg/plc4go/logging/init.go
index e110e63..185e0a6 100644
--- a/plc4go/pkg/plc4go/logging/init.go
+++ b/plc4go/pkg/plc4go/logging/init.go
@@ -23,17 +23,35 @@ import (
"github.com/rs/zerolog/log"
)
+var oldLogger zerolog.Logger
+
// init is used for _ imports for easy log config
func init() {
+ oldLogger = log.Logger
+ log.Logger.Level(zerolog.ErrorLevel)
+}
+
+// ErrorLevel configures zerolog to WarnLevel
+func ErrorLevel() {
log.Logger.Level(zerolog.ErrorLevel)
}
-// Info configures zerolog to InfoLevel
-func Info() {
+// WarnLevel configures zerolog to WarnLevel
+func WarnLevel() {
+ log.Logger.Level(zerolog.WarnLevel)
+}
+
+// InfoLevel configures zerolog to InfoLevel
+func InfoLevel() {
log.Logger.Level(zerolog.InfoLevel)
}
-// Debug configures zerolog to InfoLevel
-func Debug() {
+// DebugLevel configures zerolog to InfoLevel
+func DebugLevel() {
log.Logger.Level(zerolog.DebugLevel)
}
+
+// ResetLogging can be used to reset to the old log settings
+func ResetLogging() {
+ log.Logger = oldLogger
+}