You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/18 14:51:08 UTC

[plc4x] branch develop updated (96a452e4d -> 4d22dd159)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 96a452e4d feat(plc4go/cbus): implemented value handler for writes
     new 6906355d7 feat(plc4go/cbus): added some more dummy implementations for value types
     new 4d22dd159 fix(plc4go): fixed some timer leaks

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/Connection.go             |   5 +-
 plc4go/internal/cbus/Connection.go                 |   9 +-
 plc4go/internal/cbus/ValueHandler.go               | 223 +++++++++++++++++++--
 plc4go/internal/simulated/Connection_test.go       |  17 +-
 plc4go/internal/simulated/Driver_test.go           |   5 +-
 plc4go/internal/simulated/Reader_test.go           |   5 +-
 plc4go/internal/simulated/Writer_test.go           |   5 +-
 plc4go/pkg/api/cache/plc_connection_cache.go       |  25 ++-
 plc4go/spi/utils/{MultiError.go => Errors.go}      |  26 +++
 plc4go/spi/utils/{Utils.go => Misc.go}             |  35 +---
 plc4go/spi/utils/{net.go => Net.go}                |  10 +-
 .../tests/drivers/tests/manual_cbus_driver_test.go |   6 +-
 .../cbus/c-bus_air_conditioning_application.mspec  |   8 +-
 13 files changed, 315 insertions(+), 64 deletions(-)
 rename plc4go/spi/utils/{MultiError.go => Errors.go} (76%)
 rename plc4go/spi/utils/{Utils.go => Misc.go} (70%)
 rename plc4go/spi/utils/{net.go => Net.go} (97%)


[plc4x] 01/02: feat(plc4go/cbus): added some more dummy implementations for value types

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 6906355d798538ddaaa35ddd309abb8dc275949a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 18 16:23:59 2022 +0200

    feat(plc4go/cbus): added some more dummy implementations for value types
---
 plc4go/internal/cbus/ValueHandler.go               | 223 +++++++++++++++++++--
 .../cbus/c-bus_air_conditioning_application.mspec  |   8 +-
 2 files changed, 211 insertions(+), 20 deletions(-)

