You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 14:03:53 UTC
[plc4x] 01/02: fix(plc4go): transaction should now be properly handled
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 25480b1d22a08f863ba15383d364e2b29605e35c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 15:17:00 2023 +0200
fix(plc4go): transaction should now be properly handled
---
plc4go/internal/bacnetip/Connection.go | 2 +-
plc4go/internal/bacnetip/Reader.go | 12 ++++++++++--
plc4go/internal/cbus/Reader.go | 4 ----
plc4go/internal/cbus/Writer.go | 4 +++-
plc4go/internal/eip/Reader.go | 4 +++-
plc4go/internal/eip/Writer.go | 8 ++++++--
plc4go/internal/s7/Reader.go | 4 +++-
plc4go/internal/s7/Writer.go | 4 +++-
8 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 516c4b1fe5..5c19e558d9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
}
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
- return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm))
+ return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index e8e1f5b1cc..1ee5239bc9 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -22,7 +22,9 @@ package bacnetip
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transactions"
+ "github.com/rs/zerolog"
"time"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -42,9 +44,11 @@ type Reader struct {
maxSegmentsAccepted readWriteModel.MaxSegmentsAccepted
maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+
+ log zerolog.Logger
}
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
return &Reader{
invokeIdGenerator: invokeIdGenerator,
messageCodec: messageCodec,
@@ -52,6 +56,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
maxSegmentsAccepted: readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS,
maxApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476,
+
+ log: options.ExtractCustomLogger(_options...),
}
}
@@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}()
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 2382a20e1b..e57f344224 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
return actualAlpha == expectedAlpha
}, func(receivedMessage spi.Message) error {
- defer func(transaction transactions.RequestTransaction) {
- // This is just to make sure we don't forget to close the transaction here
- _ = transaction.EndRequest()
- }(transaction)
// Convert the response into an
m.log.Trace().Msg("convert response to ")
messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 35b13f71dc..8dbaf855fe 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
}, time.Second*1); err != nil {
m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 61b5503c6f..a38a0d2ce0 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 52cef6d3f3..b04eb3c61d 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message"))
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
} else {
@@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil, errors.Wrap(err, "error sending message"))
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}*/
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 7b34d87748..59ea75bc28 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
nil,
errors.Wrap(err, "error sending message"),
)
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}()
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index bb67795494..22761cd09a 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
return transaction.EndRequest()
}, time.Second*1); err != nil {
result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "error sending message"))
- _ = transaction.EndRequest()
+ if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+ m.log.Debug().Err(err).Msg("Error failing request")
+ }
}
})
}()