You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/22 11:30:43 UTC

[incubator-streampipes] 03/03: fix blocking issue in multibrokerbridge

This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 31ef2584246eecd9022d08f849267d9f2527fff7
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 22 07:41:54 2021 +0200

    fix blocking issue in multibrokerbridge
---
 .../management/relay/base/EventRelay.java          |   2 +-
 .../relay/bridges/MultiBrokerBridge.java           | 119 +++++++++++++--------
 .../performance/performancetest/GenericTest.java   |  16 +--
 3 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
index d1423c2..2c91089 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
@@ -42,6 +42,6 @@ public class EventRelay extends AbstractEventRelay {
     }
 
     public RelayMetrics getRelayMetrics() {
-        return this.multiBrokerBridge.getRelayMerics();
+        return this.multiBrokerBridge.getRelayMetrics();
     }
 }
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 71cb142..9db1d75 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -31,6 +31,9 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.function.Supplier;
 
 public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends TransportProtocol> implements EventRelay {
@@ -43,11 +46,13 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
     private final EventConsumer<T1> consumer;
     private final EventProducer<T2> producer;
     private final EventRelayStrategy eventRelayStrategy;
-    private final ArrayList<byte[]> eventBuffer = new ArrayList<>();
+    private final Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
+    //private CircularFifoBuffer buffer = new CircularFifoBuffer(NodeConfiguration.getEventRelayBufferSize());
 
     private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
     private final Tuple3<String, Integer, String> relayInfo;
     private boolean isBuffering = false;
+    private boolean targetAlive = true;
 
     public MultiBrokerBridge(TransportProtocol sourceProtocol, TransportProtocol targetProtocol,
                              String eventRelayStrategy, Supplier<EventConsumer<T1>> consumerSupplier,
@@ -87,40 +92,21 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
     }
 
     private void publishWithCheck(byte[] event) {
+
+        if (targetAlive && !isTargetBrokerAlive()){
+            targetAlive = false;
+            startAliveThread();
+        }
         // check if target broker can be reached
-        if (isTargetBrokerAlive()) {
+        if (targetAlive) {
 
-            if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
-                if (eventBuffer.isEmpty()) {
-                    // send events to upstream
-                    producer.publish(event);
-                    metrics.increaseNumRelayedEvents();
-                } else {
-                    // TODO: send buffered event should run independent of callback
-                    // send buffered events & clear buffer
-                    LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
-                                    "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
-                            relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
-
-                    // add current event from callback
-                    eventBuffer.add(event);
-                    eventBuffer.forEach(e -> {
-                        try{
-                            producer.publish(e);
-                            metrics.increaseNumRelayedEvents();
-                        }  catch (Exception ex) {
-                            LOG.error(ex.toString());
-                        }
-                    });
-                    eventBuffer.clear();
-                    metrics.clearNumDroppedEvents();
-                    isBuffering = false;
-                }
-            } else if (eventRelayStrategy == EventRelayStrategy.PURGE) {
-                // send events to upstream
-                producer.publish(event);
-                metrics.increaseNumRelayedEvents();
+            if(!eventBuffer.isEmpty()){
+                publishBufferedEvents(eventBuffer);
+                publishEvent(event);
+            }else{
+                publishEvent(event);
             }
+
         } else {
             //
             if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
@@ -130,17 +116,9 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
                             , relayInfo.c);
                     isBuffering = true;
                 }
-
-                if (eventBuffer.size() != EVENT_BUFFER_SIZE) {
-                    eventBuffer.add(event);
-                } else {
-                    // evict oldest event
-                    eventBuffer.remove(0);
-                    metrics.increaseNumDroppedEvents();
-                }
+                bufferEvent(event);
             } else if (eventRelayStrategy == EventRelayStrategy.PURGE) {
-                LOG.info("Connection issue to broker={}:{}. Purge events for topic={}", relayInfo.a, relayInfo.b,
-                        relayInfo.c);
+                purgeEvent(event);
             }
         }
     }
@@ -151,6 +129,45 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         metrics.increaseNumRelayedEvents();
     }
 
+    private void publishEvent(byte[] event){
+        producer.publish(event);
+        metrics.increaseNumRelayedEvents();
+    }
+
+    private synchronized void bufferEvent(byte[] event){
+        if (!eventBuffer.offer(event)){
+            eventBuffer.poll();
+            eventBuffer.offer(event);
+            metrics.increaseNumDroppedEvents();
+        }
+    }
+
+    private void purgeEvent(byte[] event){
+        LOG.info("Connection issue to broker={}:{}. Purge events for topic={}", relayInfo.a, relayInfo.b,
+                relayInfo.c);
+    }
+
+    private synchronized void publishBufferedEvents(Queue<byte[]> eventBuffer){
+        LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
+                        "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
+                relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
+
+        // add current event from callback
+        eventBuffer.forEach(e -> {
+            try{
+                producer.publish(e);
+                metrics.increaseNumRelayedEvents();
+            }  catch (Exception ex) {
+                LOG.error(ex.toString());
+            }
+        });
+        eventBuffer.clear();
+        metrics.clearNumDroppedEvents();
+        isBuffering = false;
+    }
+
+
+
     @Override
     public void stop() throws SpRuntimeException {
         LOG.info("Stop event relay to broker={}:{}, topic={}", relayInfo.a, relayInfo.b, relayInfo.c);
@@ -168,7 +185,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         return producer.isConnected();
     }
 
-    public RelayMetrics getRelayMerics() {
+    public RelayMetrics getRelayMetrics() {
         return metrics;
     }
 
@@ -204,4 +221,20 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
         }
         throw new SpRuntimeException("Transport protocol not valid");
     }
+
+    private void startAliveThread(){
+        LOG.info("Alive Thread started.");
+        Thread th = new Thread(){
+            public synchronized void run(){
+                boolean isAlive = false;
+                while (!isAlive){
+                    if(isTargetBrokerAlive())
+                        isAlive = true;
+                }
+                publishBufferedEvents(eventBuffer);
+                targetAlive = true;
+            }
+        };
+        th.start();
+    }
 }
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 2d8523c..25e7a1a 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -73,8 +73,8 @@ public class GenericTest implements Test{
 
         Object[] line = null;
 
-        if (testType.equals("Reconfiguration") && nrRuns == 0){
-            executeOffloading();
+        if (testType.equals("Reconfiguration")){
+            executeReconfiguration();
             return;
         }
         //Start Pipeline
@@ -176,7 +176,7 @@ public class GenericTest implements Test{
         }
     }
 
-    private void executeOffloading(){
+    private void executeReconfiguration(){
         if (!pipeline.isRunning()) {
             PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
             System.out.println(startMessage.getTitle());
@@ -205,9 +205,13 @@ public class GenericTest implements Test{
                             sp.setValue(Float.toString(reconfigurationValue));
                         }
                     }));
-
-            PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
-            System.out.println(message.getTitle());
+            EvaluationLogger.getInstance().logMQTT("Reconfiguration", "Reconfiguration triggered", 0, reconfigurationValue);
+            client.pipelines().reconfigure(pipeline);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
         }