You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/05 08:43:05 UTC

[plc4x] branch develop updated: refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4479cb056 refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code
4479cb056 is described below

commit 4479cb05643e9a81ff0987e1e74c515aefab5cc7
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Aug 5 10:42:57 2022 +0200

    refactor(plc4go): introduced DefaultBufferedTransportInstance to consolidate common code
---
 .../internal/spi/transports/TransportInstance.go   | 52 ++++++++++++++++++++++
 plc4go/internal/spi/transports/pcap/Transport.go   | 48 +-------------------
 plc4go/internal/spi/transports/serial/Transport.go | 48 +-------------------
 plc4go/internal/spi/transports/tcp/Transport.go    | 48 +-------------------
 plc4go/internal/spi/transports/udp/Transport.go    |  4 +-
 .../spi/transports/utils/TransportLogger.go        |  4 +-
 plc4go/tests/drivers/s7_test.go                    | 44 ------------------
 .../drivers/tests/manual_bacnet_PcapTest_test.go   |  2 +-
 8 files changed, 63 insertions(+), 187 deletions(-)

diff --git a/plc4go/internal/spi/transports/TransportInstance.go b/plc4go/internal/spi/transports/TransportInstance.go
index 72d9669b9..061f0330f 100644
--- a/plc4go/internal/spi/transports/TransportInstance.go
+++ b/plc4go/internal/spi/transports/TransportInstance.go
@@ -21,6 +21,7 @@ package transports
 
 import (
 	"bufio"
+	"github.com/pkg/errors"
 )
 
 type TransportInstance interface {
@@ -45,3 +46,54 @@ type TestTransportInstance interface {
 	GetNumDrainableBytes() uint32
 	DrainWriteBuffer(numBytes uint32) ([]uint8, error)
 }
+
+type DefaultBufferedTransportInstance struct {
+	*bufio.Reader
+}
+
+func (m *DefaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+	if m.Reader == nil {
+		return 0, nil
+	}
+	_, _ = m.Peek(1)
+	return uint32(m.Buffered()), nil
+}
+
+func (m *DefaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+	if m.Reader == nil {
+		return nil
+	}
+	nBytes := uint32(1)
+	for {
+		bytes, err := m.PeekReadableBytes(nBytes)
+		if err != nil {
+			return errors.Wrap(err, "Error while peeking")
+		}
+		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.Reader); !keepGoing {
+			return nil
+		}
+		nBytes++
+	}
+}
+
+func (m *DefaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
+	if m.Reader == nil {
+		return nil, errors.New("error peeking from transport. No reader available")
+	}
+	return m.Peek(int(numBytes))
+}
+
+func (m *DefaultBufferedTransportInstance) Read(numBytes uint32) ([]uint8, error) {
+	if m.Reader == nil {
+		return nil, errors.New("error reading from transport. No reader available")
+	}
+	data := make([]uint8, numBytes)
+	for i := uint32(0); i < numBytes; i++ {
+		val, err := m.ReadByte()
+		if err != nil {
+			return nil, errors.Wrap(err, "error reading")
+		}
+		data[i] = val
+	}
+	return data, nil
+}
diff --git a/plc4go/internal/spi/transports/pcap/Transport.go b/plc4go/internal/spi/transports/pcap/Transport.go
index dcf26de4b..55ec1535a 100644
--- a/plc4go/internal/spi/transports/pcap/Transport.go
+++ b/plc4go/internal/spi/transports/pcap/Transport.go
@@ -79,13 +79,13 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
 }
 
 type TransportInstance struct {
+	transports.DefaultBufferedTransportInstance
 	transportFile string
 	transportType TransportType
 	portRange     string
 	speedFactor   float32
 	connected     bool
 	transport     *Transport
-	reader        *bufio.Reader
 	handle        *pcap.Handle
 	mutex         sync.Mutex
 }
@@ -118,7 +118,7 @@ func (m *TransportInstance) Connect() error {
 	m.handle = handle
 	m.connected = true
 	buffer := new(bytes.Buffer)
-	m.reader = bufio.NewReader(buffer)
+	m.Reader = bufio.NewReader(buffer)
 
 	go func(m *TransportInstance, buffer *bytes.Buffer) {
 		packageCount := 0
@@ -183,50 +183,6 @@ func (m *TransportInstance) IsConnected() bool {
 	return m.connected
 }
 
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
-	if m.reader == nil {
-		return 0, nil
-	}
-	_, _ = m.reader.Peek(1)
-	return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
-	nBytes := uint32(1)
-	for {
-		_bytes, err := m.PeekReadableBytes(nBytes)
-		if err != nil {
-			return errors.Wrap(err, "Error while peeking")
-		}
-		if keepGoing := until(uint(nBytes-1), _bytes[len(_bytes)-1], m.reader); !keepGoing {
-			return nil
-		}
-		nBytes++
-	}
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error peeking from transport. No reader available")
-	}
-	return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error reading from transport. No reader available")
-	}
-	data := make([]uint8, numBytes)
-	for i := uint32(0); i < numBytes; i++ {
-		val, err := m.reader.ReadByte()
-		if err != nil {
-			return nil, errors.Wrap(err, "error reading")
-		}
-		data[i] = val
-	}
-	return data, nil
-}
-
 func (m *TransportInstance) Write(_ []uint8) error {
 	panic("Write to pcap not supported")
 }
