You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/01 17:13:09 UTC

[plc4x] branch develop updated: fix(plc4go/cbus): reworked connection to use ack

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new e5acb84a1 fix(plc4go/cbus): reworked connection to use ack
e5acb84a1 is described below

commit e5acb84a1bb9d6a5be1a2590352ba83920b81a7b
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 1 19:13:01 2022 +0200

    fix(plc4go/cbus): reworked connection to use ack
---
 plc4go/internal/cbus/Configuration.go              |   4 +-
 plc4go/internal/cbus/Connection.go                 | 160 +++++++++++++++++----
 plc4go/internal/cbus/MessageCodec.go               |   3 +-
 plc4go/pkg/api/config/config.go                    |   8 +-
 .../protocols/ads/readwrite/model/AdsConstants.go  | 159 ++++++++++++++++++++
 .../server/cbus/protocol/CBusServerAdapter.java    |  26 ++++
 6 files changed, 329 insertions(+), 31 deletions(-)

diff --git a/plc4go/internal/cbus/Configuration.go b/plc4go/internal/cbus/Configuration.go
index 09f71165e..39f7911ee 100644
--- a/plc4go/internal/cbus/Configuration.go
+++ b/plc4go/internal/cbus/Configuration.go
@@ -31,9 +31,7 @@ type Configuration struct {
 }
 
 func ParseFromOptions(options map[string][]string) (Configuration, error) {
-	configuration := Configuration{
-		srchk: true,
-	}
+	configuration := Configuration{}
 	if srchk := getFromOptions(options, "srchk"); srchk != "" {
 		parseBool, err := strconv.ParseBool(srchk)
 		if err != nil {
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index c57d8143e..8313bf04c 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/plc4x/plc4go/internal/spi"
 	"github.com/apache/plc4x/plc4go/internal/spi/default"
 	internalModel "github.com/apache/plc4x/plc4go/internal/spi/model"
+	"github.com/apache/plc4x/plc4go/internal/spi/plcerrors"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
@@ -179,59 +180,90 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	requestContext := &c.messageCodec.(*MessageCodec).requestContext
 
 	{
-		log.Debug().Msg("Send a reset Request")
+		log.Debug().Msg("Send a reset")
 		requestTypeReset := readWriteModel.RequestType_RESET
 		requestTypeResetByte := byte(readWriteModel.RequestType_RESET)
 		requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeResetByte, requestTypeReset, &requestTypeResetByte, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
-		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)); err != nil {
-			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+		cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
+
+		receivedResetEchoChan := make(chan bool)
+		receivedResetEchoErrorChan := make(chan error)
+		if err := c.messageCodec.SendRequest(
+			cBusMessage,
+			func(message spi.Message) bool {
+				cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
+				if !ok {
+					return false
+				}
+				_, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
+				return ok
+			},
+			func(message spi.Message) error {
+				receivedResetEchoChan <- true
+				return nil
+			},
+			func(err error) error {
+				// If this is a timeout, do a check if the connection requires a reconnection
+				if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+					log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+					c.Close()
+				}
+				receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
+				return nil
+			},
+			c.GetTtl(),
+		); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
+			return
+		}
+
+		select {
+		case receivedResetEcho := <-receivedResetEchoChan:
+			log.Debug().Msgf("We received the echo {}", receivedResetEcho)
+		case err := <-receivedResetEchoErrorChan:
+			c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
+			return
+		case timeout := <-time.After(time.Second * 2):
+			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
 			return
 		}
-		time.Sleep(time.Millisecond * 100)
 	}
 	{
 		log.Debug().Msg("Set application filter to all")
 		applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(0xFF), 1)
-		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_APPLICATION_ADDRESS_1, 0x0, applicationAddress1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
-		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
-		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
-			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
 			return
 		}
-		time.Sleep(time.Millisecond * 100)
 	}
 	{
 		log.Debug().Msg("Set interface options 3")
 		interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(true, false, true, false), 1)
-		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_3, 0x0, interfaceOptions3, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
-		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
-		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
-			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+		var newCBusOptions readWriteModel.CBusOptions
+		newCBusOptions = readWriteModel.NewCBusOptions(false, false, false, true, false, false, false, false, false)
+		cbusOptions = &newCBusOptions
+		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
 			return
 		}