diff --git a/plc4go/internal/cbus/ValueHandler.go b/plc4go/internal/cbus/ValueHandler.go
index 7524a71ae..ba999cc88 100644
--- a/plc4go/internal/cbus/ValueHandler.go
+++ b/plc4go/internal/cbus/ValueHandler.go
@@ -43,7 +43,7 @@ func (m ValueHandler) NewPlcValue(field apiModel.PlcField, value interface{}) (a
 		CAL_IDENTIFY_REPLY.GetName(),
 		CAL_STATUS.GetName(),
 		CAL_STATUS_EXTENDED.GetName():
-		panic("implement me")
+		panic("Implement me") //TODO: implement me
 	case SAL.GetName():
 		var curValues []any
 		if field.GetQuantity() > 1 {
@@ -63,7 +63,11 @@ func (m ValueHandler) NewPlcValue(field apiModel.PlcField, value interface{}) (a
 		salCommand := field.salCommand
 		switch field.application.ApplicationId() {
 		case readWriteModel.ApplicationId_FREE_USAGE:
-			panic("Not yet implemented") // TODO: implement
+			switch salCommand {
+			// TODO:implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_TEMPERATURE_BROADCAST:
 			switch salCommand {
 			case readWriteModel.TemperatureBroadcastCommandType_BROADCAST_EVENT.PLC4XEnumName():
@@ -83,7 +87,11 @@ func (m ValueHandler) NewPlcValue(field apiModel.PlcField, value interface{}) (a
 				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
 			}
 		case readWriteModel.ApplicationId_ROOM_CONTROL_SYSTEM:
-			panic("Implement me")
+			switch salCommand {
+			// TODO:implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_LIGHTING,
 			readWriteModel.ApplicationId_VENTILATION,
 			readWriteModel.ApplicationId_IRRIGATION_CONTROL,
@@ -133,34 +141,217 @@ func (m ValueHandler) NewPlcValue(field apiModel.PlcField, value interface{}) (a
 				}
 				return group, nil
 			case readWriteModel.LightingCommandType_LABEL.PLC4XEnumName():
-				panic("Implement me")
+				panic("Implement me") //TODO: implement me
 			default:
 				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
 			}
 		case readWriteModel.ApplicationId_AIR_CONDITIONING:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.AirConditioningCommandType_SET_ZONE_GROUP_OFF.PLC4XEnumName():
+				zoneGroup, err := m.IEC61131ValueHandler.NewPlcValueFromType(spiValues.IEC61131_BYTE, curValues[0])
+				if err != nil {
+					return nil, errors.Wrap(err, "error creating value for zoneGroup")
+				}
+				return zoneGroup, nil
+			case readWriteModel.AirConditioningCommandType_ZONE_HVAC_PLANT_STATUS.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_ZONE_HUMIDITY_PLANT_STATUS.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_ZONE_TEMPERATURE.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_ZONE_HUMIDITY.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_REFRESH.PLC4XEnumName():
+				zoneGroup, err := m.IEC61131ValueHandler.NewPlcValueFromType(spiValues.IEC61131_BYTE, curValues[0])
+				if err != nil {
+					return nil, errors.Wrap(err, "error creating value for zoneGroup")
+				}
+				return zoneGroup, nil
+			case readWriteModel.AirConditioningCommandType_SET_ZONE_HVAC_MODE.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_PLANT_HVAC_LEVEL.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_ZONE_HUMIDITY_MODE.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_PLANT_HUMIDITY_LEVEL.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_HVAC_UPPER_GUARD_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_HVAC_LOWER_GUARD_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_HVAC_SETBACK_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_HUMIDITY_UPPER_GUARD_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_HUMIDITY_LOWER_GUARD_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_SET_ZONE_GROUP_ON.PLC4XEnumName():
+				zoneGroup, err := m.IEC61131ValueHandler.NewPlcValueFromType(spiValues.IEC61131_BYTE, curValues[0])
+				if err != nil {
+					return nil, errors.Wrap(err, "error creating value for zoneGroup")
+				}
+				return zoneGroup, nil
+			case readWriteModel.AirConditioningCommandType_SET_HUMIDITY_SETBACK_LIMIT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_HVAC_SCHEDULE_ENTRY.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.AirConditioningCommandType_HUMIDITY_SCHEDULE_ENTRY.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_TRIGGER_CONTROL:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.TriggerControlCommandType_TRIGGER_EVENT.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.TriggerControlCommandType_TRIGGER_MIN.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.TriggerControlCommandType_TRIGGER_MAX.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.TriggerControlCommandType_INDICATOR_KILL.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			case readWriteModel.TriggerControlCommandType_LABEL.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_ENABLE_CONTROL:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.EnableControlCommandType_SET_NETWORK_VARIABLE.PLC4XEnumName():
+				panic("Implement me") //TODO: implement me
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_SECURITY:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.SecurityCommandType_OFF.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.SecurityCommandType_ON.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.SecurityCommandType_EVENT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_METERING:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.MeteringCommandType_EVENT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_ACCESS_CONTROL:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.AccessControlCommandType_CLOSE_ACCESS_POINT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_LOCK_ACCESS_POINT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_ACCESS_POINT_LEFT_OPEN.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_ACCESS_POINT_FORCED_OPEN.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_ACCESS_POINT_CLOSED.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_REQUEST_TO_EXIT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_VALID_ACCESS.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.AccessControlCommandType_INVALID_ACCESS.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_CLOCK_AND_TIMEKEEPING:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.ClockAndTimekeepingCommandType_UPDATE_NETWORK_VARIABLE.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			case readWriteModel.ClockAndTimekeepingCommandType_REQUEST_REFRESH.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_TELEPHONY_STATUS_AND_CONTROL:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.TelephonyCommandType_EVENT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_MEASUREMENT:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.MeasurementCommandType_MEASUREMENT_EVENT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_TESTING:
-			panic("Implement me")
+			switch salCommand {
+			// TODO:implement
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_MEDIA_TRANSPORT_CONTROL:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.MediaTransportControlCommandType_STOP.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_PLAY.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_PAUSE_RESUME.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SELECT_CATEGORY.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SELECT_SELECTION.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SELECT_TRACK.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SHUFFLE_ON_OFF.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_REPEAT_ON_OFF.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_NEXT_PREVIOUS_CATEGORY.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_NEXT_PREVIOUS_SELECTION.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_NEXT_PREVIOUS_TRACK.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_FAST_FORWARD.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_REWIND.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SOURCE_POWER_CONTROL.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_TOTAL_TRACKS.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_STATUS_REQUEST.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_ENUMERATE_CATEGORIES_SELECTIONS_TRACKS.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_ENUMERATION_SIZE.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_TRACK_NAME.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_SELECTION_NAME.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_CATEGORY_NAME.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.MediaTransportControlCommandType_FAST_FORWARD.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		case readWriteModel.ApplicationId_ERROR_REPORTING:
-			panic("Implement me")
+			switch salCommand {
+			case readWriteModel.ErrorReportingCommandType_DEPRECATED.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.ErrorReportingCommandType_ERROR_REPORT.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.ErrorReportingCommandType_ACKNOWLEDGE.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			case readWriteModel.ErrorReportingCommandType_CLEAR_MOST_SEVERE.PLC4XEnumName():
+				panic("Implement me") // TODO: implement me
+			default:
+				return nil, errors.Errorf("Unsupported command %s for %s", salCommand, field.application.ApplicationId())
+			}
 		default:
 			return nil, errors.Errorf("No support for %s", field.application)
 		}
diff --git a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus_air_conditioning_application.mspec b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus_air_conditioning_application.mspec
index de0276110..e66664f7e 100644
--- a/protocols/c-bus/src/main/resources/protocols/cbus/c-bus_air_conditioning_application.mspec
+++ b/protocols/c-bus/src/main/resources/protocols/cbus/c-bus_air_conditioning_application.mspec
@@ -30,8 +30,8 @@
             [simple   byte                format              ]
             [simple   HVACModeAndFlags    hvacModeAndFlags    ]
             [simple   HVACStartTime       startTime           ]
-            [optional HVACTemperature     level 'hvacModeAndFlags.isLevelTemperature'   ]
-            [optional HVACRawLevels       rawLevel 'hvacModeAndFlags.isLevelRaw'        ]
+            [optional HVACTemperature     level               'hvacModeAndFlags.isLevelTemperature'   ]
+            [optional HVACRawLevels       rawLevel            'hvacModeAndFlags.isLevelRaw'           ]
         ]
         ['HUMIDITY_SCHEDULE_ENTRY'          *HumidityScheduleEntry
             [simple   byte                      zoneGroup               ]
@@ -40,8 +40,8 @@
             [simple   byte                      format                  ]
             [simple   HVACHumidityModeAndFlags  humidityModeAndFlags    ]
             [simple   HVACStartTime             startTime               ]
-            [optional HVACHumidity              level    'humidityModeAndFlags.isLevelHumidity'   ]
-            [optional HVACRawLevels             rawLevel 'humidityModeAndFlags.isLevelRaw'        ]
+            [optional HVACHumidity              level                   'humidityModeAndFlags.isLevelHumidity'   ]
+            [optional HVACRawLevels             rawLevel                'humidityModeAndFlags.isLevelRaw'        ]
         ]
         ['REFRESH'                          *Refresh
             [simple   byte                      zoneGroup               ]


[plc4x] 02/02: fix(plc4go): fixed some timer leaks

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 4d22dd159756d97a75592b30adca8f63dff84083
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 18 16:51:01 2022 +0200

    fix(plc4go): fixed some timer leaks
---
 plc4go/internal/bacnetip/Connection.go             |  5 +++-
 plc4go/internal/cbus/Connection.go                 |  9 ++++--
 plc4go/internal/simulated/Connection_test.go       | 17 ++++++++---
 plc4go/internal/simulated/Driver_test.go           |  5 +++-
 plc4go/internal/simulated/Reader_test.go           |  5 +++-
 plc4go/internal/simulated/Writer_test.go           |  5 +++-
 plc4go/pkg/api/cache/plc_connection_cache.go       | 25 ++++++++++++----
 plc4go/spi/utils/{MultiError.go => Errors.go}      | 26 ++++++++++++++++
 plc4go/spi/utils/{Utils.go => Misc.go}             | 35 +++++++---------------
 plc4go/spi/utils/{net.go => Net.go}                | 10 +++++--
 .../tests/drivers/tests/manual_cbus_driver_test.go |  6 ++--
 11 files changed, 104 insertions(+), 44 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 33af188a1..d5c583ca9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/rs/zerolog/log"
 	"sync"
 	"time"
@@ -81,11 +82,13 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 			for c.IsConnected() {
 				log.Debug().Msg("Polling data")
 				incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
+				timeout := time.NewTimer(20 * time.Millisecond)
 				select {
 				case message := <-incomingMessageChannel:
 					// TODO: implement mapping to subscribers
 					log.Info().Msgf("Received \n%v", message)
-				case <-time.After(20 * time.Millisecond):
+					utils.CleanupTimer(timeout)
+				case <-timeout.C:
 				}
 			}
 			log.Info().Msg("Ending incoming message transfer")
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index f339eed62..567a689fa 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"sync"
@@ -294,6 +295,8 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 	}
 
 	startTime := time.Now()
+	timeout := time.NewTimer(time.Millisecond * 500)
+	defer utils.CleanupTimer(timeout)
 	select {
 	case <-receivedResetEchoChan:
 		log.Debug().Msgf("We received the echo")
@@ -304,7 +307,7 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 			log.Trace().Err(err).Msg("connect failed")
 		}
 		return false
-	case timeout := <-time.After(time.Millisecond * 500):
+	case timeout := <-timeout.C:
 		if sendOutErrorNotification {
 			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 		} else {
@@ -430,13 +433,15 @@ func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcCon
 	}
 
 	startTime := time.Now()
+	timeout := time.NewTimer(time.Second * 2)
+	defer utils.CleanupTimer(timeout)
 	select {
 	case <-directCommandAckChan:
 		log.Debug().Msgf("We received the ack")
 	case err := <-directCommandAckErrorChan:
 		c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
 		return false
-	case timeout := <-time.After(time.Second * 2):
+	case timeout := <-timeout.C:
 		c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 		return false
 	}
diff --git a/plc4go/internal/simulated/Connection_test.go b/plc4go/internal/simulated/Connection_test.go
index 57cbf2d26..ff45723f2 100644
--- a/plc4go/internal/simulated/Connection_test.go
+++ b/plc4go/internal/simulated/Connection_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"reflect"
 	"testing"
 	"time"
@@ -122,6 +123,8 @@ func TestConnection_Connect(t *testing.T) {
 			}
 			timeBeforeConnect := time.Now()
 			connectionChan := c.Connect()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case connectResult := <-connectionChan:
 				timeAfterConnect := time.Now()
@@ -142,7 +145,7 @@ func TestConnection_Connect(t *testing.T) {
 						t.Errorf("TestConnection.Connect() = %v, want %v", connectResult, tt.want)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Connect() got timeout")
 			}
 		})
@@ -237,6 +240,8 @@ func TestConnection_Close(t *testing.T) {
 			}
 			timeBeforeClose := time.Now()
 			closeChan := c.Close()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case closeResult := <-closeChan:
 				timeAfterClose := time.Now()
@@ -256,7 +261,7 @@ func TestConnection_Close(t *testing.T) {
 						t.Errorf("TestConnection.Close() = %v, want %v", closeResult, tt.want)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Close() got timeout")
 			}
 		})
@@ -330,6 +335,8 @@ func TestConnection_BlockingClose(t *testing.T) {
 				}()
 				return ch
 			}
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case <-executor():
 				timeAfterClose := time.Now()
