You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2018/06/13 11:23:10 UTC

[incubator-plc4x] 01/03: added single message rate limiter to throttle outgoing messages to one on serial connection

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 83c69c9a9a2becb2c14568c937b3de3d010ed630
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 13 12:44:15 2018 +0200

    added single message rate limiter to throttle outgoing messages to one
    on serial connection
---
 .../ads/connection/AdsSerialPlcConnection.java     |   2 +
 .../protocol/util/SingleMessageRateLimiter.java    | 120 +++++++++++++++++++++
 .../ads/connection/AdsSerialPlcConnectionTest.java |  16 +++
 3 files changed, 138 insertions(+)

diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
index 848a743..82ca6a2 100644
--- a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnection.java
@@ -27,6 +27,7 @@ import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.protocol.Ads2PayloadProtocol;
 import org.apache.plc4x.java.ads.protocol.Payload2SerialProtocol;
 import org.apache.plc4x.java.ads.protocol.Plc4x2AdsProtocol;
+import org.apache.plc4x.java.ads.protocol.util.SingleMessageRateLimiter;
 import org.apache.plc4x.java.base.connection.SerialChannelFactory;
 
 import java.util.concurrent.CompletableFuture;
@@ -57,6 +58,7 @@ public class AdsSerialPlcConnection extends AdsAbstractPlcConnection {
                 // Build the protocol stack for communicating with the ads protocol.
                 ChannelPipeline pipeline = channel.pipeline();
                 pipeline.addLast(new Payload2SerialProtocol());
+                pipeline.addLast(new SingleMessageRateLimiter());
                 pipeline.addLast(new Ads2PayloadProtocol());
                 pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, addressMapping));
             }