-		time.Sleep(time.Millisecond * 100)
 	}
 	{
 		log.Debug().Msg("Set interface options 1 power up settings")
+		var newCBusOptions readWriteModel.CBusOptions
+		newCBusOptions = readWriteModel.NewCBusOptions(false, true, true, true, true, false, false, false, true)
+		cbusOptions = &newCBusOptions
 		interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true)), 1)
-		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, 0x0, interfaceOptions1PowerUpSettings, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
-		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
-		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
-			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
 			return
 		}
-		time.Sleep(time.Millisecond * 100)
 	}
 	{
 		log.Debug().Msg("Set interface options 1")
+		var newCBusOptions readWriteModel.CBusOptions
+		newCBusOptions = readWriteModel.NewCBusOptions(false, true, true, true, true, false, false, false, true)
+		cbusOptions = &newCBusOptions
 		interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true), 1)
-		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1, 0x0, interfaceOptions1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
-		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
-		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
-			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
 			return
 		}
-		time.Sleep(time.Millisecond * 100)
 	}
 	c.fireConnected(ch)
 
@@ -248,6 +280,86 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	}()
 }
 
+// sendCalDataWrite writes a single CAL parameter via a direct command and waits for its acknowledge.
+// The message is built from paramNo and parameterValue using the given requestContext and cbusOptions.
+// It returns true once a CALDataAcknowledge for paramNo arrived; on a send error, a reported request
+// error or after a local 2 second timeout it fires a connection error on ch and returns false.
+func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
+	// TODO: we assume that this is always a one byte request otherwise we need to map the length here
+	calData := readWriteModel.NewCALDataWrite(paramNo, 0x0, parameterValue, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+	directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+	cBusMessage := readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)
+
+	// Upper bound for the wait below, independent of the ttl handed to SendRequest.
+	const ackTimeout = time.Second * 2
+
+	// isAck reports whether message is a CAL acknowledge for the parameter written here.
+	isAck := func(message spi.Message) bool {
+		switch message := message.(type) {
+		case readWriteModel.CBusMessageToClientExactly:
+			switch reply := message.GetReply().(type) {
+			case readWriteModel.ReplyOrConfirmationReplyExactly:
+				switch reply := reply.GetReply().(type) {
+				case readWriteModel.ReplyEncodedReplyExactly:
+					switch encodedReply := reply.GetEncodedReply().(type) {
+					case readWriteModel.EncodedReplyCALReplyExactly:
+						switch data := encodedReply.GetCalReply().GetCalData().(type) {
+						case readWriteModel.CALDataAcknowledgeExactly:
+							if data.GetParamNo() == paramNo {
+								return true
+							}
+						}
+					}
+				}
+			}
+		}
+		return false
+	}
+
+	// Both channels are buffered so that a callback firing after the select below has already
+	// given up (timeout) does not block forever on a send nobody receives.
+	directCommandAckChan := make(chan bool, 1)
+	directCommandAckErrorChan := make(chan error, 1)
+	if err := c.messageCodec.SendRequest(
+		cBusMessage,
+		isAck,
+		func(message spi.Message) error {
+			if isAck(message) {
+				directCommandAckChan <- true
+			}
+			return nil
+		},
+		func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+				c.Close()
+			}
+			directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
+			return nil
+		},
+		c.GetTtl(),
+	); err != nil {
+		c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)
+		return false
+	}
+
+	// Wait for whichever comes first: the ack, a reported error or the local timeout.
+	select {
+	case receivedAck := <-directCommandAckChan:
+		// Msgf takes fmt verbs; a "{}" placeholder would be printed literally
+		log.Debug().Msgf("We received the ack %v", receivedAck)
+	case err := <-directCommandAckErrorChan:
+		c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
+		return false
+	case <-time.After(ackTimeout):
+		// time.After delivers the current time, so report the waited duration instead
+		c.fireConnectionError(errors.Errorf("Timeout after %v", ackTimeout), ch)
+		return false
+	}
+	return true
+}
+
 func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
 	if c.driverContext.awaitSetupComplete {
 		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index f88c92053..0570da211 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -139,7 +139,7 @@ lookingForTheEnd:
 		// This means a <cr> is directly followed by a <lf> which means that we know for sure this is a response
 		pciResponse = true
 	}
