You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by go...@apache.org on 2021/10/22 11:30:43 UTC
[incubator-streampipes] 03/03: fix blocking issue in multibrokerbridge
This is an automated email from the ASF dual-hosted git repository.
gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 31ef2584246eecd9022d08f849267d9f2527fff7
Author: daniel-gomm <da...@outlook.de>
AuthorDate: Fri Oct 22 07:41:54 2021 +0200
fix blocking issue in multibrokerbridge
---
.../management/relay/base/EventRelay.java | 2 +-
.../relay/bridges/MultiBrokerBridge.java | 119 +++++++++++++--------
.../performance/performancetest/GenericTest.java | 16 +--
3 files changed, 87 insertions(+), 50 deletions(-)
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
index d1423c2..2c91089 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/base/EventRelay.java
@@ -42,6 +42,6 @@ public class EventRelay extends AbstractEventRelay {
}
public RelayMetrics getRelayMetrics() {
- return this.multiBrokerBridge.getRelayMerics();
+ return this.multiBrokerBridge.getRelayMetrics();
}
}
diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
index 71cb142..9db1d75 100644
--- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
+++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/relay/bridges/MultiBrokerBridge.java
@@ -31,6 +31,9 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;
public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends TransportProtocol> implements EventRelay {
@@ -43,11 +46,13 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
private final EventConsumer<T1> consumer;
private final EventProducer<T2> producer;
private final EventRelayStrategy eventRelayStrategy;
- private final ArrayList<byte[]> eventBuffer = new ArrayList<>();
+ private final Queue<byte[]> eventBuffer = new ArrayBlockingQueue<>(NodeConfiguration.getEventRelayBufferSize());
+ //private CircularFifoBuffer buffer = new CircularFifoBuffer(NodeConfiguration.getEventRelayBufferSize());
private final int EVENT_BUFFER_SIZE = NodeConfiguration.getEventRelayBufferSize();
private final Tuple3<String, Integer, String> relayInfo;
private boolean isBuffering = false;
+ private boolean targetAlive = true;
public MultiBrokerBridge(TransportProtocol sourceProtocol, TransportProtocol targetProtocol,
String eventRelayStrategy, Supplier<EventConsumer<T1>> consumerSupplier,
@@ -87,40 +92,21 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
}
private void publishWithCheck(byte[] event) {
+
+ if (targetAlive && !isTargetBrokerAlive()){
+ targetAlive = false;
+ startAliveThread();
+ }
// check if target broker can be reached
- if (isTargetBrokerAlive()) {
+ if (targetAlive) {
- if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
- if (eventBuffer.isEmpty()) {
- // send events to upstream
- producer.publish(event);
- metrics.increaseNumRelayedEvents();
- } else {
- // TODO: send buffered event should run independent of callback
- // send buffered events & clear buffer
- LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
- "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
- relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
-
- // add current event from callback
- eventBuffer.add(event);
- eventBuffer.forEach(e -> {
- try{
- producer.publish(e);
- metrics.increaseNumRelayedEvents();
- } catch (Exception ex) {
- LOG.error(ex.toString());
- }
- });
- eventBuffer.clear();
- metrics.clearNumDroppedEvents();
- isBuffering = false;
- }
- } else if (eventRelayStrategy == EventRelayStrategy.PURGE) {
- // send events to upstream
- producer.publish(event);
- metrics.increaseNumRelayedEvents();
+ if(!eventBuffer.isEmpty()){
+ publishBufferedEvents(eventBuffer);
+ publishEvent(event);
+ }else{
+ publishEvent(event);
}
+
} else {
//
if (eventRelayStrategy == EventRelayStrategy.BUFFER) {
@@ -130,17 +116,9 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
, relayInfo.c);
isBuffering = true;
}
-
- if (eventBuffer.size() != EVENT_BUFFER_SIZE) {
- eventBuffer.add(event);
- } else {
- // evict oldest event
- eventBuffer.remove(0);
- metrics.increaseNumDroppedEvents();
- }
+ bufferEvent(event);
} else if (eventRelayStrategy == EventRelayStrategy.PURGE) {
- LOG.info("Connection issue to broker={}:{}. Purge events for topic={}", relayInfo.a, relayInfo.b,
- relayInfo.c);
+ purgeEvent(event);
}
}
}
@@ -151,6 +129,45 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
metrics.increaseNumRelayedEvents();
}
+ private void publishEvent(byte[] event){
+ producer.publish(event);
+ metrics.increaseNumRelayedEvents();
+ }
+
+ private synchronized void bufferEvent(byte[] event){
+ if (!eventBuffer.offer(event)){
+ eventBuffer.poll();
+ eventBuffer.offer(event);
+ metrics.increaseNumDroppedEvents();
+ }
+ }
+
+ private void purgeEvent(byte[] event){
+ LOG.info("Connection issue to broker={}:{}. Purge events for topic={}", relayInfo.a, relayInfo.b,
+ relayInfo.c);
+ }
+
+ private synchronized void publishBufferedEvents(Queue<byte[]> eventBuffer){
+ LOG.info("Re-established connection to broker={}:{}. Resent buffered events for topic={} " +
+ "(buffer_size={}, num_dropped_events={})", relayInfo.a, relayInfo.b,
+ relayInfo.c, eventBuffer.size(), metrics.getNumDroppedEvents());
+
+ // add current event from callback
+ eventBuffer.forEach(e -> {
+ try{
+ producer.publish(e);
+ metrics.increaseNumRelayedEvents();
+ } catch (Exception ex) {
+ LOG.error(ex.toString());
+ }
+ });
+ eventBuffer.clear();
+ metrics.clearNumDroppedEvents();
+ isBuffering = false;
+ }
+
+
+
@Override
public void stop() throws SpRuntimeException {
LOG.info("Stop event relay to broker={}:{}, topic={}", relayInfo.a, relayInfo.b, relayInfo.c);
@@ -168,7 +185,7 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
return producer.isConnected();
}
- public RelayMetrics getRelayMerics() {
+ public RelayMetrics getRelayMetrics() {
return metrics;
}
@@ -204,4 +221,20 @@ public abstract class MultiBrokerBridge<T1 extends TransportProtocol, T2 extends
}
throw new SpRuntimeException("Transport protocol not valid");
}
+
+ private void startAliveThread(){
+ LOG.info("Alive Thread started.");
+ Thread th = new Thread(){
+ public synchronized void run(){
+ boolean isAlive = false;
+ while (!isAlive){
+ if(isTargetBrokerAlive())
+ isAlive = true;
+ }
+ publishBufferedEvents(eventBuffer);
+ targetAlive = true;
+ }
+ };
+ th.start();
+ }
}
diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
index 2d8523c..25e7a1a 100644
--- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
+++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java
@@ -73,8 +73,8 @@ public class GenericTest implements Test{
Object[] line = null;
- if (testType.equals("Reconfiguration") && nrRuns == 0){
- executeOffloading();
+ if (testType.equals("Reconfiguration")){
+ executeReconfiguration();
return;
}
//Start Pipeline
@@ -176,7 +176,7 @@ public class GenericTest implements Test{
}
}
- private void executeOffloading(){
+ private void executeReconfiguration(){
if (!pipeline.isRunning()) {
PipelineOperationStatus startMessage = client.pipelines().start(pipeline);
System.out.println(startMessage.getTitle());
@@ -205,9 +205,13 @@ public class GenericTest implements Test{
sp.setValue(Float.toString(reconfigurationValue));
}
}));
-
- PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
- System.out.println(message.getTitle());
+ EvaluationLogger.getInstance().logMQTT("Reconfiguration", "Reconfiguration triggered", 0, reconfigurationValue);
+ client.pipelines().reconfigure(pipeline);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}