You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/12/01 17:53:22 UTC
[incubator-streampipes] 01/03: improve relay buffer performance
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 50354ab8664470a115ac7897743164b5c02b5890
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Mon Nov 22 19:56:38 2021 +0100
improve relay buffer performance
---
.../relay/bridges/MultiBrokerBridge.java | 63 +++++++++++-----------
1 file changed, 32 insertions(+), 31 deletions(-)
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 9db1d75..6591759 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -30,9 +30,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;
@@ -46,13 +44,12 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
private final EventConsumer<T1> consumer;
private final EventProducer<T2> producer;
private final EventRelayStrategy eventRelayStrategy;
- private final Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
- //private CircularFifoBuffer buffer = new CircularFifoBuffer(NodeConfiguration.getEventRelayBufferSize());
+ private Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
- private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
private final Tuple3<String, Integer, String> relayInfo;
- private boolean isBuffering = false;
- private boolean targetAlive = true;
+ private volatile boolean isBuffering = false;
+ private volatile boolean targetAlive = true;
+ private volatile boolean isEmptyingBuffer = false;
public MultiBrokerBridge(TransportProtocol sourceProtocol, TransportProtocol targetProtocol,
String eventRelayStrategy, Supplier<EventConsumer<T1>> consumerSupplier,
@@ -98,20 +95,20 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
startAliveThread();
}
// check if target broker can be reached
- if (targetAlive) {
+ if (targetAlive && !isEmptyingBuffer) {
if(!eventBuffer.isEmpty()){
- publishBufferedEvents(eventBuffer);
- publishEvent(event);
- }else{
+ bufferEvent(event);
+ publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+ eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
+ } else {
publishEvent(event);
}
} else {
- //
if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
// add event to buffer
- if(!isBuffering) {
+ if(!isBuffering && !isEmptyingBuffer) {
LOG.info("Connection issue to broker={}:{}. Buffer events for topic={}", relayInfo.a, relayInfo.b
, relayInfo.c);
isBuffering = true;
@@ -134,7 +131,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
metrics.increaseNumRelayedEvents();
}
- private synchronized void bufferEvent(byte[] event){
+ private void bufferEvent(byte[] event){
if (!eventBuffer.offer(event)){
eventBuffer.poll();
eventBuffer.offer(event);
@@ -147,23 +144,25 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
relayInfo.c);
}
- private synchronized void publishBufferedEvents(Queue<byte[]> eventBuffer){
- LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
- "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
- relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
+ private void publishBufferedEvents(Queue<byte[]> buffer, long numDroppedEvents){
+ isEmptyingBuffer = true;
+ LOG.info("Resent buffered events for topic={} (buffer_size={}, num_dropped_events={})", relayInfo.c,
+ buffer.size(), numDroppedEvents);
- // add current event from callback
- eventBuffer.forEach(e -> {
- try{
- producer.publish(e);
- metrics.increaseNumRelayedEvents();
- } catch (Exception ex) {
- LOG.error(ex.toString());
- }
+ Thread th = new Thread(() -> {
+ buffer.forEach(e -> {
+ try{
+ publishEvent(e);
+ } catch (Exception ex) {
+ LOG.error(ex.toString());
+ }
+ });
+ metrics.clearNumDroppedEvents();
+ isBuffering = false;
+
+ isEmptyingBuffer = false;
});
- eventBuffer.clear();
- metrics.clearNumDroppedEvents();
- isBuffering = false;
+ th.start();
}
@@ -231,7 +230,9 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
if(isTargetBrokerAlive())
isAlive = true;
}
- publishBufferedEvents(eventBuffer);
+ LOG.info("Re-established connection to broker={}:{}", relayInfo.a, relayInfo.b);
+ publishBufferedEvents(eventBuffer, metrics.getNumDroppedEvents());
+ eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
targetAlive = true;
}
};