You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/05 08:43:05 UTC
[plc4x] branch develop updated: refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 4479cb056 refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code
4479cb056 is described below
commit 4479cb05643e9a81ff0987e1e74c515aefab5cc7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 5 10:42:57 2022 +0200
refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code
---
.../internal/spi/transports/TransportInstance.go | 52 ++++++++++++++++++++++
plc4go/internal/spi/transports/pcap/Transport.go | 48 +-------------------
plc4go/internal/spi/transports/serial/Transport.go | 48 +-------------------
plc4go/internal/spi/transports/tcp/Transport.go | 48 +-------------------
plc4go/internal/spi/transports/udp/Transport.go | 4 +-
.../spi/transports/utils/TransportLogger.go | 4 +-
plc4go/tests/drivers/s7_test.go | 44 ------------------
.../drivers/tests/manual_bacnet_PcapTest_test.go | 2 +-
8 files changed, 63 insertions(+), 187 deletions(-)
diff --git a/plc4go/internal/spi/transports/TransportInstance.go b/plc4go/internal/spi/transports/TransportInstance.go
index 72d9669b9..061f0330f 100644
--- a/plc4go/internal/spi/transports/TransportInstance.go
+++ b/plc4go/internal/spi/transports/TransportInstance.go
@@ -21,6 +21,7 @@ package transports
import (
"bufio"
+ "github.com/pkg/errors"
)
type TransportInstance interface {
@@ -45,3 +46,54 @@ type TestTransportInstance interface {
GetNumDrainableBytes() uint32
DrainWriteBuffer(numBytes uint32) ([]uint8, error)
}
+
+type DefaultBufferedTransportInstance struct {
+ *bufio.Reader
+}
+
+func (m *DefaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+ if m.Reader == nil {
+ return 0, nil
+ }
+ _, _ = m.Peek(1)
+ return uint32(m.Buffered()), nil
+}
+
+func (m *DefaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+ if m.Reader == nil {
+ return nil
+ }
+ nBytes := uint32(1)
+ for {
+ bytes, err := m.PeekReadableBytes(nBytes)
+ if err != nil {
+ return errors.Wrap(err, "Error while peeking")
+ }
+ if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.Reader); !keepGoing {
+ return nil
+ }
+ nBytes++
+ }
+}
+
+func (m *DefaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
+ if m.Reader == nil {
+ return nil, errors.New("error peeking from transport. No reader available")
+ }
+ return m.Peek(int(numBytes))
+}
+
+func (m *DefaultBufferedTransportInstance) Read(numBytes uint32) ([]uint8, error) {
+ if m.Reader == nil {
+ return nil, errors.New("error reading from transport. No reader available")
+ }
+ data := make([]uint8, numBytes)
+ for i := uint32(0); i < numBytes; i++ {
+ val, err := m.ReadByte()
+ if err != nil {
+ return nil, errors.Wrap(err, "error reading")
+ }
+ data[i] = val
+ }
+ return data, nil
+}
diff --git a/plc4go/internal/spi/transports/pcap/Transport.go b/plc4go/internal/spi/transports/pcap/Transport.go
index dcf26de4b..55ec1535a 100644
--- a/plc4go/internal/spi/transports/pcap/Transport.go
+++ b/plc4go/internal/spi/transports/pcap/Transport.go
@@ -79,13 +79,13 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
}
type TransportInstance struct {
+ transports.DefaultBufferedTransportInstance
transportFile string
transportType TransportType
portRange string
speedFactor float32
connected bool
transport *Transport
- reader *bufio.Reader
handle *pcap.Handle
mutex sync.Mutex
}
@@ -118,7 +118,7 @@ func (m *TransportInstance) Connect() error {
m.handle = handle
m.connected = true
buffer := new(bytes.Buffer)
- m.reader = bufio.NewReader(buffer)
+ m.Reader = bufio.NewReader(buffer)
go func(m *TransportInstance, buffer *bytes.Buffer) {
packageCount := 0
@@ -183,50 +183,6 @@ func (m *TransportInstance) IsConnected() bool {
return m.connected
}
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
- if m.reader == nil {
- return 0, nil
- }
- _, _ = m.reader.Peek(1)
- return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
- nBytes := uint32(1)
- for {
- _bytes, err := m.PeekReadableBytes(nBytes)
- if err != nil {
- return errors.Wrap(err, "Error while peeking")
- }
- if keepGoing := until(uint(nBytes-1), _bytes[len(_bytes)-1], m.reader); !keepGoing {
- return nil
- }
- nBytes++
- }
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error peeking from transport. No reader available")
- }
- return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error reading from transport. No reader available")
- }
- data := make([]uint8, numBytes)
- for i := uint32(0); i < numBytes; i++ {
- val, err := m.reader.ReadByte()
- if err != nil {
- return nil, errors.Wrap(err, "error reading")
- }
- data[i] = val
- }
- return data, nil
-}
-
func (m *TransportInstance) Write(_ []uint8) error {
panic("Write to pcap not supported")
}
diff --git a/plc4go/internal/spi/transports/serial/Transport.go b/plc4go/internal/spi/transports/serial/Transport.go
index 07582d7c4..d0a08b43d 100644
--- a/plc4go/internal/spi/transports/serial/Transport.go
+++ b/plc4go/internal/spi/transports/serial/Transport.go
@@ -76,12 +76,12 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
}
type TransportInstance struct {
+ transports.DefaultBufferedTransportInstance
SerialPortName string
BaudRate uint
ConnectTimeout uint32
transport *Transport
serialPort io.ReadWriteCloser
- reader *bufio.Reader
}
func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout uint32, transport *Transport) *TransportInstance {
@@ -109,7 +109,7 @@ func (m *TransportInstance) Connect() error {
m.serialPort = utils.NewTransportLogger(m.serialPort, utils.WithLogger(fileLogger))
log.Trace().Msgf("Logging Transport to file %s", logFile.Name())
}*/
- m.reader = bufio.NewReader(m.serialPort)
+ m.Reader = bufio.NewReader(m.serialPort)
return nil
}
@@ -130,50 +130,6 @@ func (m *TransportInstance) IsConnected() bool {
return m.serialPort != nil
}
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
- if m.reader == nil {
- return 0, nil
- }
- _, _ = m.reader.Peek(1)
- return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
- nBytes := uint32(1)
- for {
- bytes, err := m.PeekReadableBytes(nBytes)
- if err != nil {
- return errors.Wrap(err, "Error while peeking")
- }
- if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
- return nil
- }
- nBytes++
- }
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error peeking from transport. No reader available")
- }
- return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error reading from transport. No reader available")
- }
- data := make([]uint8, numBytes)
- numBytesRead, err := m.reader.Read(data)
- if err != nil {
- return nil, errors.Wrap(err, "error reading")
- }
- if uint32(numBytesRead) != numBytes {
- return nil, errors.Wrapf(err, "could only read %d of %d bytes", numBytesRead, numBytes)
- }
- return data, nil
-}
-
func (m *TransportInstance) Write(data []uint8) error {
if m.serialPort == nil {
return errors.New("error writing to transport. No writer available")
diff --git a/plc4go/internal/spi/transports/tcp/Transport.go b/plc4go/internal/spi/transports/tcp/Transport.go
index 5c49e8639..93fb25945 100644
--- a/plc4go/internal/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/spi/transports/tcp/Transport.go
@@ -94,12 +94,12 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
}
type TransportInstance struct {
+ transports.DefaultBufferedTransportInstance
RemoteAddress *net.TCPAddr
LocalAddress *net.TCPAddr
ConnectTimeout uint32
transport *Transport
tcpConn net.Conn
- reader *bufio.Reader
}
func NewTcpTransportInstance(remoteAddress *net.TCPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
@@ -119,7 +119,7 @@ func (m *TransportInstance) Connect() error {
m.LocalAddress = m.tcpConn.LocalAddr().(*net.TCPAddr)
- m.reader = bufio.NewReader(m.tcpConn)
+ m.Reader = bufio.NewReader(m.tcpConn)
return nil
}
@@ -140,50 +140,6 @@ func (m *TransportInstance) IsConnected() bool {
return m.tcpConn != nil
}
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
- if m.reader == nil {
- return 0, nil
- }
- _, _ = m.reader.Peek(1)
- return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
- nBytes := uint32(1)
- for {
- bytes, err := m.PeekReadableBytes(nBytes)
- if err != nil {
- return errors.Wrap(err, "Error while peeking")
- }
- if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
- return nil
- }
- nBytes++
- }
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error peeking from transport. No reader available")
- }
- return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
- if m.reader == nil {
- return nil, errors.New("error reading from transport. No reader available")
- }
- data := make([]uint8, numBytes)
- for i := uint32(0); i < numBytes; i++ {
- val, err := m.reader.ReadByte()
- if err != nil {
- return nil, errors.Wrap(err, "error reading")
- }
- data[i] = val
- }
- return data, nil
-}
-
func (m *TransportInstance) Write(data []uint8) error {
if m.tcpConn == nil {
return errors.New("error writing to transport. No writer available")
diff --git a/plc4go/internal/spi/transports/udp/Transport.go b/plc4go/internal/spi/transports/udp/Transport.go
index c8eea0e8b..4f01865a8 100644
--- a/plc4go/internal/spi/transports/udp/Transport.go
+++ b/plc4go/internal/spi/transports/udp/Transport.go
@@ -201,11 +201,11 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
nBytes := uint32(1)
for {
- bytes, err := m.PeekReadableBytes(nBytes)
+ _bytes, err := m.PeekReadableBytes(nBytes)
if err != nil {
return errors.Wrap(err, "Error while peeking")
}
- if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
+ if keepGoing := until(uint(nBytes-1), _bytes[len(_bytes)-1], m.reader); !keepGoing {
return nil
}
nBytes++
diff --git a/plc4go/internal/spi/transports/utils/TransportLogger.go b/plc4go/internal/spi/transports/utils/TransportLogger.go
index 549b0c1e8..7a0b72c4c 100644
--- a/plc4go/internal/spi/transports/utils/TransportLogger.go
+++ b/plc4go/internal/spi/transports/utils/TransportLogger.go
@@ -50,7 +50,7 @@ func WithLogger(log zerolog.Logger) Option {
func (t *TransportLogger) Read(p []byte) (int, error) {
bytesRead, err := t.source.Read(p)
if bytesRead > 0 {
- t.log.Printf("Read: %s", p[:bytesRead])
+ t.log.Printf("Read: %+q", p[:bytesRead])
}
return bytesRead, err
}
@@ -58,7 +58,7 @@ func (t *TransportLogger) Read(p []byte) (int, error) {
func (t *TransportLogger) Write(p []byte) (int, error) {
bytesWritten, err := t.source.Write(p)
if bytesWritten > 0 {
- t.log.Printf("Write: %s", p[:bytesWritten])
+ t.log.Printf("Write: %+q", p[:bytesWritten])
}
return bytesWritten, err
}
diff --git a/plc4go/tests/drivers/s7_test.go b/plc4go/tests/drivers/s7_test.go
deleted file mode 100644
index db91b70c9..000000000
--- a/plc4go/tests/drivers/s7_test.go
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package drivers
-
-import (
- "encoding/hex"
- "github.com/apache/plc4x/plc4go/internal/spi/utils"
- "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
- _ "github.com/apache/plc4x/plc4go/tests/initializetest"
- "testing"
-)
-
-func TestS7(t *testing.T) {
- t.Skip()
- request, err := hex.DecodeString("000a00000006010300000004")
- if err != nil {
- // Output an error ...
- }
- rb := utils.NewReadBufferByteBased(request)
- adu, err := model.TPKTPacketParse(rb)
- if err != nil {
- t.Errorf("Error parsing: %s", err)
- }
- if adu != nil {
- // Output success ...
- }
-}
diff --git a/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go b/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
index 9feb39c13..41c1f09fd 100644
--- a/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
+++ b/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
@@ -37,7 +37,7 @@ import (
"time"
)
-func Test(t *testing.T) {
+func TestBacnetDriverWithPcap(t *testing.T) {
t.Skip() // Manual test don't check in un-skipped
config.TraceTransactionManagerWorkers = false