You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/06/13 11:23:10 UTC
[incubator-plc4x] 01/03: added single message rate limiter to
throttle outgoing messages to one on serial connection
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 83c69c9a9a2becb2c14568c937b3de3d010ed630
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 13 12:44:15 2018 +0200
added single message rate limiter to throttle outgoing messages to one
on serial connection
---
.../ads/connection/AdsSerialPlcConnection.java | 2 +
.../protocol/util/SingleMessageRateLimiter.java | 120 +++++++++++++++++++++
.../ads/connection/AdsSerialPlcConnectionTest.java | 16 +++
3 files changed, 138 insertions(+)
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
index 848a743..82ca6a2 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
@@ -27,6 +27,7 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.ads.protocol.Payload2SerialProtocol;
import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
+import org.apache.plc4x.java.ads.protocol.util.SingleMessageRateLimiter;
import org.apache.plc4x.java.base.connection.SerialChannelFactory;
import java.util.concurrent.CompletableFuture;
@@ -57,6 +58,7 @@ public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
// Build the protocol stack for communicating with the ads protocol.
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new Payload2SerialProtocol());
+ pipeline.addLast(new SingleMessageRateLimiter());
pipeline.addLast(new Ads2PayloadProtocol());
pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, addressMapping));
}
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java
new file mode 100644
index 0000000..afab15d
--- /dev/null
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java
@@ -0,0 +1,120 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.ads.protocol.util;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.ArrayDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Outbound throttle that forwards at most one message at a time.
+ * Inspired by {@link io.netty.handler.traffic.ChannelTrafficShapingHandler}.
+ * <p>
+ * Every message passed to {@code write} is queued instead of being forwarded. A task scheduled
+ * on the handler's executor in {@code connect} wakes up every 10 ms and forwards the oldest
+ * queued message, but only while no other message is marked as being on the way. That marker
+ * is cleared again by {@code read} and by {@code exceptionCaught}.
+ */
+public class SingleMessageRateLimiter extends ChannelDuplexHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SingleMessageRateLimiter.class);
+
+    /** Messages accepted by {@code write} that have not been forwarded yet, oldest first. */
+    private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<>();
+
+    /** Set when a message is forwarded; reset by {@code read} and {@code exceptionCaught}. */
+    private final AtomicBoolean messageOnTheWay = new AtomicBoolean(false);
+
+    /** Periodic forwarding task; stays {@code null} until {@code connect} has been invoked. */
+    private ScheduledFuture<?> sender;
+
+    /**
+     * Logs the bind request and passes it on unchanged.
+     */
+    @Override
+    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+        LOGGER.debug("bind({}, {}, {})", ctx, localAddress, promise);
+        super.bind(ctx, localAddress, promise);
+    }
+
+    /**
+     * Logs the deregister request and passes it on unchanged.
+     */
+    @Override
+    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        LOGGER.debug("deregister({}, {})", ctx, promise);
+        super.deregister(ctx, promise);
+    }
+
+    /**
+     * Starts the periodic forwarding task on the context's executor and then passes the
+     * connect request on.
+     * <p>
+     * The task runs every 10 ms. It forwards the head of the queue only if the queue is
+     * non-empty and the "message on the way" marker could be flipped from {@code false} to
+     * {@code true}.
+     */
+    @Override
+    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+        LOGGER.debug("connect({}, {}, {}, {})", ctx, remoteAddress, localAddress, promise);
+        sender = ctx.executor().scheduleAtFixedRate(() -> {
+            LOGGER.trace("Woke up and doing work messageOnTheWay:{}, messageQueue:{}", messageOnTheWay, messagesQueue);
+            if (!messagesQueue.isEmpty() && messageOnTheWay.compareAndSet(false, true)) {
+                ToSend pop = messagesQueue.pop();
+                LOGGER.debug("Sending {}", pop);
+                // Written through the context captured in write(), i.e. from this handler onwards.
+                pop.channelHandlerContext.writeAndFlush(pop.toSend, pop.promise);
+                LOGGER.debug("Send {}", pop);
+            }
+        }, 0, 10, TimeUnit.MILLISECONDS);
+        super.connect(ctx, remoteAddress, localAddress, promise);
+    }
+
+    /**
+     * Cancels the periodic forwarding task (if one was started) and passes the disconnect
+     * request on.
+     */
+    @Override
+    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        LOGGER.debug("disconnect({}, {})", ctx, promise);
+        // connect() may never have been invoked on this handler, in which case there is nothing to cancel.
+        if (sender != null) {
+            sender.cancel(true);
+        }
+        super.disconnect(ctx, promise);
+    }
+
+    /**
+     * Queues the message together with its context and promise instead of forwarding it.
+     * The actual write is performed later by the task started in {@code connect}.
+     */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+        LOGGER.trace("(<--OUT): {}, {}, {}", ctx, msg, promise);
+        messagesQueue.add(new ToSend(ctx, msg, promise));
+    }
+
+    /**
+     * Clears the "message on the way" marker and passes the read request on.
+     * <p>
+     * NOTE(review): in Netty {@code read} is the outbound "request to read" operation, not the
+     * arrival of inbound data - confirm that resetting the marker here (rather than in
+     * {@code channelRead}) is intended.
+     */
+    @Override
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        LOGGER.trace("(-->In): {}", ctx);
+        messageOnTheWay.set(false);
+        super.read(ctx);
+    }
+
+    /**
+     * Clears the "message on the way" marker so that queued messages can be forwarded again,
+     * then passes the exception on.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        LOGGER.trace("(-->ERR): {}", ctx, cause);
+        messageOnTheWay.set(false);
+        super.exceptionCaught(ctx, cause);
+    }
+
+    /**
+     * Immutable holder for one queued write: the message plus the context and promise it was
+     * submitted with.
+     */
+    private static final class ToSend {
+        final ChannelHandlerContext channelHandlerContext;
+        final Object toSend;
+        final ChannelPromise promise;
+
+        private ToSend(ChannelHandlerContext channelHandlerContext, Object toSend, ChannelPromise promise) {
+            this.channelHandlerContext = channelHandlerContext;
+            this.toSend = toSend;
+            this.promise = promise;
+        }
+
+        @Override
+        public String toString() {
+            return "ToSend{" +
+                "channelHandlerContext=" + channelHandlerContext +
+                ", toSend=" + toSend +
+                ", promise=" + promise +
+                '}';
+        }
+    }
+}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
index 5f6b16a..703c205 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
@@ -20,7 +20,9 @@ package org.apache.plc4x.java.ads.connection;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
import org.apache.plc4x.java.ads.api.serial.AmsSerialAcknowledgeFrame;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -113,6 +116,7 @@ public class AdsSerialPlcConnectionTest {
SerialChannelFactory serialChannelFactory = (SerialChannelFactory) channelFactoryField.get(SUT);
SerialChannelFactory serialChannelFactorySpied = spy(serialChannelFactory);
EmbeddedChannel embeddedChannel = new EmbeddedChannel(SUT.getChannelHandler(null));
+ embeddedChannel.connect(new JSerialCommDeviceAddress("/dev/tty0"));
doReturn(embeddedChannel).when(serialChannelFactorySpied).createChannel(any());
channelFactoryField.set(SUT, serialChannelFactorySpied);
SUT.connect();
@@ -135,12 +139,14 @@ public class AdsSerialPlcConnectionTest {
@Override
public void run() {
while (true) {
+ LOGGER.trace("in state {}. CurrentInvokeId: {}", state, currentInvokeId);
switch (state) {
// Receiving state
case RECEIVE_REQUEST: {
LOGGER.info("Waiting for normal message");
ByteBuf outputBuffer;
while ((outputBuffer = embeddedChannel.readOutbound()) == null) {
+ LOGGER.trace("No buffer available yet");
if (!trySleep()) {
return;
}
@@ -173,6 +179,11 @@ public class AdsSerialPlcConnectionTest {
ReceiverAddress.of((byte) 0x0),
FragmentNumber.of((byte) 0)
).getByteBuf();
+ try {
+ MethodUtils.invokeMethod(byteBuf, true,"setRefCnt", 2);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
embeddedChannel.writeOneInbound(byteBuf);
LOGGER.info("Acked Message");
state = SimulatorState.SEND_RESPONSE;
@@ -198,6 +209,11 @@ public class AdsSerialPlcConnectionTest {
}
)
).getByteBuf();
+ try {
+ MethodUtils.invokeMethod(byteBuf, true,"setRefCnt", 2);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
embeddedChannel.writeOneInbound(byteBuf);
LOGGER.info("Wrote Inbound");
state = SimulatorState.WAIT_FOR_ACK;
--
To stop receiving notification emails like this one, please contact
sruehl@apache.org.