diff --git a/plc4go/internal/spi/transports/serial/Transport.go b/plc4go/internal/spi/transports/serial/Transport.go
index 07582d7c4..d0a08b43d 100644
--- a/plc4go/internal/spi/transports/serial/Transport.go
+++ b/plc4go/internal/spi/transports/serial/Transport.go
@@ -76,12 +76,12 @@ func (m Transport) CreateTransportInstanceForLocalAddress(transportUrl url.URL,
 }
 
 type TransportInstance struct {
+	transports.DefaultBufferedTransportInstance
 	SerialPortName string
 	BaudRate       uint
 	ConnectTimeout uint32
 	transport      *Transport
 	serialPort     io.ReadWriteCloser
-	reader         *bufio.Reader
 }
 
 func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout uint32, transport *Transport) *TransportInstance {
@@ -109,7 +109,7 @@ func (m *TransportInstance) Connect() error {
 		m.serialPort = utils.NewTransportLogger(m.serialPort, utils.WithLogger(fileLogger))
 		log.Trace().Msgf("Logging Transport to file %s", logFile.Name())
 	}*/
-	m.reader = bufio.NewReader(m.serialPort)
+	m.Reader = bufio.NewReader(m.serialPort)
 
 	return nil
 }
@@ -130,50 +130,6 @@ func (m *TransportInstance) IsConnected() bool {
 	return m.serialPort != nil
 }
 
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
-	if m.reader == nil {
-		return 0, nil
-	}
-	_, _ = m.reader.Peek(1)
-	return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
-	nBytes := uint32(1)
-	for {
-		bytes, err := m.PeekReadableBytes(nBytes)
-		if err != nil {
-			return errors.Wrap(err, "Error while peeking")
-		}
-		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
-			return nil
-		}
-		nBytes++
-	}
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error peeking from transport. No reader available")
-	}
-	return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error reading from transport. No reader available")
-	}
-	data := make([]uint8, numBytes)
-	numBytesRead, err := m.reader.Read(data)
-	if err != nil {
-		return nil, errors.Wrap(err, "error reading")
-	}
-	if uint32(numBytesRead) != numBytes {
-		return nil, errors.Wrapf(err, "could only read %d of %d bytes", numBytesRead, numBytes)
-	}
-	return data, nil
-}
-
 func (m *TransportInstance) Write(data []uint8) error {
 	if m.serialPort == nil {
 		return errors.New("error writing to transport. No writer available")
diff --git a/plc4go/internal/spi/transports/tcp/Transport.go b/plc4go/internal/spi/transports/tcp/Transport.go
index 5c49e8639..93fb25945 100644
--- a/plc4go/internal/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/spi/transports/tcp/Transport.go
@@ -94,12 +94,12 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
 }
 
 type TransportInstance struct {
+	transports.DefaultBufferedTransportInstance
 	RemoteAddress  *net.TCPAddr
 	LocalAddress   *net.TCPAddr
 	ConnectTimeout uint32
 	transport      *Transport
 	tcpConn        net.Conn
-	reader         *bufio.Reader
 }
 
 func NewTcpTransportInstance(remoteAddress *net.TCPAddr, connectTimeout uint32, transport *Transport) *TransportInstance {
@@ -119,7 +119,7 @@ func (m *TransportInstance) Connect() error {
 
 	m.LocalAddress = m.tcpConn.LocalAddr().(*net.TCPAddr)
 
-	m.reader = bufio.NewReader(m.tcpConn)
+	m.Reader = bufio.NewReader(m.tcpConn)
 
 	return nil
 }
@@ -140,50 +140,6 @@ func (m *TransportInstance) IsConnected() bool {
 	return m.tcpConn != nil
 }
 
-func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
-	if m.reader == nil {
-		return 0, nil
-	}
-	_, _ = m.reader.Peek(1)
-	return uint32(m.reader.Buffered()), nil
-}
-
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
-	nBytes := uint32(1)
-	for {
-		bytes, err := m.PeekReadableBytes(nBytes)
-		if err != nil {
-			return errors.Wrap(err, "Error while peeking")
-		}
-		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
-			return nil
-		}
-		nBytes++
-	}
-}
-
-func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error peeking from transport. No reader available")
-	}
-	return m.reader.Peek(int(numBytes))
-}
-
-func (m *TransportInstance) Read(numBytes uint32) ([]uint8, error) {
-	if m.reader == nil {
-		return nil, errors.New("error reading from transport. No reader available")
-	}
-	data := make([]uint8, numBytes)
-	for i := uint32(0); i < numBytes; i++ {
-		val, err := m.reader.ReadByte()
-		if err != nil {
-			return nil, errors.Wrap(err, "error reading")
-		}
-		data[i] = val
-	}
-	return data, nil
-}
-
 func (m *TransportInstance) Write(data []uint8) error {
 	if m.tcpConn == nil {
 		return errors.New("error writing to transport. No writer available")
diff --git a/plc4go/internal/spi/transports/udp/Transport.go b/plc4go/internal/spi/transports/udp/Transport.go
index c8eea0e8b..4f01865a8 100644
--- a/plc4go/internal/spi/transports/udp/Transport.go
+++ b/plc4go/internal/spi/transports/udp/Transport.go
@@ -201,11 +201,11 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
 	nBytes := uint32(1)
 	for {
-		bytes, err := m.PeekReadableBytes(nBytes)
+		_bytes, err := m.PeekReadableBytes(nBytes)
 		if err != nil {
 			return errors.Wrap(err, "Error while peeking")
 		}
-		if keepGoing := until(uint(nBytes-1), bytes[len(bytes)-1], m.reader); !keepGoing {
+		if keepGoing := until(uint(nBytes-1), _bytes[len(_bytes)-1], m.reader); !keepGoing {
 			return nil
 		}
 		nBytes++
diff --git a/plc4go/internal/spi/transports/utils/TransportLogger.go b/plc4go/internal/spi/transports/utils/TransportLogger.go
index 549b0c1e8..7a0b72c4c 100644
--- a/plc4go/internal/spi/transports/utils/TransportLogger.go
+++ b/plc4go/internal/spi/transports/utils/TransportLogger.go
@@ -50,7 +50,7 @@ func WithLogger(log zerolog.Logger) Option {
 func (t *TransportLogger) Read(p []byte) (int, error) {
 	bytesRead, err := t.source.Read(p)
 	if bytesRead > 0 {
-		t.log.Printf("Read: %s", p[:bytesRead])
+		t.log.Printf("Read: %+q", p[:bytesRead])
 	}
 	return bytesRead, err
 }
@@ -58,7 +58,7 @@ func (t *TransportLogger) Read(p []byte) (int, error) {
 func (t *TransportLogger) Write(p []byte) (int, error) {
 	bytesWritten, err := t.source.Write(p)
 	if bytesWritten > 0 {
-		t.log.Printf("Write: %s", p[:bytesWritten])
+		t.log.Printf("Write: %+q", p[:bytesWritten])
 	}
 	return bytesWritten, err
 }
diff --git a/plc4go/tests/drivers/s7_test.go b/plc4go/tests/drivers/s7_test.go
deleted file mode 100644
index db91b70c9..000000000
--- a/plc4go/tests/drivers/s7_test.go
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package drivers
-
-import (
-	"encoding/hex"
-	"github.com/apache/plc4x/plc4go/internal/spi/utils"
-	"github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
-	_ "github.com/apache/plc4x/plc4go/tests/initializetest"
-	"testing"
-)
-
-func TestS7(t *testing.T) {
-	t.Skip()
-	request, err := hex.DecodeString("000a00000006010300000004")
-	if err != nil {
-		// Output an error ...
-	}
-	rb := utils.NewReadBufferByteBased(request)
-	adu, err := model.TPKTPacketParse(rb)
-	if err != nil {
-		t.Errorf("Error parsing: %s", err)
-	}
-	if adu != nil {
-		// Output success ...
-	}
-}
diff --git a/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go b/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
index 9feb39c13..41c1f09fd 100644
--- a/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
+++ b/plc4go/tests/drivers/tests/manual_bacnet_PcapTest_test.go
@@ -37,7 +37,7 @@ import (
 	"time"
 )
 
-func Test(t *testing.T) {
+func TestBacnetDriverWithPcap(t *testing.T) {
 	t.Skip() // Manual test don't check in un-skipped
 
 	config.TraceTransactionManagerWorkers = false