-	if !requestToPci && indexOfLF < 0 {
+	if !pciResponse && !requestToPci && indexOfLF < 0 {
 		// To be sure we might receive that package later we hash the bytes and check if we might receive one
 		hash := crc32.NewIEEE()
 		_, _ = hash.Write(peekedBytes)
@@ -153,6 +153,7 @@ lookingForTheEnd:
 		} else {
 			// after 90ms we give up finding a lf
 			m.lastPackageHash, m.hashEncountered = 0, 0
+			requestToPci = true
 		}
 	}
 	if !pciResponse && !requestToPci {
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index 7cc157c42..ac364f38a 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -20,9 +20,11 @@
 package config
 
 // TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
-var TraceTransactionManagerWorkers bool
-var TraceTransactionManagerTransactions bool
-var TraceDefaultMessageCodecWorker bool
+var (
+	TraceTransactionManagerWorkers      bool
+	TraceTransactionManagerTransactions bool
+	TraceDefaultMessageCodecWorker      bool
+)
 
 func init() {
 	TraceTransactionManagerWorkers = false
diff --git a/plc4go/protocols/ads/readwrite/model/AdsConstants.go b/plc4go/protocols/ads/readwrite/model/AdsConstants.go
new file mode 100644
index 000000000..24fe12383
--- /dev/null
+++ b/plc4go/protocols/ads/readwrite/model/AdsConstants.go
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package model
+
+import (
+	"fmt"
+	"github.com/apache/plc4x/plc4go/internal/spi/utils"
+	"github.com/pkg/errors"
+)
+
+// Code generated by code-generation. DO NOT EDIT.
+
+// Constant values: AdsConstants_ADSTCPDEFAULTPORT is the fixed value of the adsTcpDefaultPort const field.
+const AdsConstants_ADSTCPDEFAULTPORT uint16 = uint16(48898)
+
+// AdsConstants is the public interface of the AdsConstants message: a length-aware, serializable value.
+type AdsConstants interface {
+	utils.LengthAware
+	utils.Serializable
+}
+
+// AdsConstantsExactly can be used when we want exactly this type and not a type which fulfills AdsConstants.
+// This is useful for switch cases: the unexported isAdsConstants marker narrows the match to this message.
+type AdsConstantsExactly interface {
+	AdsConstants
+	isAdsConstants() bool // marker method; _AdsConstants implements it by returning true
+}
+
+// _AdsConstants is the data-structure of this message; it is empty because the message only carries a const field.
+type _AdsConstants struct {
+}
+
+///////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////
+/////////////////////// Accessors for const fields.
+///////////////////////
+
+func (m *_AdsConstants) GetAdsTcpDefaultPort() uint16 {
+	return AdsConstants_ADSTCPDEFAULTPORT // package-level constant (48898); the receiver is not read
+}
+
+///////////////////////
+///////////////////////
+///////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////
+
+// NewAdsConstants is the factory function for _AdsConstants; it takes no arguments and returns a new empty instance.
+func NewAdsConstants() *_AdsConstants {
+	return &_AdsConstants{}
+}
+
+// Deprecated: use the interface for direct cast
+func CastAdsConstants(structType interface{}) AdsConstants {
+	if casted, ok := structType.(AdsConstants); ok { // value already satisfies the interface
+		return casted
+	}
+	if casted, ok := structType.(*AdsConstants); ok { // pointer to an interface value
+		return *casted // NOTE(review): a nil *AdsConstants would panic on this dereference
+	}
+	return nil // anything else cannot be converted
+}
+
+func (m *_AdsConstants) GetTypeName() string {
+	return "AdsConstants" // same string that parse and serialize use as their context name
+}
+
+func (m *_AdsConstants) GetLengthInBits() uint16 {
+	return m.GetLengthInBitsConditional(false) // delegates with lastItem=false
+}
+
+func (m *_AdsConstants) GetLengthInBitsConditional(lastItem bool) uint16 { // lastItem is not used here
+	lengthInBits := uint16(0)
+
+	// Const Field (adsTcpDefaultPort): a 16 bit value, the only content of this message
+	lengthInBits += 16
+
+	return lengthInBits
+}
+
+func (m *_AdsConstants) GetLengthInBytes() uint16 {
+	return m.GetLengthInBits() / 8 // bits to bytes (16 / 8 = 2 for this message)
+}
+
+func AdsConstantsParse(readBuffer utils.ReadBuffer) (AdsConstants, error) { // returns (nil, error) on any read or validation failure
+	positionAware := readBuffer
+	_ = positionAware
+	if pullErr := readBuffer.PullContext("AdsConstants"); pullErr != nil {
+		return nil, errors.Wrap(pullErr, "Error pulling for AdsConstants")
+	}
+	currentPos := positionAware.GetPos()
+	_ = currentPos // not needed for this message; the blank assignment keeps the variable "used"
+
+	// Const Field (adsTcpDefaultPort): read 16 bits and verify they match the expected constant
+	adsTcpDefaultPort, _adsTcpDefaultPortErr := readBuffer.ReadUint16("adsTcpDefaultPort", 16)
+	if _adsTcpDefaultPortErr != nil {
+		return nil, errors.Wrap(_adsTcpDefaultPortErr, "Error parsing 'adsTcpDefaultPort' field of AdsConstants")
+	}
+	if adsTcpDefaultPort != AdsConstants_ADSTCPDEFAULTPORT {
+		return nil, errors.New("Expected constant value " + fmt.Sprintf("%d", AdsConstants_ADSTCPDEFAULTPORT) + " but got " + fmt.Sprintf("%d", adsTcpDefaultPort))
+	}
+
+	if closeErr := readBuffer.CloseContext("AdsConstants"); closeErr != nil {
+		return nil, errors.Wrap(closeErr, "Error closing for AdsConstants")
+	}
+
+	// Create the instance; the parsed value is not stored as it always equals the constant
+	return NewAdsConstants(), nil
+}
+
+func (m *_AdsConstants) Serialize(writeBuffer utils.WriteBuffer) error { // returns the first push/write/pop error, wrapped
+	positionAware := writeBuffer
+	_ = positionAware
+	if pushErr := writeBuffer.PushContext("AdsConstants"); pushErr != nil {
+		return errors.Wrap(pushErr, "Error pushing for AdsConstants")
+	}
+
+	// Const Field (adsTcpDefaultPort): written as the literal 48898, which equals AdsConstants_ADSTCPDEFAULTPORT
+	_adsTcpDefaultPortErr := writeBuffer.WriteUint16("adsTcpDefaultPort", 16, 48898)
+	if _adsTcpDefaultPortErr != nil {
+		return errors.Wrap(_adsTcpDefaultPortErr, "Error serializing 'adsTcpDefaultPort' field")
+	}
+
+	if popErr := writeBuffer.PopContext("AdsConstants"); popErr != nil {
+		return errors.Wrap(popErr, "Error popping for AdsConstants")
+	}
+	return nil
+}
+
+func (m *_AdsConstants) isAdsConstants() bool {
+	return true // marker method required by AdsConstantsExactly
+}
+
+func (m *_AdsConstants) String() string {
+	if m == nil { // safe to call on a nil receiver
+		return "<nil>"
+	}
+	writeBuffer := utils.NewBoxedWriteBufferWithOptions(true, true) // NOTE(review): meaning of the two flags is not visible here
+	if err := writeBuffer.WriteSerializable(m); err != nil {
+		return err.Error() // serialization failed: the error text becomes the string
+	}
+	return writeBuffer.GetBox().String() // string form of the box produced while serializing m
+}
diff --git a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
index 593678061..429ca705b 100644
--- a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
+++ b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
@@ -20,6 +20,7 @@ package org.apache.plc4x.simulator.server.cbus.protocol;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.vavr.CheckedRunnable;
 import org.apache.plc4x.java.cbus.readwrite.*;
 import org.apache.plc4x.simulator.model.Context;
 import org.slf4j.Logger;
@@ -77,6 +78,11 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
         if (!(msg instanceof CBusMessage)) {
             return;
         }
+        if (!smart && !connect) {
+            // In this mode every message will be echoed
+            LOGGER.info("Sending echo");
+            ctx.writeAndFlush(msg);
+        }
         try {
             writeLock.lock();
             CBusMessage packet = (CBusMessage) msg;
@@ -98,12 +104,24 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                 // TODO: handle other cal data type
                 if (calData instanceof CALDataWrite) {
                     CALDataWrite calDataWrite = (CALDataWrite) calData;
+                    Runnable acknowledger = () -> {
+                        CALDataAcknowledge calDataAcknowledge = new CALDataAcknowledge(CALCommandTypeContainer.CALCommandAcknowledge, null, calDataWrite.getParamNo(), (short) 0x0, requestContext);
+                        CALReplyShort calReply = new CALReplyShort((byte) 0x0, calDataAcknowledge, cBusOptions, requestContext);
+                        EncodedReplyCALReply encodedReply = new EncodedReplyCALReply((byte) 0x0, calReply, cBusOptions, requestContext);
+                        ReplyEncodedReply replyEncodedReply = new ReplyEncodedReply((byte) 0x0, encodedReply, null, cBusOptions, requestContext);
+                        ReplyOrConfirmationReply replyOrConfirmationReply = new ReplyOrConfirmationReply((byte) 0x0, replyEncodedReply, new ResponseTermination(), cBusOptions, requestContext);
+                        CBusMessageToClient cBusMessageToClient = new CBusMessageToClient(replyOrConfirmationReply, requestContext, cBusOptions);
+                        LOGGER.info("Sending ack\n{}\n{}", cBusMessageToClient, encodedReply);
+                        ctx.writeAndFlush(cBusMessageToClient);
+                    };
                     switch (calDataWrite.getParamNo().getParameterType()) {
                         case APPLICATION_ADDRESS_1:
                             // TODO: check settings for subscription etc.
+                            acknowledger.run();
                             return;
                         case APPLICATION_ADDRESS_2:
                             // TODO: check settings for subscription etc.
+                            acknowledger.run();
                             return;
                         case INTERFACE_OPTIONS_1:
                             InterfaceOptions1 interfaceOptions1 = ((ParameterValueInterfaceOptions1) calDataWrite.getParameterValue()).getValue();
@@ -117,6 +135,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                             // xonxoff = interfaceOptions1.getXonXoff();
                             connect = interfaceOptions1.getConnect();
                             buildCBusOptions();
+                            acknowledger.run();
                             return;
                         case INTERFACE_OPTIONS_2:
                             InterfaceOptions2 interfaceOptions2 = ((ParameterValueInterfaceOptions2) calDataWrite.getParameterValue()).getValue();
@@ -125,6 +144,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                             // TODO: add support for clockgen
                             // clockgen = interfaceOptions2.getClockGen();
                             buildCBusOptions();
+                            acknowledger.run();
                             return;
                         case INTERFACE_OPTIONS_3:
                             InterfaceOptions3 interfaceOptions3Value = ((ParameterValueInterfaceOptions3) calDataWrite.getParameterValue()).getValue();
@@ -134,12 +154,14 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                             // localsal = interfaceOptions3Value.getLocalSal();
                             pcn = interfaceOptions3Value.getPcn();
                             buildCBusOptions();
+                            acknowledger.run();
                             return;
                         case BAUD_RATE_SELECTOR:
                             BaudRateSelector baudRateSelector = ((ParameterValueBaudRateSelector) calDataWrite.getParameterValue()).getValue();
                             // TODO: add support for baudrate
                             // baudrate = baudRateSelector.getValue();
                             buildCBusOptions();
+                            acknowledger.run();
                             return;
                         case INTERFACE_OPTIONS_1_POWER_UP_SETTINGS:
                             InterfaceOptions1 interfaceOptions1PowerUpSettings = ((ParameterValueInterfaceOptions1PowerUpSettings) calDataWrite.getParameterValue()).getValue().getInterfaceOptions1();
@@ -153,15 +175,19 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                             // xonxoff = interfaceOptions1PowerUpSettings.getXonXoff();
                             connect = interfaceOptions1PowerUpSettings.getConnect();
                             buildCBusOptions();
+                            acknowledger.run();
                             return;
                         case CUSTOM_MANUFACTURER:
                             // TODO: handle other parm typed
+                            acknowledger.run();
                             return;
                         case SERIAL_NUMBER:
                             // TODO: handle other parm typed
+                            acknowledger.run();
                             return;
                         case CUSTOM_TYPE:
                             // TODO: handle other parm typed
+                            acknowledger.run();
                             return;
                         default:
                             throw new IllegalStateException("Unmapped type");