You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 14:03:53 UTC

[plc4x] 01/02: fix(plc4go): transaction should now be properly handled

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 25480b1d22a08f863ba15383d364e2b29605e35c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 15:17:00 2023 +0200

    fix(plc4go): transaction should now be properly handled
---
 plc4go/internal/bacnetip/Connection.go |  2 +-
 plc4go/internal/bacnetip/Reader.go     | 12 ++++++++++--
 plc4go/internal/cbus/Reader.go         |  4 ----
 plc4go/internal/cbus/Writer.go         |  4 +++-
 plc4go/internal/eip/Reader.go          |  4 +++-
 plc4go/internal/eip/Writer.go          |  8 ++++++--
 plc4go/internal/s7/Reader.go           |  4 +++-
 plc4go/internal/s7/Writer.go           |  4 +++-
 8 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 516c4b1fe5..5c19e558d9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
-	return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm))
+	return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
 }
 
 func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index e8e1f5b1cc..1ee5239bc9 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -22,7 +22,9 @@ package bacnetip
 import (
 	"context"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
+	"github.com/rs/zerolog"
 	"time"
 
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -42,9 +44,11 @@ type Reader struct {
 
 	maxSegmentsAccepted   readWriteModel.MaxSegmentsAccepted
 	maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+
+	log zerolog.Logger
 }
 
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
 	return &Reader{
 		invokeIdGenerator: invokeIdGenerator,
 		messageCodec:      messageCodec,
@@ -52,6 +56,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
 
 		maxSegmentsAccepted:   readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS,
 		maxApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476,
+
+		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
@@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 					nil,
 					errors.Wrap(err, "error sending message"),
 				)
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 2382a20e1b..e57f344224 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
 		return actualAlpha == expectedAlpha
 	}, func(receivedMessage spi.Message) error {
-		defer func(transaction transactions.RequestTransaction) {
-			// This is just to make sure we don't forget to close the transaction here
-			_ = transaction.EndRequest()
-		}(transaction)
 		// Convert the response into an
 		m.log.Trace().Msg("convert response to ")
 		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 35b13f71dc..8dbaf855fe 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
 				}, time.Second*1); err != nil {
 					m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
 					addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
-					_ = transaction.EndRequest()
+					if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+						m.log.Debug().Err(err).Msg("Error failing request")
+					}
 				}
 			})
 		}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 61b5503c6f..a38a0d2ce0 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 						nil,
 						errors.Wrap(err, "error sending message"),
 					)
-					_ = transaction.EndRequest()
+					if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+						m.log.Debug().Err(err).Msg("Error failing request")
+					}
 				}
 			})
 		}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 52cef6d3f3..b04eb3c61d 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 							return transaction.EndRequest()
 						}, time.Second*1); err != nil {
 							result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      errors.Wrap(err, "error sending message"))
-							_ = transaction.EndRequest()
+							if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+								m.log.Debug().Err(err).Msg("Error failing request")
+							}
 						}
 					})
 				} else {
@@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 							return transaction.EndRequest()
 						}, time.Second*1); err != nil {
 							result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      errors.Wrap(err, "error sending message"))
-							_ = transaction.EndRequest()
+							if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+								m.log.Debug().Err(err).Msg("Error failing request")
+							}
 						}
 					})
 				}*/
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 7b34d87748..59ea75bc28 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 					nil,
 					errors.Wrap(err, "error sending message"),
 				)
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index bb67795494..22761cd09a 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 				return transaction.EndRequest()
 			}, time.Second*1); err != nil {
 				result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "error sending message"))
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()