You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/12/01 14:07:23 UTC

[incubator-streampipes] branch dev updated: [hotfix] Improve pipelien monitoring view

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 91e076e  [hotfix] Improve pipelien monitoring view
     new 9a50f58  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
91e076e is described below

commit 91e076e0adf6542688e8cc2348698dd68f28ce97
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Dec 1 15:07:02 2021 +0100

    [hotfix] Improve pipelien monitoring view
---
 .../monitoring/pipeline/TopicInfoCollector.java    | 28 ++--------------------
 .../pipeline-element-statistics.component.html     |  4 ++--
 .../pipeline-element-statistics.component.ts       | 10 ++++++--
 3 files changed, 12 insertions(+), 30 deletions(-)

diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
index 0e7378d..303b3a9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
 import org.apache.streampipes.model.grounding.TransportProtocol;
 import org.apache.streampipes.model.monitoring.ConsumedMessagesInfo;
 import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
@@ -113,20 +114,9 @@ public class TopicInfoCollector {
 
     info.setProducedMessagesInfo(outputTopicInfo);
     info.setConsumedMessagesInfos(inputTopicInfo);
-    printStatistics(info);
     return info;
   }
 
-  private void printStatistics(PipelineElementMonitoringInfo info) {
-    System.out.println("Pipeline Element: " + info.getPipelineElementName());
-    info.getConsumedMessagesInfos().forEach(input -> {
-      System.out.println("Consumed messages since pipeline start: " + input.getConsumedMessagesSincePipelineStart());
-      System.out.println("Total messages since pipeline start: " + (input.getTotalMessagesSincePipelineStart()));
-      System.out.println("Lag: " + input.getLag());
-    });
-    //System.out.println("Produced messages: " + (info.getOutputTopicInfo().getCurrentOffset() - info.getOutputTopicInfo().getOffsetAtPipelineStart()));
-  }
-
   private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation sink) {
     PipelineElementMonitoringInfo info = prepare(sink.getElementId(), sink.getName(), true, false);
     info.setConsumedMessagesInfos(makeInputTopicInfoForPipelineElement(sink.getInputStreams()));
@@ -141,7 +131,7 @@ public class TopicInfoCollector {
       ConsumedMessagesInfo info = new ConsumedMessagesInfo(topic, groupId);
       long consumedMessagesSincePipelineStart = (getCurrentConsumerGroupOffset(groupId) - topicOffsetAtPipelineStart.get(topic));
       long totalMessagesSincePipelineStart = (latestTopicOffsets.get(topic) - topicOffsetAtPipelineStart.get(topic));
-      long lag = totalMessagesSincePipelineStart - consumedMessagesSincePipelineStart;
+      long lag = Math.max(0, totalMessagesSincePipelineStart - consumedMessagesSincePipelineStart);
 
       info.setTotalMessagesSincePipelineStart(totalMessagesSincePipelineStart);
       info.setConsumedMessagesSincePipelineStart(consumedMessagesSincePipelineStart);
@@ -240,18 +230,4 @@ public class TopicInfoCollector {
             .collect(Collectors.toList());
   }
 
-  public static void main(String[] args) {
-    List<Pipeline> pipelines = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
-    Pipeline testPipeline = pipelines.get(0);
-
-    for(int i = 0; i < 50; i++) {
-      List<PipelineElementMonitoringInfo> monitoringInfo = new TopicInfoCollector(testPipeline).makeMonitoringInfo();
-      System.out.println(monitoringInfo.size());
-      try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
 }
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
index 883a86b..372d2e4 100644
--- a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
@@ -20,7 +20,7 @@
 <div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
         <div fxFlex="33">
             <div fxLayout="column" class="mb-10">
-                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                <sp-status-widget fxFlex="100" [label]="'Consumed / Queued'"
                                   [color]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedCardColor : cardColor"
                                   [textColor]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedTextColor : textColor"
                                   [bandColor]="consumedMessagesFirstStreamBandColor"
@@ -35,7 +35,7 @@
         </div>
         <div fxFlex="33">
             <div fxLayout="column" class="mb-10">
-                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                <sp-status-widget fxFlex="100" [label]="'Consumed / Queued'"
                                   [color]="consumedMessagesSecondInputStream === notAvailable ? deactivatedCardColor : cardColor"
                                   [textColor]="consumedMessagesSecondInputStream === notAvailable ? deactivatedTextColor : textColor"
                                   [bandColor]="consumedMessagesSecondStreamBandColor"
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
index 295a6ee..cc3b269 100644
--- a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
@@ -63,7 +63,7 @@ export class PipelineElementStatisticsComponent implements OnInit {
   consumedMessagesFirstStreamBandColor: string;
   consumedMessagesSecondStreamBandColor: string;
 
-  notAvailable: string = "n/a";
+  notAvailable: string = "-";
 
   historicFirstConsumedInputValues: HistoricalMonitoringData[] = [];
   historicSecondConsumedInputValues: HistoricalMonitoringData[] = [];
@@ -73,8 +73,14 @@ export class PipelineElementStatisticsComponent implements OnInit {
   consumedMessagesSecondStreamLastValue: number = -1;
   producedMessageOutputLastValue: number = -1;
 
-  ngOnInit(): void {
+  consumedMessagesFirstStreamAvailable = false;
+  consumedMessagesSecondStreamAvailable = false;
+  producedMessagesAvailable = false;
 
+  ngOnInit(): void {
+    this.producedMessagesAvailable = this.pipelineElementMonitoringInfo.producedMessageInfoExists;
+    this.consumedMessagesFirstStreamAvailable = this.pipelineElementMonitoringInfo.consumedMessageInfoExists;
+    this.consumedMessagesSecondStreamAvailable = this.pipelineElementMonitoringInfo.consumedMessageInfoExists && this.pipelineElementMonitoringInfo.consumedMessagesInfos.length > 1;
   }
 
   updateMonitoringInfo() {