You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/01 14:07:23 UTC
[incubator-streampipes] branch dev updated: [hotfix] Improve pipeline monitoring view
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 91e076e [hotfix] Improve pipeline monitoring view
new 9a50f58 Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
91e076e is described below
commit 91e076e0adf6542688e8cc2348698dd68f28ce97
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 1 15:07:02 2021 +0100
[hotfix] Improve pipeline monitoring view
---
.../monitoring/pipeline/TopicInfoCollector.java | 28 ++--------------------
.../pipeline-element-statistics.component.html | 4 ++--
.../pipeline-element-statistics.component.ts | 10 ++++++--
3 files changed, 12 insertions(+), 30 deletions(-)
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
index 0e7378d..303b3a9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.monitoring.ConsumedMessagesInfo;
import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
@@ -113,20 +114,9 @@ public class TopicInfoCollector {
info.setProducedMessagesInfo(outputTopicInfo);
info.setConsumedMessagesInfos(inputTopicInfo);
- printStatistics(info);
return info;
}
- private void printStatistics(PipelineElementMonitoringInfo info) {
- System.out.println("Pipeline Element: " + info.getPipelineElementName());
- info.getConsumedMessagesInfos().forEach(input -> {
- System.out.println("Consumed messages since pipeline start: " + input.getConsumedMessagesSincePipelineStart());
- System.out.println("Total messages since pipeline start: " + (input.getTotalMessagesSincePipelineStart()));
- System.out.println("Lag: " + input.getLag());
- });
- //System.out.println("Produced messages: " + (info.getOutputTopicInfo().getCurrentOffset() - info.getOutputTopicInfo().getOffsetAtPipelineStart()));
- }
-
private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation sink) {
PipelineElementMonitoringInfo info = prepare(sink.getElementId(), sink.getName(), true, false);
info.setConsumedMessagesInfos(makeInputTopicInfoForPipelineElement(sink.getInputStreams()));
@@ -141,7 +131,7 @@ public class TopicInfoCollector {
ConsumedMessagesInfo info = new ConsumedMessagesInfo(topic, groupId);
long consumedMessagesSincePipelineStart = (getCurrentConsumerGroupOffset(groupId) - topicOffsetAtPipelineStart.get(topic));
long totalMessagesSincePipelineStart = (latestTopicOffsets.get(topic) - topicOffsetAtPipelineStart.get(topic));
- long lag = totalMessagesSincePipelineStart - consumedMessagesSincePipelineStart;
+ long lag = Math.max(0, totalMessagesSincePipelineStart - consumedMessagesSincePipelineStart);
info.setTotalMessagesSincePipelineStart(totalMessagesSincePipelineStart);
info.setConsumedMessagesSincePipelineStart(consumedMessagesSincePipelineStart);
@@ -240,18 +230,4 @@ public class TopicInfoCollector {
.collect(Collectors.toList());
}
- public static void main(String[] args) {
- List<Pipeline> pipelines = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
- Pipeline testPipeline = pipelines.get(0);
-
- for(int i = 0; i < 50; i++) {
- List<PipelineElementMonitoringInfo> monitoringInfo = new TopicInfoCollector(testPipeline).makeMonitoringInfo();
- System.out.println(monitoringInfo.size());
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
}
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
index 883a86b..372d2e4 100644
--- a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
@@ -20,7 +20,7 @@
<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
<div fxFlex="33">
<div fxLayout="column" class="mb-10">
- <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+ <sp-status-widget fxFlex="100" [label]="'Consumed / Queued'"
[color]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedCardColor : cardColor"
[textColor]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedTextColor : textColor"
[bandColor]="consumedMessagesFirstStreamBandColor"
@@ -35,7 +35,7 @@
</div>
<div fxFlex="33">
<div fxLayout="column" class="mb-10">
- <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+ <sp-status-widget fxFlex="100" [label]="'Consumed / Queued'"
[color]="consumedMessagesSecondInputStream === notAvailable ? deactivatedCardColor : cardColor"
[textColor]="consumedMessagesSecondInputStream === notAvailable ? deactivatedTextColor : textColor"
[bandColor]="consumedMessagesSecondStreamBandColor"
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
index 295a6ee..cc3b269 100644
--- a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
@@ -63,7 +63,7 @@ export class PipelineElementStatisticsComponent implements OnInit {
consumedMessagesFirstStreamBandColor: string;
consumedMessagesSecondStreamBandColor: string;
- notAvailable: string = "n/a";
+ notAvailable: string = "-";
historicFirstConsumedInputValues: HistoricalMonitoringData[] = [];
historicSecondConsumedInputValues: HistoricalMonitoringData[] = [];
@@ -73,8 +73,14 @@ export class PipelineElementStatisticsComponent implements OnInit {
consumedMessagesSecondStreamLastValue: number = -1;
producedMessageOutputLastValue: number = -1;
- ngOnInit(): void {
+ consumedMessagesFirstStreamAvailable = false;
+ consumedMessagesSecondStreamAvailable = false;
+ producedMessagesAvailable = false;
+ ngOnInit(): void {
+ this.producedMessagesAvailable = this.pipelineElementMonitoringInfo.producedMessageInfoExists;
+ this.consumedMessagesFirstStreamAvailable = this.pipelineElementMonitoringInfo.consumedMessageInfoExists;
+ this.consumedMessagesSecondStreamAvailable = this.pipelineElementMonitoringInfo.consumedMessageInfoExists && this.pipelineElementMonitoringInfo.consumedMessagesInfos.length > 1;
}
updateMonitoringInfo() {