You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/11 10:25:38 UTC
[plc4x] branch develop updated: fix(plc4go): don't panic - catch panics and log them
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 33f1d9a283 fix(plc4go): don't panic - catch panics and log them
33f1d9a283 is described below
commit 33f1d9a283dfcb91365bb03aed45238340c377ae
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu May 11 12:25:29 2023 +0200
fix(plc4go): don't panic - catch panics and log them
---
plc4go/internal/ads/Browser.go | 9 +++++
plc4go/internal/ads/Connection.go | 10 +++++
plc4go/internal/ads/Discoverer.go | 5 +++
plc4go/internal/ads/Driver.go | 12 ++----
plc4go/internal/ads/Interactions.go | 31 +++++++++++++++
plc4go/internal/ads/Reader.go | 28 ++++++-------
plc4go/internal/ads/Subscriber.go | 12 +++++-
plc4go/internal/ads/Writer.go | 21 +++++-----
plc4go/internal/bacnetip/Connection.go | 11 ++++++
plc4go/internal/cbus/Browser.go | 5 +++
plc4go/internal/cbus/Connection.go | 15 +++++++
plc4go/internal/cbus/Discoverer.go | 15 +++++++
plc4go/internal/cbus/Driver.go | 6 +--
plc4go/internal/cbus/Reader.go | 5 +++
plc4go/internal/cbus/Subscriber.go | 6 +++
plc4go/internal/cbus/Writer.go | 5 +++
plc4go/internal/eip/Connection.go | 10 +++++
plc4go/internal/eip/EipDriver.go | 24 ++++-------
plc4go/internal/eip/Reader.go | 46 +++++++++++-----------
plc4go/internal/eip/Writer.go | 30 ++++++++------
plc4go/internal/knxnetip/Connection.go | 41 ++++++++++++++-----
.../knxnetip/ConnectionDriverSpecificOperations.go | 35 ++++++++++++++++
plc4go/internal/knxnetip/ConnectionHelper.go | 5 +++
plc4go/internal/knxnetip/Discoverer.go | 10 +++++
plc4go/internal/knxnetip/Driver.go | 12 ++----
plc4go/internal/knxnetip/Reader.go | 16 +++++---
plc4go/internal/knxnetip/Subscriber.go | 6 +++
plc4go/internal/modbus/Connection.go | 20 ++++++----
plc4go/internal/modbus/ModbusAsciiDriver.go | 17 ++++----
plc4go/internal/modbus/ModbusRtuDriver.go | 17 ++++----
plc4go/internal/modbus/ModbusTcpDriver.go | 17 ++++----
plc4go/internal/modbus/Reader.go | 29 +++++++-------
plc4go/internal/modbus/Writer.go | 28 +++++--------
plc4go/internal/s7/Connection.go | 5 +++
plc4go/internal/s7/Driver.go | 24 ++++-------
plc4go/internal/s7/Reader.go | 23 ++++++-----
plc4go/internal/s7/Writer.go | 25 +++++++-----
plc4go/internal/simulated/Connection.go | 24 +++++++++--
plc4go/internal/simulated/Reader.go | 12 ++++--
plc4go/internal/simulated/Writer.go | 12 ++++--
plc4go/pkg/api/driverManager.go | 24 ++++-------
plc4go/spi/default/DefaultConnection.go | 16 ++++++--
plc4go/spi/model/DefaultPlcReadRequest.go | 5 +++
plc4go/spi/model/DefaultPlcWriteRequest.go | 5 +++
plc4go/spi/transports/pcap/Transport.go | 5 +++
plc4go/spi/utils/Net.go | 10 +++++
plc4go/spi/utils/WorkerPool.go | 10 +++++
47 files changed, 521 insertions(+), 238 deletions(-)
diff --git a/plc4go/internal/ads/Browser.go b/plc4go/internal/ads/Browser.go
index 04ed5e7117..1c5862df4b 100644
--- a/plc4go/internal/ads/Browser.go
+++ b/plc4go/internal/ads/Browser.go
@@ -21,6 +21,7 @@ package ads
import (
"context"
+ "github.com/pkg/errors"
"strings"
"github.com/apache/plc4x/plc4go/internal/ads/model"
@@ -42,6 +43,14 @@ func (m *Connection) Browse(ctx context.Context, browseRequest apiModel.PlcBrows
func (m *Connection) BrowseWithInterceptor(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseItem) bool) <-chan apiModel.PlcBrowseRequestResult {
result := make(chan apiModel.PlcBrowseRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- &internalModel.DefaultPlcBrowseRequestResult{
+ Request: browseRequest,
+ Err: errors.Errorf("Recovered from panic: %v", err),
+ }
+ }
+ }()
responseCodes := map[string]apiModel.PlcResponseCode{}
results := map[string][]apiModel.PlcBrowseItem{}
for _, queryName := range browseRequest.GetQueryNames() {
diff --git a/plc4go/internal/ads/Connection.go b/plc4go/internal/ads/Connection.go
index e63e936a97..c5982d04c1 100644
--- a/plc4go/internal/ads/Connection.go
+++ b/plc4go/internal/ads/Connection.go
@@ -103,6 +103,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
m.driverContext.clear()
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("Recovered from panic: %v", err))
+ }
+ }()
err := m.messageCodec.Connect()
if err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
@@ -155,6 +160,11 @@ func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
// (Messages that are not responses to outgoing messages)
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed: %v", err)
+ }
+ }()
for message := range defaultIncomingMessageChannel {
switch message.(type) {
case model.AmsTCPPacket:
diff --git a/plc4go/internal/ads/Discoverer.go b/plc4go/internal/ads/Discoverer.go
index c9ce8efeb8..f54927fedd 100644
--- a/plc4go/internal/ads/Discoverer.go
+++ b/plc4go/internal/ads/Discoverer.go
@@ -142,6 +142,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
// Start a worker to receive responses
go func(discoveryItem *discovery) {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
buf := make([]byte, 1024)
for {
length, fromAddr, err := socket.ReadFromUDP(buf)
diff --git a/plc4go/internal/ads/Driver.go b/plc4go/internal/ads/Driver.go
index ba336f461e..d102ef24f1 100644
--- a/plc4go/internal/ads/Driver.go
+++ b/plc4go/internal/ads/Driver.go
@@ -51,10 +51,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
@@ -83,10 +81,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Create the new connection
connection, err := NewConnection(codec, configuration, options)
if err != nil {
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "couldn't create connection"))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "couldn't create connection"))
return ch
}
log.Debug().Stringer("connection", connection).Msg("created connection, connecting now")
diff --git a/plc4go/internal/ads/Interactions.go b/plc4go/internal/ads/Interactions.go
index 586c2ccb11..bf575f343e 100644
--- a/plc4go/internal/ads/Interactions.go
+++ b/plc4go/internal/ads/Interactions.go
@@ -22,6 +22,7 @@ package ads
import (
"context"
"fmt"
+ "github.com/rs/zerolog/log"
"time"
"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
@@ -31,6 +32,11 @@ import (
func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model.AdsReadDeviceInfoResponse, error) {
responseChannel := make(chan model.AdsReadDeviceInfoResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsReadDeviceInfoRequest()
if err := m.messageCodec.SendRequest(
ctx,
@@ -67,6 +73,11 @@ func (m *Connection) ExecuteAdsReadDeviceInfoRequest(ctx context.Context) (model
func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32) (model.AdsReadResponse, error) {
responseChannel := make(chan model.AdsReadResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsReadRequest(indexGroup, indexOffset, length)
if err := m.messageCodec.SendRequest(
ctx,
@@ -103,6 +114,11 @@ func (m *Connection) ExecuteAdsReadRequest(ctx context.Context, indexGroup uint3
func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, data []byte) (model.AdsWriteResponse, error) {
responseChannel := make(chan model.AdsWriteResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsWriteRequest(indexGroup, indexOffset, data)
if err := m.messageCodec.SendRequest(
ctx,
@@ -139,6 +155,11 @@ func (m *Connection) ExecuteAdsWriteRequest(ctx context.Context, indexGroup uint
func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, readLength uint32, items []model.AdsMultiRequestItem, writeData []byte) (model.AdsReadWriteResponse, error) {
responseChannel := make(chan model.AdsReadWriteResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsReadWriteRequest(indexGroup, indexOffset, readLength, items, writeData)
if err := m.messageCodec.SendRequest(
ctx,
@@ -175,6 +196,11 @@ func (m *Connection) ExecuteAdsReadWriteRequest(ctx context.Context, indexGroup
func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context, indexGroup uint32, indexOffset uint32, length uint32, transmissionMode model.AdsTransMode, maxDelay uint32, cycleTime uint32) (model.AdsAddDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsAddDeviceNotificationResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsAddDeviceNotificationRequest(indexGroup, indexOffset, length, transmissionMode, maxDelay, cycleTime)
if err := m.messageCodec.SendRequest(
ctx,
@@ -211,6 +237,11 @@ func (m *Connection) ExecuteAdsAddDeviceNotificationRequest(ctx context.Context,
func (m *Connection) ExecuteAdsDeleteDeviceNotificationRequest(ctx context.Context, notificationHandle uint32) (model.AdsDeleteDeviceNotificationResponse, error) {
responseChannel := make(chan model.AdsDeleteDeviceNotificationResponse)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
request := m.NewAdsDeleteDeviceNotificationRequest(notificationHandle)
if err := m.messageCodec.SendRequest(
ctx,
diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index aa5b45dd6b..2dfdfa461f 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -44,6 +44,11 @@ func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadReque
log.Trace().Msg("Reading")
result := make(chan apiModel.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- internalModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
if len(readRequest.GetTagNames()) <= 1 {
m.singleRead(ctx, readRequest, result)
} else {
@@ -55,11 +60,7 @@ func (m *Connection) Read(ctx context.Context, readRequest apiModel.PlcReadReque
func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
if len(readRequest.GetTagNames()) != 1 {
- result <- &internalModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: nil,
- Err: errors.New("this part of the ads driver only supports single-item requests"),
- }
+ result <- internalModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("this part of the ads driver only supports single-item requests"))
log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
return
}
@@ -70,11 +71,7 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
if model.NeedsResolving(tag) {
adsField, err := model.CastToSymbolicPlcTagFromPlcTag(tag)
if err != nil {
- result <- &internalModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: nil,
- Err: errors.Wrap(err, "invalid tag item type"),
- }
+ result <- internalModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Wrap(err, "invalid tag item type"))
log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
@@ -92,16 +89,17 @@ func (m *Connection) singleRead(ctx context.Context, readRequest apiModel.PlcRea
}
directAdsTag, ok := tag.(*model.DirectPlcTag)
if !ok {
- result <- &internalModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: nil,
- Err: errors.New("invalid tag item type"),
- }
+ result <- internalModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("invalid tag item type"))
log.Debug().Msgf("Invalid tag item type %T", tag)
return
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- internalModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
response, err := m.ExecuteAdsReadRequest(ctx, directAdsTag.IndexGroup, directAdsTag.IndexOffset, directAdsTag.DataType.GetSize())
if err != nil {
result <- &internalModel.DefaultPlcReadRequestResult{
diff --git a/plc4go/internal/ads/Subscriber.go b/plc4go/internal/ads/Subscriber.go
index 1fc73eeeb7..6c899d464d 100644
--- a/plc4go/internal/ads/Subscriber.go
+++ b/plc4go/internal/ads/Subscriber.go
@@ -99,11 +99,16 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
// Create a new result-channel, which completes as soon as all sub-result-channels have returned
globalResultChannel := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
select {
case <-ctx.Done():
- globalResultChannel <- &internalModel.DefaultPlcSubscriptionRequestResult{Request: subscriptionRequest, Err: ctx.Err()}
+ globalResultChannel <- internalModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, ctx.Err())
return
case subResult := <-subResultChannel:
// These are all single value requests ... so it's safe to assume this shortcut.
@@ -123,6 +128,11 @@ func (m *Connection) Subscribe(ctx context.Context, subscriptionRequest apiModel
func (m *Connection) subscribe(ctx context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
responseChan := make(chan apiModel.PlcSubscriptionRequestResult)
go func(respChan chan apiModel.PlcSubscriptionRequestResult) {
+ defer func() {
+ if err := recover(); err != nil {
+ responseChan <- internalModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
// At this point we are sure to only have single item direct tag requests.
tagName := subscriptionRequest.GetTagNames()[0]
directTag := subscriptionRequest.GetTag(tagName).(dirverModel.DirectPlcTag)
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index a628f43f91..423c4e61e1 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -43,6 +43,11 @@ func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRe
log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- internalModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
if len(writeRequest.GetTagNames()) <= 1 {
m.singleWrite(ctx, writeRequest, result)
} else {
@@ -54,11 +59,7 @@ func (m *Connection) Write(ctx context.Context, writeRequest apiModel.PlcWriteRe
func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcWriteRequest, result chan apiModel.PlcWriteRequestResult) {
if len(writeRequest.GetTagNames()) != 1 {
- result <- &internalModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: nil,
- Err: errors.New("this part of the ads driver only supports single-item requests"),
- }
+ result <- internalModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("this part of the ads driver only supports single-item requests"))
log.Debug().Msgf("this part of the ads driver only supports single-item requests. Got %d tags", len(writeRequest.GetTagNames()))
return
}
@@ -115,12 +116,14 @@ func (m *Connection) singleWrite(ctx context.Context, writeRequest apiModel.PlcW
data := io.GetBytes()
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- internalModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
response, err := m.ExecuteAdsWriteRequest(ctx, directAdsTag.IndexGroup, directAdsTag.IndexOffset, data)
if err != nil {
- result <- &internalModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Err: errors.Wrap(err, "got error executing the write request"),
- }
+ result <- internalModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "got error executing the write request"))
return
}
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 0258615506..5967ca1136 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/pkg/errors"
"sync"
"time"
@@ -79,8 +80,18 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
connectionConnectResult := <-c.DefaultConnection.ConnectWithContext(ctx)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
for c.IsConnected() {
log.Trace().Msg("Polling data")
incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 94d9ac34f4..d22e7a12cc 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -264,6 +264,11 @@ func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any
}
readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
defer readCtxCancel()
readRequestResult := <-readRequest.ExecuteWithContext(readCtx)
if err := readRequestResult.GetErr(); err != nil {
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 54886e9d89..a424fe415a 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -111,6 +111,11 @@ func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ c.fireConnectionError(errors.Errorf("panic-ed %v", err), ch)
+ }
+ }()
if err := c.messageCodec.Connect(); err != nil {
c.fireConnectionError(errors.Wrap(err, "Error connecting codec"), ch)
return
@@ -211,6 +216,11 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
func (c *Connection) startSubscriptionHandler() {
log.Debug().Msg("Starting SAL handler")
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
log.Debug().Msg("SAL handler stated")
for c.IsConnected() {
for monitoredSal := range c.messageCodec.monitoredSALs {
@@ -226,6 +236,11 @@ func (c *Connection) startSubscriptionHandler() {
}()
log.Debug().Msg("Starting MMI handler")
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
log.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index f7ccb2bec2..00ba336fa5 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -85,6 +85,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
wg.Add(1)
go func(netInterface addressProvider, interfaceLog zerolog.Logger) {
+ defer func() {
+ if err := recover(); err != nil {
+ interfaceLog.Error().Msgf("panic-ed %v", err)
+ }
+ }()
defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
for _, addr := range addrs {
@@ -114,6 +119,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
wg.Add(1)
go func(addressLogger zerolog.Logger) {
+ defer func() {
+ if err := recover(); err != nil {
+ addressLogger.Error().Msgf("panic-ed %v", err)
+ }
+ }()
defer func() { wg.Done() }()
for ip := range addresses {
addressLogger.Trace().Msgf("Handling found ip %v", ip)
@@ -142,6 +152,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}()
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for transportInstance := range transportInstances {
log.Debug().Stringer("transportInstance", transportInstance).Msg("submitting device scan")
d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*tcp.TransportInstance), callback))
diff --git a/plc4go/internal/cbus/Driver.go b/plc4go/internal/cbus/Driver.go
index bd3e75a024..f8043723c7 100644
--- a/plc4go/internal/cbus/Driver.go
+++ b/plc4go/internal/cbus/Driver.go
@@ -89,10 +89,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
}
func (m *Driver) reportError(err error) <-chan plc4go.PlcConnectionConnectResult {
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, err)
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, err)
return ch
}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 24c1502e16..fdb8a4c2ee 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -55,6 +55,11 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
}
func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadRequest, result chan apiModel.PlcReadRequestResult) {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
numTags := len(readRequest.GetTagNames())
if numTags > 20 { // letters g-z
result <- &spiModel.DefaultPlcReadRequestResult{
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index 0061922e02..aa1794f472 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -22,6 +22,7 @@ package cbus
import (
"context"
"fmt"
+ "github.com/pkg/errors"
"strings"
"time"
@@ -48,6 +49,11 @@ func NewSubscriber(connection *Connection) *Subscriber {
func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Add this subscriber to the connection.
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 46247c1ad4..47d15cc258 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -50,6 +50,11 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
log.Trace().Msg("Writing")
result := make(chan apiModel.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
numTags := len(writeRequest.GetTagNames())
if numTags > 20 { // letters g-z
result <- &spiModel.DefaultPlcWriteRequestResult{
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 9aba1a393d..9bdd960ca8 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -106,6 +106,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
err := m.messageCodec.Connect()
if err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
@@ -132,6 +137,11 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ctx := context.TODO()
result := make(chan plc4go.PlcConnectionCloseResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
log.Debug().Msg("Sending UnregisterSession EIP Packet")
_ = m.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, []byte(DefaultSenderContext), 0), func(message spi.Message) bool {
return true
diff --git a/plc4go/internal/eip/EipDriver.go b/plc4go/internal/eip/EipDriver.go
index 336fbf7808..9920bdd10c 100644
--- a/plc4go/internal/eip/EipDriver.go
+++ b/plc4go/internal/eip/EipDriver.go
@@ -54,10 +54,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
@@ -66,10 +64,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
@@ -79,20 +75,16 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
configuration, err := ParseFromOptions(options)
if err != nil {
log.Error().Err(err).Msgf("Invalid options")
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
}
driverContext, err := NewDriverContext(configuration)
if err != nil {
log.Error().Err(err).Msgf("Invalid options")
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
}
driverContext.awaitSetupComplete = m.awaitSetupComplete
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 3348aecde0..1ce0e914b4 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -27,13 +27,14 @@ import (
"strconv"
"time"
- "github.com/apache/plc4x/plc4go/pkg/api/model"
+ apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/eip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -54,11 +55,16 @@ func NewReader(messageCodec spi.MessageCodec, tm spi.RequestTransactionManager,
}
}
-func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
+func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <-chan apiModel.PlcReadRequestResult {
// TODO: handle ctx
log.Trace().Msg("Reading")
- result := make(chan model.PlcReadRequestResult)
+ result := make(chan apiModel.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
classSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewClassID(0, 6))
instanceSegment := readWriteModel.NewLogicalSegment(readWriteModel.NewInstanceID(0, 1))
for _, tagName := range readRequest.GetTagNames() {
@@ -70,11 +76,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
}
ansi, err := toAnsi(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: nil,
- Err: errors.Wrapf(err, "Error encoding eip ansi for tag %s", tagName),
- }
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Wrapf(err, "Error encoding eip ansi for tag %s", tagName))
return
}
requestItem := readWriteModel.NewCipUnconnectedRequest(classSegment, instanceSegment,
@@ -106,26 +108,26 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
log.Trace().Msg("convert response to PLC4X response")
readResponse, err := m.ToPlc4xReadResponse(unconnectedDataItem.GetService(), readRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return transaction.EndRequest()
},
func(err error) error {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -181,9 +183,9 @@ func toAnsi(tag string) ([]byte, error) {
return buffer.GetBytes(), nil
}
-func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readRequest model.PlcReadRequest) (model.PlcReadResponse, error) {
+func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readRequest apiModel.PlcReadRequest) (apiModel.PlcReadResponse, error) {
plcValues := map[string]values.PlcValue{}
- responseCodes := map[string]model.PlcResponseCode{}
+ responseCodes := map[string]apiModel.PlcResponseCode{}
switch response := response.(type) {
case readWriteModel.CipReadResponse: // only 1 tag
cipReadResponse := response
@@ -193,7 +195,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readReq
var plcValue values.PlcValue
_type := cipReadResponse.GetData().GetDataType()
data := utils.NewReadBufferByteBased(cipReadResponse.GetData().GetData(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
- if code == model.PlcResponseCode_OK {
+ if code == apiModel.PlcResponseCode_OK {
var err error
plcValue, err = parsePlcValue(tag, data, _type)
if err != nil {
@@ -232,7 +234,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readReq
_type := cipReadResponse.GetData().GetDataType()
data := utils.NewReadBufferByteBased(cipReadResponse.GetData().GetData(), utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian))
var plcValue values.PlcValue
- if code == model.PlcResponseCode_OK {
+ if code == apiModel.PlcResponseCode_OK {
var err error
plcValue, err = parsePlcValue(tag, data, _type)
if err != nil {
@@ -243,7 +245,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readReq
plcValues[tagName] = plcValue
responseCodes[tagName] = code
} else {
- responseCodes[tagName] = model.PlcResponseCode_INTERNAL_ERROR
+ responseCodes[tagName] = apiModel.PlcResponseCode_INTERNAL_ERROR
}
}
default:
@@ -252,7 +254,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.CipService, readReq
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
+ return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
func parsePlcValue(tag EIPPlcTag, data utils.ReadBufferByteBased, _type readWriteModel.CIPDataTypeCode) (values.PlcValue, error) {
@@ -341,12 +343,12 @@ func parsePlcValue(tag EIPPlcTag, data utils.ReadBufferByteBased, _type readWrit
}
// Helper to convert the status codes returned from the eip into one of our standard PLC4X response codes
-func decodeResponseCode(status uint8) model.PlcResponseCode {
+func decodeResponseCode(status uint8) apiModel.PlcResponseCode {
//TODO other status
switch status {
case 0:
- return model.PlcResponseCode_OK
+ return apiModel.PlcResponseCode_OK
default:
- return model.PlcResponseCode_INTERNAL_ERROR
+ return apiModel.PlcResponseCode_INTERNAL_ERROR
}
}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index b77256611a..b37457d6b1 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -28,8 +28,9 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/eip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -56,6 +57,11 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
// TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
items := make([]readWriteModel.CipService, len(writeRequest.GetTagNames()))
for i, tagName := range writeRequest.GetTagNames() {
eipTag := writeRequest.GetTag(tagName).(EIPPlcTag)
@@ -80,7 +86,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
requestPathSize := int8(dataLength / 2)
data, err := encodeValue(value, eipTag.GetType(), elements)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrapf(err, "Error encoding value for eipTag %s", tagName),
@@ -89,7 +95,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
}
ansi, err := toAnsi(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrapf(err, "Error encoding eip ansi for eipTag %s", tagName),
@@ -150,25 +156,25 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
return transaction.EndRequest()
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -245,25 +251,25 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
return transaction.EndRequest()
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -350,5 +356,5 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.CipService, writeRe
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
+ return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index 8fb9eaf42e..5191371c83 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -37,9 +37,10 @@ import (
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/interceptors"
- internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -172,10 +173,10 @@ func NewConnection(transportInstance transports.TransportInstance, options map[s
tagHandler: tagHandler,
valueHandler: NewValueHandler(),
requestInterceptor: interceptors.NewSingleItemRequestInterceptor(
- internalModel.NewDefaultPlcReadRequest,
- internalModel.NewDefaultPlcWriteRequest,
- internalModel.NewDefaultPlcReadResponse,
- internalModel.NewDefaultPlcWriteResponse,
+ spiModel.NewDefaultPlcReadRequest,
+ spiModel.NewDefaultPlcWriteRequest,
+ spiModel.NewDefaultPlcReadResponse,
+ spiModel.NewDefaultPlcWriteResponse,
),
subscribers: []*Subscriber{},
valueCache: map[uint16][]byte{},
@@ -226,6 +227,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
// Open the UDP Connection
err := m.messageCodec.Connect()
if err != nil {
@@ -293,6 +299,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
// are being handled.
log.Debug().Msg("Starting tunneling handler")
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
defaultIncomingMessageChannel := m.messageCodec.GetDefaultIncomingMessageChannel()
for m.handleTunnelingRequests {
incomingMessage := <-defaultIncomingMessageChannel
@@ -388,6 +399,11 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
result := make(chan plc4go.PlcConnectionCloseResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil { // a panic while closing must still be reported to whoever waits on result
+ result <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v", err)) // result carries PlcConnectionCloseResult, so use the close (not connect) constructor
+ }
+ }()
// Stop the connection-state checker.
if m.connectionStateTimer != nil {
m.connectionStateTimer.Stop()
@@ -446,6 +462,11 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
result := make(chan plc4go.PlcConnectionPingResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v", err))
+ }
+ }()
// Send the connection state request
_, err := m.sendConnectionStateRequest(ctx)
if err != nil {
@@ -464,26 +485,26 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return internalModel.NewDefaultPlcReadRequestBuilder(
+ return spiModel.NewDefaultPlcReadRequestBuilder(
m.tagHandler, NewReader(m))
}
func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
- return internalModel.NewDefaultPlcWriteRequestBuilder(
+ return spiModel.NewDefaultPlcWriteRequestBuilder(
m.tagHandler, m.valueHandler, NewWriter(m.messageCodec))
}
func (m *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
- return internalModel.NewDefaultPlcSubscriptionRequestBuilder(
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
m.tagHandler, m.valueHandler, NewSubscriber(m))
}
func (m *Connection) BrowseRequestBuilder() apiModel.PlcBrowseRequestBuilder {
- return internalModel.NewDefaultPlcBrowseRequestBuilder(m.tagHandler, NewBrowser(m, m.messageCodec))
+ return spiModel.NewDefaultPlcBrowseRequestBuilder(m.tagHandler, NewBrowser(m, m.messageCodec))
}
func (m *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
- return nil /*internalModel.NewDefaultPlcUnsubscriptionRequestBuilder(
+ return nil /*spiModel.NewDefaultPlcUnsubscriptionRequestBuilder(
m.tagHandler, m.valueHandler, NewSubscriber(m.messageCodec))*/
}
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 5b574db276..1b7ac49b50 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -64,6 +64,11 @@ func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte,
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
groupAddressReadResponse, err := m.sendGroupAddressReadRequest(ctx, groupAddress)
if err != nil {
sendResponse(nil, 0, errors.Wrap(err, "error reading group address"))
@@ -119,6 +124,11 @@ func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverMode
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// If we're already connected, use that connection instead.
if connection, ok := m.DeviceConnections[targetAddress]; ok {
sendResponse(connection, nil)
@@ -209,6 +219,11 @@ func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverM
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
if connection, ok := m.DeviceConnections[targetAddress]; ok {
_, err := m.sendDeviceDisconnectionRequest(ctx, targetAddress)
@@ -242,6 +257,11 @@ func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress drive
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// Check if there is already a connection available,
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
@@ -297,6 +317,11 @@ func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress drive
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// Check if there is already a connection available,
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
@@ -376,6 +401,11 @@ func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAdd
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// Check if there is already a connection available,
// if not, create a new one.
connection, ok := m.DeviceConnections[targetAddress]
@@ -436,6 +466,11 @@ func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverM
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
// Set a default datatype, if none is specified
if datapointType == nil {
dpt := driverModel.KnxDatapointType_USINT
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go b/plc4go/internal/knxnetip/ConnectionHelper.go
index f06dfcea49..c26cc9734d 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -53,6 +53,11 @@ func (m *Connection) castIpToKnxAddress(ip net.IP) driverModel.IPAddress {
func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunnelingRequest driverModel.TunnelingRequest) {
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
lDataInd, ok := tunnelingRequest.GetCemi().(driverModel.LDataIndExactly)
if !ok {
return
diff --git a/plc4go/internal/knxnetip/Discoverer.go b/plc4go/internal/knxnetip/Discoverer.go
index ceaf41b77c..9bf2841602 100644
--- a/plc4go/internal/knxnetip/Discoverer.go
+++ b/plc4go/internal/knxnetip/Discoverer.go
@@ -99,6 +99,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}
wg.Add(1)
go func(netInterface net.Interface) {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
defer func() { wg.Done() }()
// Iterate over all addresses the current interface has configured
// For KNX we're only interested in IPv4 addresses, as it doesn't
@@ -132,6 +137,11 @@ func (d *Discoverer) Discover(ctx context.Context, callback func(event apiModel.
}()
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for transportInstance := range transportInstances {
d.deviceScanningQueue.Submit(ctx, d.deviceScanningWorkItemId.Add(1), d.createDeviceScanDispatcher(transportInstance.(*udp.TransportInstance), callback))
}
diff --git a/plc4go/internal/knxnetip/Driver.go b/plc4go/internal/knxnetip/Driver.go
index cede735647..c8131ca9cb 100644
--- a/plc4go/internal/knxnetip/Driver.go
+++ b/plc4go/internal/knxnetip/Driver.go
@@ -51,10 +51,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
@@ -62,10 +60,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
// Have the transport create a new transport-instance.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %#v", transportUrl))
return ch
}
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index 24b5548102..27c635a004 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -21,8 +21,6 @@ package knxnetip
import (
"context"
- "errors"
- "github.com/rs/zerolog/log"
"strconv"
"strings"
"time"
@@ -30,9 +28,12 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
driverModel "github.com/apache/plc4x/plc4go/protocols/knxnetip/readwrite/model"
- internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
internalValues "github.com/apache/plc4x/plc4go/spi/values"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
)
type Reader struct {
@@ -49,6 +50,11 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
// TODO: handle ctx
resultChan := make(chan apiModel.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ resultChan <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
responseCodes := map[string]apiModel.PlcResponseCode{}
plcValues := map[string]apiValues.PlcValue{}
@@ -156,8 +162,8 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
}
// Assemble the results
- result := internalModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
- resultChan <- &internalModel.DefaultPlcReadRequestResult{
+ result := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
+ resultChan <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: result,
Err: nil,
diff --git a/plc4go/internal/knxnetip/Subscriber.go b/plc4go/internal/knxnetip/Subscriber.go
index a7b1a2db74..33fc2c762f 100644
--- a/plc4go/internal/knxnetip/Subscriber.go
+++ b/plc4go/internal/knxnetip/Subscriber.go
@@ -21,6 +21,7 @@ package knxnetip
import (
"context"
+ "github.com/pkg/errors"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -47,6 +48,11 @@ func (m *Subscriber) Subscribe(ctx context.Context, subscriptionRequest apiModel
// TODO: handle context
result := make(chan apiModel.PlcSubscriptionRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcSubscriptionRequestResult(subscriptionRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Add this subscriber to the connection.
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index bd90602505..476ec7cf56 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -30,7 +30,8 @@ import (
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/interceptors"
- internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
+
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -52,10 +53,10 @@ func NewConnection(unitIdentifier uint8, messageCodec spi.MessageCodec, options
messageCodec: messageCodec,
options: options,
requestInterceptor: interceptors.NewSingleItemRequestInterceptor(
- internalModel.NewDefaultPlcReadRequest,
- internalModel.NewDefaultPlcWriteRequest,
- internalModel.NewDefaultPlcReadResponse,
- internalModel.NewDefaultPlcWriteResponse,
+ spiModel.NewDefaultPlcReadRequest,
+ spiModel.NewDefaultPlcWriteRequest,
+ spiModel.NewDefaultPlcReadResponse,
+ spiModel.NewDefaultPlcWriteResponse,
),
}
if traceEnabledOption, ok := options["traceEnabled"]; ok {
@@ -97,6 +98,11 @@ func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
log.Trace().Msg("Pinging")
result := make(chan plc4go.PlcConnectionPingResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v", err))
+ }
+ }()
diagnosticRequestPdu := readWriteModel.NewModbusPDUDiagnosticRequest(0, 0x42)
pingRequest := readWriteModel.NewModbusTcpADU(1, m.unitIdentifier, diagnosticRequestPdu, false)
if err := m.messageCodec.SendRequest(ctx, pingRequest,
@@ -140,7 +146,7 @@ func (m *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return internalModel.NewDefaultPlcReadRequestBuilderWithInterceptor(
+ return spiModel.NewDefaultPlcReadRequestBuilderWithInterceptor(
m.GetPlcTagHandler(),
NewReader(m.unitIdentifier, m.messageCodec),
m.requestInterceptor,
@@ -148,7 +154,7 @@ func (m *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
}
func (m *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
- return internalModel.NewDefaultPlcWriteRequestBuilderWithInterceptor(
+ return spiModel.NewDefaultPlcWriteRequestBuilderWithInterceptor(
m.GetPlcTagHandler(),
m.GetPlcValueHandler(),
NewWriter(m.unitIdentifier, m.messageCodec),
diff --git a/plc4go/internal/modbus/ModbusAsciiDriver.go b/plc4go/internal/modbus/ModbusAsciiDriver.go
index 53ffec994c..7013a85e8b 100644
--- a/plc4go/internal/modbus/ModbusAsciiDriver.go
+++ b/plc4go/internal/modbus/ModbusAsciiDriver.go
@@ -48,10 +48,8 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
@@ -60,10 +58,8 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
@@ -71,6 +67,11 @@ func (m ModbusAsciiDriver) GetConnectionWithContext(ctx context.Context, transpo
// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for {
msg := <-defaultChanel
adu := msg.(model.ModbusTcpADU)
diff --git a/plc4go/internal/modbus/ModbusRtuDriver.go b/plc4go/internal/modbus/ModbusRtuDriver.go
index 61dc1bced4..81158c362e 100644
--- a/plc4go/internal/modbus/ModbusRtuDriver.go
+++ b/plc4go/internal/modbus/ModbusRtuDriver.go
@@ -48,10 +48,8 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
@@ -60,10 +58,8 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
@@ -71,6 +67,11 @@ func (m ModbusRtuDriver) GetConnectionWithContext(ctx context.Context, transport
// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for {
msg := <-defaultChanel
adu := msg.(model.ModbusTcpADU)
diff --git a/plc4go/internal/modbus/ModbusTcpDriver.go b/plc4go/internal/modbus/ModbusTcpDriver.go
index 9043afadd8..1e1f21d597 100644
--- a/plc4go/internal/modbus/ModbusTcpDriver.go
+++ b/plc4go/internal/modbus/ModbusTcpDriver.go
@@ -48,10 +48,8 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used if the user doesn't provide one in the connection string.
@@ -60,10 +58,8 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
@@ -71,6 +67,11 @@ func (m ModbusTcpDriver) GetConnectionWithContext(ctx context.Context, transport
// TODO: the code below looks strange: where is defaultChanel being used?
defaultChanel := make(chan any)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for {
msg := <-defaultChanel
adu := msg.(model.ModbusTcpADU)
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index fea48227d6..d0914a4730 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -29,7 +29,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -53,12 +53,13 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
- if len(readRequest.GetTagNames()) != 1 {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
- Request: readRequest,
- Response: nil,
- Err: errors.New("modbus only supports single-item requests"),
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
}
+ }()
+ if len(readRequest.GetTagNames()) != 1 {
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.New("modbus only supports single-item requests"))
log.Debug().Msgf("modbus only supports single-item requests. Got %d tags", len(readRequest.GetTagNames()))
return
}
@@ -67,7 +68,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
tag := readRequest.GetTag(tagName)
modbusTagVar, err := CastToModbusTagFromPlcTag(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Wrap(err, "invalid tag item type"),
@@ -88,14 +89,14 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
case HoldingRegister:
pdu = readWriteModel.NewModbusPDUReadHoldingRegistersRequest(modbusTagVar.Address, numWords)
case ExtendedRegister:
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.New("modbus currently doesn't support extended register requests"),
}
return
default:
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Errorf("unsupported tag type %x", modbusTagVar.TagType),
@@ -131,26 +132,26 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
// TODO: should we return the error here?
return nil
}
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return nil
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return nil
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -201,5 +202,5 @@ func (m *Reader) ToPlc4xReadResponse(responseAdu readWriteModel.ModbusTcpADU, re
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
+ return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index 6cf8389c29..d13bcf38a8 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -28,7 +28,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/model"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/modbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -53,11 +53,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
go func() {
// If we are requesting only one tag, use a
if len(writeRequest.GetTagNames()) != 1 {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: nil,
- Err: errors.New("modbus only supports single-item requests"),
- }
+ result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.New("modbus only supports single-item requests"))
return
}
tagName := writeRequest.GetTagNames()[0]
@@ -66,11 +62,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
tag := writeRequest.GetTag(tagName)
modbusTag, err := CastToModbusTagFromPlcTag(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
- Request: writeRequest,
- Response: nil,
- Err: errors.Wrap(err, "invalid tag item type"),
- }
+ result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "invalid tag item type"))
return
}
@@ -78,7 +70,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
value := writeRequest.GetValue(tagName)
data, err := readWriteModel.DataItemSerialize(value, modbusTag.Datatype, modbusTag.Quantity)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrap(err, "error serializing value"),
@@ -102,14 +94,14 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
numWords,
data)
case ExtendedRegister:
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.New("modbus currently doesn't support extended register requests"),
}
return
default:
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.New("unsupported tag type"),
@@ -139,19 +131,19 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
readResponse, err := m.ToPlc4xWriteResponse(requestAdu, responseAdu, writeRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
} else {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
}
return nil
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
@@ -209,5 +201,5 @@ func (m Writer) ToPlc4xWriteResponse(requestAdu readWriteModel.ModbusTcpADU, res
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
+ return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index 68a8d31040..8e1ab6870d 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -110,6 +110,11 @@ func (m *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcCo
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
err := m.messageCodec.Connect()
if err != nil {
ch <- _default.NewDefaultPlcConnectionConnectResult(m, err)
diff --git a/plc4go/internal/s7/Driver.go b/plc4go/internal/s7/Driver.go
index 4c79b80457..37d4e080a1 100644
--- a/plc4go/internal/s7/Driver.go
+++ b/plc4go/internal/s7/Driver.go
@@ -54,10 +54,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
transport, ok := transports[transportUrl.Scheme]
if !ok {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't find transport for given transport url %#v", transportUrl))
return ch
}
// Provide a default-port to the transport, which is used, if the user doesn't provide on in the connection string.
@@ -66,10 +64,8 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
transportInstance, err := transport.CreateTransportInstance(transportUrl, options)
if err != nil {
log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultTcpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.New("couldn't initialize transport configuration for given transport url "+transportUrl.String()))
return ch
}
@@ -79,20 +75,16 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
configuration, err := ParseFromOptions(options)
if err != nil {
log.Error().Err(err).Msgf("Invalid options")
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
}
driverContext, err := NewDriverContext(configuration)
if err != nil {
log.Error().Err(err).Msgf("Invalid options")
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
- }()
+ ch := make(chan plc4go.PlcConnectionConnectResult, 1)
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Invalid options"))
return ch
}
driverContext.awaitSetupComplete = m.awaitSetupComplete
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 92ae8a9686..7b6226cdb9 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
@@ -52,13 +52,18 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
log.Trace().Msg("Reading")
result := make(chan model.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
requestItems := make([]readWriteModel.S7VarRequestParameterItem, len(readRequest.GetTagNames()))
for i, tagName := range readRequest.GetTagNames() {
tag := readRequest.GetTag(tagName)
address, err := encodeS7Address(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Wrapf(err, "Error encoding s7 address for tag %s", tagName),
@@ -124,25 +129,25 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: readResponse,
}
return transaction.EndRequest()
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Err: errors.Wrap(err, "got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcReadRequestResult{
+ result <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -181,7 +186,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
plcValues[tagName] = spiValues.NewPlcNULL()
}
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
+ return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
} else {
log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
"We probably need to implement explicit handling for this, so please file a bug-report "+
@@ -192,7 +197,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
responseCodes[tagName] = model.PlcResponseCode_INTERNAL_ERROR
plcValues[tagName] = spiValues.NewPlcNULL()
}
- return plc4goModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
+ return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
}
@@ -226,7 +231,7 @@ func (m *Reader) ToPlc4xReadResponse(response readWriteModel.S7Message, readRequ
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
+ return spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues), nil
}
// Currently we only support the S7 Any type of addresses. This helper simply converts the S7Tag from PLC4X into
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index 5333146918..7053daa9b5 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
- plc4goModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -50,6 +50,11 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
// TODO: handle context
result := make(chan model.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
parameterItems := make([]readWriteModel.S7VarRequestParameterItem, len(writeRequest.GetTagNames()))
payloadItems := make([]readWriteModel.S7VarPayloadDataItem, len(writeRequest.GetTagNames()))
for i, tagName := range writeRequest.GetTagNames() {
@@ -57,7 +62,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
plcValue := writeRequest.GetValue(tagName)
s7Address, err := encodeS7Address(tag)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrapf(err, "Error encoding s7 address for tag %s", tagName),
@@ -67,7 +72,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
parameterItems[i] = readWriteModel.NewS7VarRequestParameterItemAddress(s7Address)
value, err := serializePlcValue(tag, plcValue)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrapf(err, "Error encoding value for tag %s", tagName),
@@ -127,25 +132,25 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
if err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.Wrap(err, "Error decoding response"),
}
return transaction.EndRequest()
}
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: readResponse,
}
return transaction.EndRequest()
}, func(err error) error {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Err: errors.New("got timeout while waiting for response"),
}
return transaction.EndRequest()
}, time.Second*1); err != nil {
- result <- &plc4goModel.DefaultPlcWriteRequestResult{
+ result <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
Response: nil,
Err: errors.Wrap(err, "error sending message"),
@@ -182,7 +187,7 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.S7Message, writeReq
responseCodes[tagName] = model.PlcResponseCode_ACCESS_DENIED
}
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
+ return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
} else {
log.Warn().Msgf("Got an unknown error response from the PLC. Error Class: %d, Error Code %d. "+
"We probably need to implement explicit handling for this, so please file a bug-report "+
@@ -192,7 +197,7 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.S7Message, writeReq
for _, tagName := range writeRequest.GetTagNames() {
responseCodes[tagName] = model.PlcResponseCode_INTERNAL_ERROR
}
- return plc4goModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
+ return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
}
@@ -218,7 +223,7 @@ func (m Writer) ToPlc4xWriteResponse(response readWriteModel.S7Message, writeReq
// Return the response
log.Trace().Msg("Returning the response")
- return plc4goModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
+ return spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes), nil
}
func serializePlcValue(tag model.PlcTag, plcValue values.PlcValue) (readWriteModel.S7VarPayloadDataItem, error) {
diff --git a/plc4go/internal/simulated/Connection.go b/plc4go/internal/simulated/Connection.go
index a56287d140..95e4c35015 100644
--- a/plc4go/internal/simulated/Connection.go
+++ b/plc4go/internal/simulated/Connection.go
@@ -28,8 +28,9 @@ import (
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
_default "github.com/apache/plc4x/plc4go/spi/default"
- internalModel "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
"github.com/apache/plc4x/plc4go/spi/utils"
+
"github.com/pkg/errors"
)
@@ -79,6 +80,11 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
func (c *Connection) ConnectWithContext(ctx context.Context) <-chan plc4go.PlcConnectionConnectResult {
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionCloseResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
// Check if the connection was already connected
if c.connected {
if c.tracer != nil {
@@ -130,6 +136,11 @@ func (c *Connection) BlockingClose() {
func (c *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
ch := make(chan plc4go.PlcConnectionCloseResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
// Check if the connection is connected.
if !c.connected {
if c.tracer != nil {
@@ -171,6 +182,11 @@ func (c *Connection) IsConnected() bool {
func (c *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- _default.NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v", err))
+ }
+ }()
// Check if the connection is connected
if !c.connected {
if c.tracer != nil {
@@ -229,15 +245,15 @@ func (c *Connection) GetMetadata() model.PlcConnectionMetadata {
}
func (c *Connection) ReadRequestBuilder() model.PlcReadRequestBuilder {
- return internalModel.NewDefaultPlcReadRequestBuilder(c.tagHandler, NewReader(c.device, c.options, c.tracer))
+ return spiModel.NewDefaultPlcReadRequestBuilder(c.tagHandler, NewReader(c.device, c.options, c.tracer))
}
func (c *Connection) WriteRequestBuilder() model.PlcWriteRequestBuilder {
- return internalModel.NewDefaultPlcWriteRequestBuilder(c.tagHandler, c.valueHandler, NewWriter(c.device, c.options, c.tracer))
+ return spiModel.NewDefaultPlcWriteRequestBuilder(c.tagHandler, c.valueHandler, NewWriter(c.device, c.options, c.tracer))
}
func (c *Connection) SubscriptionRequestBuilder() model.PlcSubscriptionRequestBuilder {
- return internalModel.NewDefaultPlcSubscriptionRequestBuilder(c.tagHandler, c.valueHandler, NewSubscriber(c.device, c.options, c.tracer))
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.tagHandler, c.valueHandler, NewSubscriber(c.device, c.options, c.tracer))
}
func (c *Connection) UnsubscriptionRequestBuilder() model.PlcUnsubscriptionRequestBuilder {
diff --git a/plc4go/internal/simulated/Reader.go b/plc4go/internal/simulated/Reader.go
index dabbf34a14..ea3e2fcda9 100644
--- a/plc4go/internal/simulated/Reader.go
+++ b/plc4go/internal/simulated/Reader.go
@@ -21,13 +21,14 @@ package simulated
import (
"context"
+ "github.com/pkg/errors"
"strconv"
"time"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/pkg/api/values"
"github.com/apache/plc4x/plc4go/spi"
- model2 "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
)
type Reader struct {
@@ -47,6 +48,11 @@ func NewReader(device *Device, options map[string][]string, tracer *spi.Tracer)
func (r *Reader) Read(_ context.Context, readRequest model.PlcReadRequest) <-chan model.PlcReadRequestResult {
ch := make(chan model.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- spiModel.NewDefaultPlcReadRequestResult(readRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
var txId string
if r.tracer != nil {
txId = r.tracer.AddTransactionalStartTrace("read", "started")
@@ -86,9 +92,9 @@ func (r *Reader) Read(_ context.Context, readRequest model.PlcReadRequest) <-cha
r.tracer.AddTransactionalTrace(txId, "read", "success")
}
// Emit the response
- ch <- &model2.DefaultPlcReadRequestResult{
+ ch <- &spiModel.DefaultPlcReadRequestResult{
Request: readRequest,
- Response: model2.NewDefaultPlcReadResponse(readRequest, responseCodes, responseValues),
+ Response: spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, responseValues),
Err: nil,
}
}()
diff --git a/plc4go/internal/simulated/Writer.go b/plc4go/internal/simulated/Writer.go
index 83b8f5428a..b8cae999ad 100644
--- a/plc4go/internal/simulated/Writer.go
+++ b/plc4go/internal/simulated/Writer.go
@@ -21,12 +21,13 @@ package simulated
import (
"context"
+ "github.com/pkg/errors"
"strconv"
"time"
"github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi"
- model2 "github.com/apache/plc4x/plc4go/spi/model"
+ spiModel "github.com/apache/plc4x/plc4go/spi/model"
)
type Writer struct {
@@ -46,6 +47,11 @@ func NewWriter(device *Device, options map[string][]string, tracer *spi.Tracer)
func (w *Writer) Write(_ context.Context, writeRequest model.PlcWriteRequest) <-chan model.PlcWriteRequestResult {
ch := make(chan model.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
var txId string
if w.tracer != nil {
txId = w.tracer.AddTransactionalStartTrace("write", "started")
@@ -78,9 +84,9 @@ func (w *Writer) Write(_ context.Context, writeRequest model.PlcWriteRequest) <-
w.tracer.AddTransactionalTrace(txId, "write", "success")
}
// Emit the response
- ch <- &model2.DefaultPlcWriteRequestResult{
+ ch <- &spiModel.DefaultPlcWriteRequestResult{
Request: writeRequest,
- Response: model2.NewDefaultPlcWriteResponse(writeRequest, responseCodes),
+ Response: spiModel.NewDefaultPlcWriteResponse(writeRequest, responseCodes),
Err: nil,
}
}()
diff --git a/plc4go/pkg/api/driverManager.go b/plc4go/pkg/api/driverManager.go
index 411f4979e9..4ca897125d 100644
--- a/plc4go/pkg/api/driverManager.go
+++ b/plc4go/pkg/api/driverManager.go
@@ -203,10 +203,8 @@ func (m *plcDriverManger) GetConnection(connectionString string) <-chan PlcConne
connectionUrl, err := url.Parse(connectionString)
if err != nil {
log.Error().Err(err).Msg("Error parsing connection")
- ch := make(chan PlcConnectionConnectResult)
- go func() {
- ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error parsing connection string")}
- }()
+ ch := make(chan PlcConnectionConnectResult, 1)
+ ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error parsing connection string")}
return ch
}
log.Debug().Stringer("connectionUrl", connectionUrl).Msg("parsed connection URL")
@@ -219,10 +217,8 @@ func (m *plcDriverManger) GetConnection(connectionString string) <-chan PlcConne
driver, err := m.GetDriver(driverName)
if err != nil {
log.Err(err).Str("driverName", driverName).Msgf("Couldn't get driver for %s", driverName)
- ch := make(chan PlcConnectionConnectResult)
- go func() {
- ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error getting driver for connection string")}
- }()
+ ch := make(chan PlcConnectionConnectResult, 1)
+ ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error getting driver for connection string")}
return ch
}
log.Debug().Stringer("connectionUrl", connectionUrl).Msgf("got driver %s", driver.GetProtocolName())
@@ -237,10 +233,8 @@ func (m *plcDriverManger) GetConnection(connectionString string) <-chan PlcConne
connectionUrl, err := url.Parse(connectionUrl.Opaque)
if err != nil {
log.Err(err).Str("connectionUrl.Opaque", connectionUrl.Opaque).Msg("Couldn't get transport due to parsing error")
- ch := make(chan PlcConnectionConnectResult)
- go func() {
- ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error parsing connection string")}
- }()
+ ch := make(chan PlcConnectionConnectResult, 1)
+ ch <- &plcConnectionConnectResult{err: errors.Wrap(err, "error parsing connection string")}
return ch
}
transportName = connectionUrl.Scheme
@@ -260,10 +254,8 @@ func (m *plcDriverManger) GetConnection(connectionString string) <-chan PlcConne
// If no transport has been specified explicitly or per default, we have to abort.
if transportName == "" {
log.Error().Msg("got a empty transport")
- ch := make(chan PlcConnectionConnectResult)
- go func() {
- ch <- &plcConnectionConnectResult{err: errors.New("no transport specified and no default defined by driver")}
- }()
+ ch := make(chan PlcConnectionConnectResult, 1)
+ ch <- &plcConnectionConnectResult{err: errors.New("no transport specified and no default defined by driver")}
return ch
}
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 7ce2a07cdf..c0505bcea4 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -236,6 +236,11 @@ func (d *defaultConnection) ConnectWithContext(ctx context.Context) <-chan plc4g
log.Trace().Msg("Connecting")
ch := make(chan plc4go.PlcConnectionConnectResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
err := d.GetMessageCodec().ConnectWithContext(ctx)
d.SetConnected(true)
connection := d.GetConnection()
@@ -268,10 +273,8 @@ func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
}
err := d.GetTransportInstance().Close()
d.SetConnected(false)
- ch := make(chan plc4go.PlcConnectionCloseResult)
- go func() {
- ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)
- }()
+ ch := make(chan plc4go.PlcConnectionCloseResult, 1)
+ ch <- NewDefaultPlcConnectionCloseResult(d.GetConnection(), err)
return ch
}
@@ -283,6 +286,11 @@ func (d *defaultConnection) IsConnected() bool {
func (d *defaultConnection) Ping() <-chan plc4go.PlcConnectionPingResult {
ch := make(chan plc4go.PlcConnectionPingResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ ch <- NewDefaultPlcConnectionPingResult(errors.Errorf("panic-ed %v", err))
+ }
+ }()
if d.GetConnection().IsConnected() {
ch <- NewDefaultPlcConnectionPingResult(nil)
} else {
diff --git a/plc4go/spi/model/DefaultPlcReadRequest.go b/plc4go/spi/model/DefaultPlcReadRequest.go
index 13bb9e2433..67775e8c9f 100644
--- a/plc4go/spi/model/DefaultPlcReadRequest.go
+++ b/plc4go/spi/model/DefaultPlcReadRequest.go
@@ -138,6 +138,11 @@ func (d *DefaultPlcReadRequest) ExecuteWithContext(ctx context.Context) <-chan a
// Create a new result-channel, which completes as soon as all sub-result-channels have returned
resultChannel := make(chan apiModel.PlcReadRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ resultChannel <- NewDefaultPlcReadRequestResult(d, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
var subResults []apiModel.PlcReadRequestResult
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
diff --git a/plc4go/spi/model/DefaultPlcWriteRequest.go b/plc4go/spi/model/DefaultPlcWriteRequest.go
index 34efb1b1c5..9e846e10d0 100644
--- a/plc4go/spi/model/DefaultPlcWriteRequest.go
+++ b/plc4go/spi/model/DefaultPlcWriteRequest.go
@@ -157,6 +157,11 @@ func (d *DefaultPlcWriteRequest) ExecuteWithContext(ctx context.Context) <-chan
// Create a new result-channel, which completes as soon as all sub-result-channels have returned
resultChannel := make(chan apiModel.PlcWriteRequestResult)
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ resultChannel <- NewDefaultPlcWriteRequestResult(d, nil, errors.Errorf("panic-ed %v", err))
+ }
+ }()
var subResults []apiModel.PlcWriteRequestResult
// Iterate over all sub-results
for _, subResultChannel := range subResultChannels {
diff --git a/plc4go/spi/transports/pcap/Transport.go b/plc4go/spi/transports/pcap/Transport.go
index 98c1cd6fee..8bc986a28a 100644
--- a/plc4go/spi/transports/pcap/Transport.go
+++ b/plc4go/spi/transports/pcap/Transport.go
@@ -130,6 +130,11 @@ func (m *TransportInstance) Connect() error {
m.reader = bufio.NewReader(buffer)
go func(m *TransportInstance, buffer *bytes.Buffer) {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
packageCount := 0
var lastPacketTime *time.Time
for m.connected {
diff --git a/plc4go/spi/utils/Net.go b/plc4go/spi/utils/Net.go
index a166a21213..59b41e1ad0 100644
--- a/plc4go/spi/utils/Net.go
+++ b/plc4go/spi/utils/Net.go
@@ -40,6 +40,11 @@ func GetIPAddresses(ctx context.Context, netInterface net.Interface, useArpBased
return nil, errors.Wrap(err, "Error getting addresses")
}
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
wg := &sync.WaitGroup{}
for _, address := range addrs {
// Check if context has been cancelled before continuing
@@ -120,6 +125,11 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
wg.Add(1)
// Handler for processing incoming ARP responses.
go func(handle *pcap.Handle, iface net.Interface, stop chan struct{}) {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
src := gopacket.NewPacketSource(handle, layers.LayerTypeEthernet)
in := src.Packets()
for {
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 8576bb7516..781b10b433 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -157,6 +157,11 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
mutex := sync.Mutex{}
// Worker spawner
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for {
time.Sleep(100 * time.Millisecond)
mutex.Lock()
@@ -174,6 +179,11 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
}()
// Worker killer
go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
for {
time.Sleep(5 * time.Second)
mutex.Lock()