diff --git a/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java
new file mode 100644
index 0000000..afab15d
--- /dev/null
+++ b/plc4j/protocols/ads/src/main/java/org/apache/plc4x/java/ads/protocol/util/SingleMessageRateLimiter.java
@@ -0,0 +1,120 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ */
+package org.apache.plc4x.java.ads.protocol.util;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.ScheduledFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.ArrayDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Inspired by {@link io.netty.handler.traffic.ChannelTrafficShapingHandler}, this limiter ensures only one message is sent at a time.
+ */
+public class SingleMessageRateLimiter extends ChannelDuplexHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SingleMessageRateLimiter.class);
+
+    private final ArrayDeque<ToSend> messagesQueue = new ArrayDeque<>(); // outbound messages waiting to be sent, in arrival order
+
+    private final AtomicBoolean messageOnTheWay = new AtomicBoolean(false); // true from the moment a message is sent until read()/exceptionCaught() clears it
+
+    private ScheduledFuture<?> sender; // periodic task assigned in connect() and cancelled in disconnect()
+
+    @Override // Logs the bind request at debug level, then delegates to the superclass.
+    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+        LOGGER.debug("bind({}, {}, {})", ctx, localAddress, promise);
+        super.bind(ctx, localAddress, promise); // no rate-limiter state is touched here
+    }
+
+    @Override // Logs the deregister request at debug level, then delegates to the superclass.
+    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        LOGGER.debug("deregister({}, {})", ctx, promise);
+        super.deregister(ctx, promise); // no rate-limiter state is touched here
+    }
+
+    @Override // Schedules the periodic sender task on the context's executor, then delegates the connect request.
+    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
+        LOGGER.debug("connect({}, {}, {}, {})", ctx, remoteAddress, localAddress, promise);
+        sender = ctx.executor().scheduleAtFixedRate(() -> { // handle is kept so disconnect() can cancel the task
+            LOGGER.trace("Woke up and doing work messageOnTheWay:{}, messageQueue:{}", messageOnTheWay, messagesQueue);
+            if (!messagesQueue.isEmpty() && messageOnTheWay.compareAndSet(false, true)) { // send only if something is queued and no message is flagged as on the way
+                ToSend pop = messagesQueue.pop(); // takes the head of the deque; write() appends at the tail, so order is FIFO
+                LOGGER.debug("Sending {}", pop);
+                pop.channelHandlerContext.writeAndFlush(pop.toSend, pop.promise); // uses the context and promise captured in write()
+                LOGGER.debug("Send {}", pop);
+            }
+        }, 0, 10, TimeUnit.MILLISECONDS); // no initial delay, then a 10 ms period
+        super.connect(ctx, remoteAddress, localAddress, promise);
+    }
+
+    @Override // Cancels the periodic sender task if one was started, then delegates the disconnect request.
+    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+        LOGGER.debug("disconnect({}, {})", ctx, promise); // one placeholder per logged argument
+        if (sender != null) { sender.cancel(true); } // sender is only assigned in connect(); avoid an NPE when disconnect comes first
+        super.disconnect(ctx, promise);
+    }
+
+    @Override // Queues the outbound message instead of writing it; the task scheduled in connect() performs the actual write.
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+        LOGGER.trace("(<--OUT): {}, {}, {}", ctx, msg, promise);
+        messagesQueue.add(new ToSend(ctx, msg, promise)); // keep context and promise together with the message for the deferred write
+    }
+
+    @Override // Clears the message-on-the-way flag so the sender task may send the next queued message, then delegates.
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        LOGGER.trace("(-->In): {}", ctx);
+        messageOnTheWay.set(false); // NOTE(review): read() is the outbound read request, not inbound data arrival - confirm this is the intended release point
+        super.read(ctx);
+    }
+
+    @Override // Clears the message-on-the-way flag when an exception is reported, then delegates to the superclass.
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        LOGGER.trace("(-->ERR): {}", ctx, cause);
+        messageOnTheWay.set(false); // allow the sender task to send the next queued message
+        super.exceptionCaught(ctx, cause);
+    }
+
+    private static final class ToSend { // holder for one queued write: the three values passed to write()
+        final ChannelHandlerContext channelHandlerContext; // context the deferred writeAndFlush is issued on
+        final Object toSend; // the queued outbound message
+        final ChannelPromise promise; // promise handed to writeAndFlush together with the message
+
+        private ToSend(ChannelHandlerContext channelHandlerContext, Object toSend, ChannelPromise promise) {
+            this.channelHandlerContext = channelHandlerContext;
+            this.toSend = toSend;
+            this.promise = promise;
+        }
+
+        @Override // string form used by the log statements that print queue entries
+        public String toString() {
+            return "ToSend{" +
+                "channelHandlerContext=" + channelHandlerContext +
+                ", toSend=" + toSend +
+                ", promise=" + promise +
+                '}';
+        }
+    }
+}
diff --git a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
index 5f6b16a..703c205 100644
--- a/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
+++ b/plc4j/protocols/ads/src/test/java/org/apache/plc4x/java/ads/connection/AdsSerialPlcConnectionTest.java
@@ -20,7 +20,9 @@ package org.apache.plc4x.java.ads.connection;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.jsc.JSerialCommDeviceAddress;
 import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.commons.lang3.reflect.MethodUtils;
 import org.apache.plc4x.java.ads.api.generic.types.AmsNetId;
 import org.apache.plc4x.java.ads.api.generic.types.AmsPort;
 import org.apache.plc4x.java.ads.api.serial.AmsSerialAcknowledgeFrame;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -113,6 +116,7 @@ public class AdsSerialPlcConnectionTest {
         SerialChannelFactory serialChannelFactory = (SerialChannelFactory) channelFactoryField.get(SUT);
         SerialChannelFactory serialChannelFactorySpied = spy(serialChannelFactory);
         EmbeddedChannel embeddedChannel = new EmbeddedChannel(SUT.getChannelHandler(null));
+        embeddedChannel.connect(new JSerialCommDeviceAddress("/dev/tty0"));
         doReturn(embeddedChannel).when(serialChannelFactorySpied).createChannel(any());
         channelFactoryField.set(SUT, serialChannelFactorySpied);
         SUT.connect();
@@ -135,12 +139,14 @@ public class AdsSerialPlcConnectionTest {
         @Override
         public void run() {
             while (true) {
+                LOGGER.trace("in state {}. CurrentInvokeId: {}", state, currentInvokeId);
                 switch (state) {
                     // Receiving state
                     case RECEIVE_REQUEST: {
                         LOGGER.info("Waiting for normal message");
                         ByteBuf outputBuffer;
                         while ((outputBuffer = embeddedChannel.readOutbound()) == null) {
+                            LOGGER.trace("No buffer available yet");
                             if (!trySleep()) {
                                 return;
                             }
@@ -173,6 +179,11 @@ public class AdsSerialPlcConnectionTest {
                             ReceiverAddress.of((byte) 0x0),
                             FragmentNumber.of((byte) 0)
                         ).getByteBuf();
+                        try {
+                            MethodUtils.invokeMethod(byteBuf, true,"setRefCnt", 2);
+                        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                            throw new RuntimeException(e);
+                        }
                         embeddedChannel.writeOneInbound(byteBuf);
                         LOGGER.info("Acked Message");
                         state = SimulatorState.SEND_RESPONSE;
@@ -198,6 +209,11 @@ public class AdsSerialPlcConnectionTest {
                                 }
                             )
                         ).getByteBuf();
+                        try {
+                            MethodUtils.invokeMethod(byteBuf, true,"setRefCnt", 2);
+                        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                            throw new RuntimeException(e);
+                        }
                         embeddedChannel.writeOneInbound(byteBuf);
                         LOGGER.info("Wrote Inbound");
                         state = SimulatorState.WAIT_FOR_ACK;

-- 
To stop receiving notification emails like this one, please contact
sruehl@apache.org.