You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:22 UTC

[incubator-streampipes] 01/03: improve relay buffer performance

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 50354ab8664470a115ac7897743164b5c02b5890
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 19:56:38 2021 +0100

    improve relay buffer performance
---
 .../relay/bridges/MultiBrokerBridge.java           | 63 +++++++++++-----------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 9db1d75..6591759 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -30,9 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.function.Supplier;
 
@@ -46,13 +44,12 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
     private final EventConsumer<T1> consumer;
     private final EventProducer<T2> producer;
     private final EventRelayStrategy eventRelayStrategy;
-    private final Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
-    //private CircularFifoBuffer buffer = new CircularFifoBuffer(NodeConfiguration.getEventRelayBufferSize());
+    private Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
 
-    private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
     private final Tuple3<String, Integer, String> relayInfo;
-    private boolean isBuffering = false;
-    private boolean targetAlive = true;
+    private volatile boolean isBuffering = false;
+    private volatile boolean targetAlive = true;
+    private volatile boolean isEmptyingBuffer = false;
 
     public MultiBrokerBridge(TransportProtocol sourceProtocol, TransportProtocol targetProtocol,
                              String eventRelayStrategy, Supplier<EventConsumer<T1>> consumerSupplier,
@@ -98,20 +95,20 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
             startAliveThread();
         }
         // check if target broker can be reached
-        if (targetAlive) {
+        if (targetAlive && !isEmptyingBuffer) {
 
             if(!eventBuffer.isEmpty()){
-                publishBufferedEvents(eventBuffer);
-                publishEvent(event);
-            }else{
+                bufferEvent(event);
+                publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+                eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
+            } else {
                 publishEvent(event);
             }
 
         } else {
-            //
             if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
                 // add event to buffer
-                if(!isBuffering) {
+                if(!isBuffering && !isEmptyingBuffer) {
                     LOG.info("Connection issue to broker={}:{}. Buffer events for topic={}", relayInfo.a, relayInfo.b
                             , relayInfo.c);
                     isBuffering = true;
@@ -134,7 +131,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         metrics.increaseNumRelayedEvents();
     }
 
-    private synchronized void bufferEvent(byte[] event){
+    private void bufferEvent(byte[] event){
         if (!eventBuffer.offer(event)){
             eventBuffer.poll();
             eventBuffer.offer(event);
@@ -147,23 +144,25 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                 relayInfo.c);
     }
 
-    private synchronized void publishBufferedEvents(Queue<byte[]> eventBuffer){
-        LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
-                        "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
-                relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
+    private void publishBufferedEvents(Queue<byte[]> buffer, long numDroppedEvents){
+        isEmptyingBuffer = true;
+        LOG.info("Resent buffered events for topic={} (buffer_size={}, num_dropped_events={})", relayInfo.c,
+                buffer.size(), numDroppedEvents);
 
-        // add current event from callback
-        eventBuffer.forEach(e -> {
-            try{
-                producer.publish(e);
-                metrics.increaseNumRelayedEvents();
-            }  catch (Exception ex) {
-                LOG.error(ex.toString());
-            }
+        Thread th = new Thread(() -> {
+            buffer.forEach(e -> {
+                try{
+                    publishEvent(e);
+                }  catch (Exception ex) {
+                    LOG.error(ex.toString());
+                }
+            });
+            metrics.clearNumDroppedEvents();
+            isBuffering = false;
+
+            isEmptyingBuffer = false;
         });
-        eventBuffer.clear();
-        metrics.clearNumDroppedEvents();
-        isBuffering = false;
+        th.start();
     }
 
 
@@ -231,7 +230,9 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                     if(isTargetBrokerAlive())
                         isAlive = true;
                 }
-                publishBufferedEvents(eventBuffer);
+                LOG.info("Re-established connection to broker={}:{}", relayInfo.a, relayInfo.b);
+                publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+                eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
                 targetAlive = true;
             }
         };