@@ -341,7 +348,7 @@ func TestConnection_BlockingClose(t *testing.T) {
 						t.Errorf("TestConnection.Close() connected too fast. Expected at least %v but connected after %v", tt.delayAtLeast, connectionTime)
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Close() got timeout")
 			}
 		})
@@ -499,6 +506,8 @@ func TestConnection_Ping(t *testing.T) {
 			}
 			timeBeforePing := time.Now()
 			pingChan := c.Ping()
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case pingResult := <-pingChan:
 				timeAfterPing := time.Now()
@@ -513,7 +522,7 @@ func TestConnection_Ping(t *testing.T) {
 				if !reflect.DeepEqual(pingResult, tt.want) {
 					t.Errorf("TestConnection.Ping() = %v, want %v", pingResult, tt.want)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("TestConnection.Ping() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Driver_test.go b/plc4go/internal/simulated/Driver_test.go
index ad34db3ec..92b0b5ae2 100644
--- a/plc4go/internal/simulated/Driver_test.go
+++ b/plc4go/internal/simulated/Driver_test.go
@@ -21,6 +21,7 @@ package simulated
 
 import (
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"net/url"
 	"testing"
 	"time"
@@ -91,6 +92,8 @@ func TestDriver_GetConnection(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			d := NewDriver()
 			connectionChan := d.GetConnection(tt.args.in0, tt.args.in1, tt.args.options)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case connectResult := <-connectionChan:
 				if tt.wantErr && (connectResult.GetErr() == nil) {
@@ -98,7 +101,7 @@ func TestDriver_GetConnection(t *testing.T) {
 				} else if connectResult.GetErr() != nil {
 					t.Errorf("PlcConnectionPool.GetConnection() error = %v, wantErr %v", connectResult.GetErr(), tt.wantErr)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("PlcConnectionPool.GetConnection() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Reader_test.go b/plc4go/internal/simulated/Reader_test.go
index f5331b627..f8fab660c 100644
--- a/plc4go/internal/simulated/Reader_test.go
+++ b/plc4go/internal/simulated/Reader_test.go
@@ -27,6 +27,7 @@ import (
 	model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
 	model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
 	model3 "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	values2 "github.com/apache/plc4x/plc4go/spi/values"
 	"reflect"
 	"testing"
@@ -163,6 +164,8 @@ func TestReader_Read(t *testing.T) {
 			readRequest := model3.NewDefaultPlcReadRequest(tt.args.fields, tt.args.fieldNames, r, nil)
 			timeBeforeReadRequest := time.Now()
 			readResponseChannel := r.Read(context.TODO(), readRequest)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case readResponse := <-readResponseChannel:
 				timeAfterReadRequest := time.Now()
@@ -187,7 +190,7 @@ func TestReader_Read(t *testing.T) {
 							readResponse.GetResponse().GetValue(fieldName), tt.want.GetValue(fieldName))
 					}
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("Reader.Read() got timeout")
 			}
 		})
diff --git a/plc4go/internal/simulated/Writer_test.go b/plc4go/internal/simulated/Writer_test.go
index 9291b801c..88c9caf43 100644
--- a/plc4go/internal/simulated/Writer_test.go
+++ b/plc4go/internal/simulated/Writer_test.go
@@ -27,6 +27,7 @@ import (
 	model4 "github.com/apache/plc4x/plc4go/protocols/s7/readwrite/model"
 	model2 "github.com/apache/plc4x/plc4go/protocols/simulated/readwrite/model"
 	model3 "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	values2 "github.com/apache/plc4x/plc4go/spi/values"
 	"reflect"
 	"testing"
@@ -175,6 +176,8 @@ func TestWriter_Write(t *testing.T) {
 			writeRequest := model3.NewDefaultPlcWriteRequest(tt.args.fields, tt.args.fieldNames, tt.args.values, w, nil)
 			timeBeforeWriteRequest := time.Now()
 			writeResponseChannel := w.Write(context.TODO(), writeRequest)
+			timeout := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timeout)
 			select {
 			case writeResponse := <-writeResponseChannel:
 				timeAfterWriteRequest := time.Now()
@@ -199,7 +202,7 @@ func TestWriter_Write(t *testing.T) {
 					t.Errorf("Writer.Write() Device State = %v, want %v",
 						tt.fields.device.State, tt.newState)
 				}
-			case <-time.After(3 * time.Second):
+			case <-timeout.C:
 				t.Errorf("Reader.Read() got timeout")
 			}
 		})
diff --git a/plc4go/pkg/api/cache/plc_connection_cache.go b/plc4go/pkg/api/cache/plc_connection_cache.go
index a5bb8edbd..fbaf48aad 100644
--- a/plc4go/pkg/api/cache/plc_connection_cache.go
+++ b/plc4go/pkg/api/cache/plc_connection_cache.go
@@ -25,6 +25,7 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"github.com/viney-shih/go-lock"
@@ -117,16 +118,20 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			txId = t.tracer.AddTransactionalStartTrace("get-connection", "lease")
 		}
 		leaseChan := connection.lease()
+		maximumWaitTimeout := time.NewTimer(t.maxWaitTime)
+		defer utils.CleanupTimer(maximumWaitTimeout)
 		select {
 		// Wait till we get a lease.
 		case connectionResponse := <-leaseChan:
 			log.Debug().Str("connectionString", connectionString).Msg("Successfully got lease to connection")
+			responseTimeout := time.NewTimer(10 * time.Millisecond)
+			defer utils.CleanupTimer(responseTimeout)
 			select {
 			case ch <- connectionResponse:
 				if t.tracer != nil {
 					t.tracer.AddTransactionalTrace(txId, "get-connection", "success")
 				}
-			case <-time.After(10 * time.Millisecond):
+			case <-responseTimeout.C:
 				// Log a message, that the client has given up
 				if t.tracer != nil {
 					t.tracer.AddTransactionalTrace(txId, "get-connection", "client given up")
@@ -140,7 +145,7 @@ func (t *plcConnectionCache) GetConnection(connectionString string) <-chan plc4g
 			}
 
 		// Timeout after the maximum waiting time.
-		case <-time.After(t.maxWaitTime):
+		case <-maximumWaitTimeout.C:
 			if t.tracer != nil {
 				t.tracer.AddTransactionalTrace(txId, "get-connection", "timeout")
 			}
@@ -161,9 +166,11 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 		defer t.cacheLock.Unlock()
 
 		if len(t.connections) == 0 {
+			responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+			defer utils.CleanupTimer(responseDeliveryTimeout)
 			select {
 			case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
-			case <-time.After(time.Millisecond * 10):
+			case <-responseDeliveryTimeout.C:
 			}
 			log.Debug().Msg("Closing connection cache finished.")
 			return
@@ -176,6 +183,8 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 			// while some go func is still using it.
 			go func(container *connectionContainer) {
 				leaseResults := container.lease()
+				closeTimeout := time.NewTimer(t.maxWaitTime)
+				defer utils.CleanupTimer(closeTimeout)
 				select {
 				// We're just getting the lease as this way we can be sure nobody else is using it.
 				// We also really don't care if it worked, or not ... it's just an attempt of being
@@ -185,15 +194,17 @@ func (t *plcConnectionCache) Close() <-chan PlcConnectionCacheCloseResult {
 					// Give back the connection.
 					container.connection.Close()
 				// If we're timing out brutally kill the connection.
-				case <-time.After(t.maxWaitTime):
+				case <-closeTimeout.C:
 					log.Debug().Str("connectionString", container.connectionString).Msg("Forcefully closing connection ...")
 					// Forcefully close this connection.
 					container.connection.Close()
 				}
 
+				responseDeliveryTimeout := time.NewTimer(10 * time.Millisecond)
+				defer utils.CleanupTimer(responseDeliveryTimeout)
 				select {
 				case ch <- newDefaultPlcConnectionCacheCloseResult(t, nil):
-				case <-time.After(time.Millisecond * 10):
+				case <-responseDeliveryTimeout.C:
 				}
 				log.Debug().Msg("Closing connection cache finished.")
 			}(cc)
@@ -485,9 +496,11 @@ func (t *plcConnectionLease) Close() <-chan plc4go.PlcConnectionCloseResult {
 		err := t.connectionContainer.returnConnection(newState)
 
 		// Finish closing the connection.
+		timeout := time.NewTimer(10 * time.Millisecond)
+		defer utils.CleanupTimer(timeout)
 		select {
 		case result <- _default.NewDefaultPlcConnectionCloseResultWithTraces(t, err, traces):
-		case <-time.After(time.Millisecond * 10):
+		case <-timeout.C:
 		}
 
 		// Detach the connection from this lease, so it can no longer be used by the client.
diff --git a/plc4go/spi/utils/MultiError.go b/plc4go/spi/utils/Errors.go
similarity index 76%
rename from plc4go/spi/utils/MultiError.go
rename to plc4go/spi/utils/Errors.go
index ba8c1d9e0..03c6e333b 100644
--- a/plc4go/spi/utils/MultiError.go
+++ b/plc4go/spi/utils/Errors.go
@@ -45,3 +45,29 @@ func (m MultiError) Error() string {
 		return result
 	}(m.Errors), "\n")
 }
+
+type ParseAssertError struct {
+	Message string
+}
+
+func (e ParseAssertError) Error() string {
+	return e.Message
+}
+
+func (e ParseAssertError) Is(target error) bool {
+	_, ok := target.(ParseAssertError)
+	return ok
+}
+
+type ParseValidationError struct {
+	Message string
+}
+
+func (e ParseValidationError) Error() string {
+	return e.Message
+}
+
+func (e ParseValidationError) Is(target error) bool {
+	_, ok := target.(ParseValidationError)
+	return ok
+}
diff --git a/plc4go/spi/utils/Utils.go b/plc4go/spi/utils/Misc.go
similarity index 70%
rename from plc4go/spi/utils/Utils.go
rename to plc4go/spi/utils/Misc.go
index 06985728c..55046851e 100644
--- a/plc4go/spi/utils/Utils.go
+++ b/plc4go/spi/utils/Misc.go
@@ -19,6 +19,8 @@
 
 package utils
 
+import "time"
+
 // InlineIf is basically a inline if like construct for golang
 func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{} {
 	if test {
@@ -28,28 +30,13 @@ func InlineIf(test bool, a func() interface{}, b func() interface{}) interface{}
 	}
 }
 
-type ParseAssertError struct {
-	Message string
-}
-
-func (e ParseAssertError) Error() string {
-	return e.Message
-}
-
-func (e ParseAssertError) Is(target error) bool {
-	_, ok := target.(ParseAssertError)
-	return ok
-}
-
-type ParseValidationError struct {
-	Message string
-}
-
-func (e ParseValidationError) Error() string {
-	return e.Message
-}
-
-func (e ParseValidationError) Is(target error) bool {
-	_, ok := target.(ParseValidationError)
-	return ok
+// CleanupTimer stops a timer and purges anything left in the channel
+//              and is safe to call even if the channel has already been received
+func CleanupTimer(timer *time.Timer) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
 }
diff --git a/plc4go/spi/utils/net.go b/plc4go/spi/utils/Net.go
similarity index 97%
rename from plc4go/spi/utils/net.go
rename to plc4go/spi/utils/Net.go
index 476d08bb4..712a1709e 100644
--- a/plc4go/spi/utils/net.go
+++ b/plc4go/spi/utils/Net.go
@@ -143,11 +143,14 @@ func lockupIpsUsingArp(ctx context.Context, netInterface net.Interface, ipNet *n
 				// Schedule a discovery operation for this ip.
 				ip := net.IP(arp.SourceProtAddress)
 				log.Trace().Msgf("Scheduling discovery for IP %s", ip)
+				timeout := time.NewTimer(2 * time.Second)
 				go func(ip net.IP) {
 					select {
 					case <-ctx.Done():
+						CleanupTimer(timeout)
 					case foundIps <- DuplicateIP(ip):
-					case <-time.After(2 * time.Second):
+						CleanupTimer(timeout)
+					case <-timeout.C:
 					}
 				}(DuplicateIP(ip))
 			}
@@ -226,12 +229,15 @@ func lookupIps(ctx context.Context, ipnet *net.IPNet, foundIps chan net.IP, wg *
 		}
 
 		wg.Add(1)
+		timeout := time.NewTimer(2 * time.Second)
 		go func(ip net.IP) {
 			defer func() { wg.Done() }()
 			select {
 			case <-ctx.Done():
+				CleanupTimer(timeout)
 			case foundIps <- ip:
-			case <-time.After(2 * time.Second):
+				CleanupTimer(timeout)
+			case <-timeout.C:
 			}
 		}(DuplicateIP(ip))
 		log.Trace().Stringer("IP", ip).Msg("Expanded CIDR")
diff --git a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
index 798111bf7..20d25c5e1 100644
--- a/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
+++ b/plc4go/tests/drivers/tests/manual_cbus_driver_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/pkg/api/transports"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	_ "github.com/apache/plc4x/plc4go/tests/initializetest"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
@@ -73,13 +74,14 @@ func TestManualCBusDriver(t *testing.T) {
 			Build()
 		require.NoError(t, err)
 		subscriptionRequest.Execute()
-		timeout := time.After(30 * time.Second)
+		timeout := time.NewTimer(30 * time.Second)
+		defer utils.CleanupTimer(timeout)
 		// We expect couple monitors
 		monitorCount := 0
 	waitingForMonitors:
 		for {
 			select {
-			case at := <-timeout:
+			case at := <-timeout.C:
 				t.Errorf("timeout at %s", at)
 				break waitingForMonitors
 			case <-gotMonitor: