You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/06 18:38:35 UTC

[incubator-streampipes] branch experimental-module-federation-494 updated (e123c80 -> 6080ce8)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from e123c80  [STREAMPIPES-494] Fix layout bug in pipeline details component
     new c464015  [STREAMPIPES-509] Use data lake APIs in live dashboard
     new 42405c7  [STREAMPIPES-509] Load notifications over REST instead of websockets
     new 6301f2a  [STREAMPIPES-509] Store image blobs in database
     new 74f9c72  [STREAMPIPES-509] Remove dashboard sink
     new 7781208  [STREAMPIPES-509] Refactor asset dashboard to receive data from data lake
     new 6f5e65f  [STREAMPIPES-509] Remove websocket dependencies
     new 6080ce8  [STREAMPIPES-509] Remove ActiveMQ dependency from installer files

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 installer/cli/environments/adapter                 |   1 -
 installer/cli/environments/backend                 |   1 -
 installer/cli/environments/basic                   |   1 -
 installer/cli/environments/full                    |   1 -
 installer/cli/environments/lite                    |   1 -
 installer/cli/environments/minimal                 |   4 +-
 installer/cli/environments/pipeline-element        |   4 +-
 installer/cli/environments/ui                      |   4 +-
 installer/compose/docker-compose.full.yml          |   9 -
 installer/compose/docker-compose.yml               |   9 -
 .../external/activemq/activemq-deployment.yaml     |  39 --
 .../external/activemq/activemq-service.yaml        |  39 --
 installer/k8s/values.yaml                          |   1 -
 .../backend/StreamPipesResourceConfig.java         |   2 +
 .../streampipes/client/StreamPipesClient.java      |   4 +
 .../streampipes/client/api/NotificationsApi.java   |  22 ++
 .../dataexplorer/DataLakeManagementV3.java         | 272 --------------
 .../dataexplorer/DataLakeManagementV4.java         |   1 -
 .../streampipes-sinks-internal-jvm/development/env |   5 +-
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |   8 +-
 .../sinks/internal/jvm/config/ConfigKeys.java      |   7 +-
 .../sinks/internal/jvm/dashboard/Dashboard.java    |  71 ----
 .../jvm/dashboard/DashboardController.java         |  55 ---
 .../jvm/dashboard/DashboardParameters.java         |  36 --
 .../sinks/internal/jvm/datalake/DataLake.java      |  34 +-
 .../sinks/internal/jvm/datalake/ImageStore.java    |  54 +++
 .../jvm/notification/NotificationProducer.java     |  19 +-
 .../kafka/config/ConsumerConfigFactory.java        |   5 +-
 .../org/apache/streampipes/model/Notification.java |   6 +-
 .../manager/execution/http/PipelineExecutor.java   |   1 +
 .../streampipes/ps/DataLakeImageResource.java      |  27 +-
 .../apache/streampipes/rest/impl/Notification.java |  24 +-
 .../{IUserGroupStorage.java => IImageStorage.java} |   8 +-
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +
 .../storage/api/INotificationStorage.java          |   2 +
 .../storage/couchdb/CouchDbStorageManager.java     |   5 +
 .../storage/couchdb/impl/ImageStorageImpl.java     |  23 +-
 .../couchdb/impl/NotificationStorageImpl.java      |  18 +-
 .../streampipes/storage/couchdb/utils/Utils.java   |   4 +
 ui/deployment/appng5.module.mst                    |   7 -
 ui/package.json                                    |   2 +-
 .../src/lib/apis/datalake-rest.service.ts          |   9 +-
 .../src/lib/model/gen/streampipes-model.ts         |  10 +-
 .../src/lib/query/DatalakeQueryParameterBuilder.ts |   8 +-
 .../app-asset-monitoring.module.ts                 |  51 +--
 .../components/view-asset/view-asset.component.ts  |  57 ++-
 .../add-pipeline-dialog.component.html             |   6 +-
 .../add-pipeline/add-pipeline-dialog.component.ts  |  32 +-
 .../model/selected-visualization-data.model.ts     |   3 +-
 .../app-asset-monitoring/services/shape.service.ts |   3 +-
 .../services/websocket.service.ts                  |  40 --
 .../core/components/iconbar/iconbar.component.html |   4 +-
 .../core/components/iconbar/iconbar.component.ts   |  50 +--
 .../widget/dashboard-widget.component.ts           |  24 +-
 .../widgets/area/area-widget.component.ts          |   7 +-
 .../components/widgets/bar-race/bar-race-config.ts |  16 +-
 .../widgets/bar-race/bar-race-widget.component.ts  |  21 +-
 .../components/widgets/base/base-echarts-widget.ts |   7 +-
 .../widgets/base/base-ngx-charts-widget.ts         |   7 +-
 .../widgets/base/base-ngx-line-charts-widget.ts    |  32 +-
 .../widgets/base/base-ngx-line-config.ts           |  52 +--
 .../components/widgets/base/base-widget.ts         | 242 +++++++-----
 .../widgets/gauge/gauge-widget.component.ts        |  27 +-
 .../widgets/html/html-widget.component.ts          |  27 +-
 .../components/widgets/image/image-config.ts       |   4 +-
 .../widgets/image/image-widget.component.html      |   2 +-
 .../widgets/image/image-widget.component.ts        |  44 ++-
 .../widgets/line/line-widget.component.ts          |  15 +-
 .../components/widgets/map/map-widget.component.ts |  19 +-
 .../widgets/number/number-widget.component.ts      |  77 ++--
 .../dashboard/components/widgets/raw/raw-config.ts |  26 +-
 .../components/widgets/raw/raw-widget.component.ts |  28 +-
 .../stacked-line-chart-config.ts                   |  17 +-
 .../stacked-line-chart-widget.component.ts         |  56 +--
 .../widgets/status/status-widget.component.ts      |  31 +-
 .../components/widgets/table/table-config.ts       |  31 +-
 .../widgets/table/table-widget.component.ts        |  49 +--
 .../trafficlight/traffic-light-widget.component.ts |  47 +--
 .../wordcloud/wordcloud-widget.component.ts        |  14 +-
 ui/src/app/dashboard/dashboard.module.ts           | 150 ++++----
 .../add-visualization-dialog.component.html        |   6 +-
 .../add-visualization-dialog.component.ts          | 410 +++++++++++----------
 .../dashboard/registry/widget-config-builder.ts    |  20 +-
 ui/src/app/dashboard/registry/widget-registry.ts   |   6 +-
 ui/src/app/dashboard/services/websocket.config.ts  |  36 --
 .../app/dashboard/services/websocket.settings.ts   |  32 --
 ui/src/app/home/components/status.component.html   |   2 +-
 ui/src/app/home/components/status.component.ts     |   3 +
 .../components/notification-item.component.html    |   4 +-
 .../app/notifications/notifications.component.html |   4 +-
 .../app/notifications/notifications.component.ts   | 326 ++++++++--------
 .../notifications/service/notifications.service.ts |  15 +-
 ui/src/app/services/notification-count-service.ts  |  17 +-
 ui/src/app/services/rest-api.service.ts            |   2 +-
 .../app/services/secure.pipe.ts}                   |  26 +-
 ui/src/app/services/services.module.ts             |   7 +-
 96 files changed, 1362 insertions(+), 1737 deletions(-)
 delete mode 100644 installer/k8s/templates/external/activemq/activemq-deployment.yaml
 delete mode 100644 installer/k8s/templates/external/activemq/activemq-service.yaml
 create mode 100644 streampipes-client/src/main/java/org/apache/streampipes/client/api/NotificationsApi.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV3.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
 delete mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
 create mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/ImageStore.java
 copy streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Version.java => streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeImageResource.java (64%)
 copy streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/{IUserGroupStorage.java => IImageStorage.java} (87%)
 copy streampipes-maven-plugin/src/main/java/org/apache/streampipes/smp/parser/PipelineElementDescriptionReplacer.java => streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ImageStorageImpl.java (60%)
 delete mode 100644 ui/src/app/app-asset-monitoring/services/websocket.service.ts
 delete mode 100644 ui/src/app/dashboard/services/websocket.config.ts
 delete mode 100644 ui/src/app/dashboard/services/websocket.settings.ts
 copy ui/{projects/streampipes/platform-services/src/lib/apis/semantic-types.service.ts => src/app/services/secure.pipe.ts} (61%)

[incubator-streampipes] 01/07: [STREAMPIPES-509] Use data lake APIs in live dashboard

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c464015477a88e9333c896df954040fa5d953930
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sat Feb 5 19:59:18 2022 +0100

    [STREAMPIPES-509] Use data lake APIs in live dashboard
---
 .../dataexplorer/DataLakeManagementV4.java         |   1 -
 .../src/lib/apis/datalake-rest.service.ts          |   7 +-
 .../src/lib/query/DatalakeQueryParameterBuilder.ts |   8 +-
 .../widget/dashboard-widget.component.ts           |  24 +-
 .../widgets/area/area-widget.component.ts          |   7 +-
 .../components/widgets/bar-race/bar-race-config.ts |  16 +-
 .../widgets/bar-race/bar-race-widget.component.ts  |  21 +-
 .../components/widgets/base/base-echarts-widget.ts |   7 +-
 .../widgets/base/base-ngx-charts-widget.ts         |   7 +-
 .../widgets/base/base-ngx-line-charts-widget.ts    |  32 +-
 .../widgets/base/base-ngx-line-config.ts           |  52 +--
 .../components/widgets/base/base-widget.ts         | 242 +++++++-----
 .../widgets/gauge/gauge-widget.component.ts        |  27 +-
 .../widgets/html/html-widget.component.ts          |  27 +-
 .../widgets/image/image-widget.component.ts        |  27 +-
 .../widgets/line/line-widget.component.ts          |  15 +-
 .../components/widgets/map/map-widget.component.ts |  19 +-
 .../widgets/number/number-widget.component.ts      |  77 ++--
 .../dashboard/components/widgets/raw/raw-config.ts |  26 +-
 .../components/widgets/raw/raw-widget.component.ts |  28 +-
 .../stacked-line-chart-config.ts                   |  17 +-
 .../stacked-line-chart-widget.component.ts         |  56 +--
 .../widgets/status/status-widget.component.ts      |  31 +-
 .../components/widgets/table/table-config.ts       |  31 +-
 .../widgets/table/table-widget.component.ts        |  49 +--
 .../trafficlight/traffic-light-widget.component.ts |  47 +--
 .../wordcloud/wordcloud-widget.component.ts        |  14 +-
 .../add-visualization-dialog.component.html        |   6 +-
 .../add-visualization-dialog.component.ts          | 410 +++++++++++----------
 .../dashboard/registry/widget-config-builder.ts    |  20 +-
 ui/src/app/dashboard/registry/widget-registry.ts   |   6 +-
 31 files changed, 740 insertions(+), 617 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9f659f1..1b72bcb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -321,7 +321,6 @@ public class DataLakeManagementV4 {
                         }
                     });
                 });
-                System.out.println(queryResult.getResults().size());
         });
 
         return tags;
diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
index b1adceb..4a5a2d1 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
@@ -46,11 +46,12 @@ export class DatalakeRestService {
   }
 
   getData(index: string,
-          queryParams: DatalakeQueryParameters): Observable<SpQueryResult> {
+          queryParams: DatalakeQueryParameters,
+          ignoreLoadingBar?: boolean): Observable<SpQueryResult> {
     const url = this.dataLakeUrl + '/measurements/' + index;
-
+    const headers = ignoreLoadingBar ? { ignoreLoadingBar: '' } : {};
     // @ts-ignore
-    return this.http.get<SpQueryResult>(url, { params: queryParams });
+    return this.http.get<SpQueryResult>(url, { params: queryParams }, headers);
   }
 
 
diff --git a/ui/projects/streampipes/platform-services/src/lib/query/DatalakeQueryParameterBuilder.ts b/ui/projects/streampipes/platform-services/src/lib/query/DatalakeQueryParameterBuilder.ts
index 0d341d9..14dc4a4 100644
--- a/ui/projects/streampipes/platform-services/src/lib/query/DatalakeQueryParameterBuilder.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/query/DatalakeQueryParameterBuilder.ts
@@ -33,8 +33,12 @@ export class DatalakeQueryParameterBuilder {
   private constructor(startTime?: number,
                       endTime?: number) {
     this.queryParams = new DatalakeQueryParameters();
-    this.queryParams.startDate = startTime;
-    this.queryParams.endDate = endTime;
+    if (startTime) {
+      this.queryParams.startDate = startTime;
+    }
+    if (endTime) {
+      this.queryParams.endDate = endTime;
+    }
   }
 
   public withMaximumAmountOfEvents(maximumAmountOfEvents: number): DatalakeQueryParameterBuilder {
diff --git a/ui/src/app/dashboard/components/widget/dashboard-widget.component.ts b/ui/src/app/dashboard/components/widget/dashboard-widget.component.ts
index 5778c7f..05a114b 100644
--- a/ui/src/app/dashboard/components/widget/dashboard-widget.component.ts
+++ b/ui/src/app/dashboard/components/widget/dashboard-widget.component.ts
@@ -20,13 +20,18 @@ import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core';
 import { DashboardService } from '../../services/dashboard.service';
 import { AddVisualizationDialogComponent } from '../../dialogs/add-widget/add-visualization-dialog.component';
 import {
-  DashboardWidgetModel, Pipeline,
-  VisualizablePipeline, DashboardItem, PipelineService
+  DashboardItem,
+  DashboardWidgetModel,
+  DataLakeMeasure, DatalakeRestService,
+  DataViewDataExplorerService,
+  Pipeline,
+  PipelineService
 } from '@streampipes/platform-services';
 import { PanelType } from '../../../core-ui/dialog/base-dialog/base-dialog.model';
 import { DialogService } from '../../../core-ui/dialog/base-dialog/base-dialog.service';
 import { EditModeService } from '../../services/edit-mode.service';
 import { ReloadPipelineService } from '../../services/reload-pipeline.service';
+import { zip } from 'rxjs';
 
 @Component({
   selector: 'dashboard-widget',
@@ -40,15 +45,13 @@ export class DashboardWidgetComponent implements OnInit {
   @Input() headerVisible = false;
   @Input() itemWidth: number;
   @Input() itemHeight: number;
-  // @Input() item: GridsterItem;
-  // @Input() gridsterItemComponent: GridsterItemComponent;
 
   @Output() deleteCallback: EventEmitter<DashboardItem> = new EventEmitter<DashboardItem>();
   @Output() updateCallback: EventEmitter<DashboardWidgetModel> = new EventEmitter<DashboardWidgetModel>();
 
   widgetLoaded = false;
   configuredWidget: DashboardWidgetModel;
-  widgetDataConfig: VisualizablePipeline;
+  widgetDataConfig: DataLakeMeasure;
   pipeline: Pipeline;
 
   pipelineRunning = false;
@@ -58,7 +61,9 @@ export class DashboardWidgetComponent implements OnInit {
               private dialogService: DialogService,
               private pipelineService: PipelineService,
               private editModeService: EditModeService,
-              private reloadPipelineService: ReloadPipelineService) {
+              private reloadPipelineService: ReloadPipelineService,
+              private dataExplorerService: DataViewDataExplorerService,
+              private dataLakeRestService: DatalakeRestService) {
   }
 
   ngOnInit(): void {
@@ -76,8 +81,11 @@ export class DashboardWidgetComponent implements OnInit {
   }
 
   loadVisualizablePipeline() {
-    this.dashboardService.getVisualizablePipelineByPipelineIdAndVisualizationName(this.configuredWidget.pipelineId,
-        this.configuredWidget.visualizationName).subscribe(vizPipeline => {
+    zip(this.dataExplorerService.getPersistedDataStream(this.configuredWidget.pipelineId, this.configuredWidget.visualizationName), this.dataLakeRestService.getAllMeasurementSeries())
+      .subscribe(res => {
+        const vizPipeline = res[0];
+        const measurement = res[1].find(m => m.measureName === vizPipeline.measureName);
+        vizPipeline.eventSchema = measurement.eventSchema;
       this.widgetDataConfig = vizPipeline;
       this.dashboardService.getPipelineById(vizPipeline.pipelineId).subscribe(pipeline => {
         this.pipeline = pipeline;
diff --git a/ui/src/app/dashboard/components/widgets/area/area-widget.component.ts b/ui/src/app/dashboard/components/widgets/area/area-widget.component.ts
index 157be48..1e7ea9d 100644
--- a/ui/src/app/dashboard/components/widgets/area/area-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/area/area-widget.component.ts
@@ -18,9 +18,8 @@
 
 import { Component, OnDestroy, OnInit } from '@angular/core';
 import { BaseNgxLineChartsStreamPipesWidget } from '../base/base-ngx-line-charts-widget';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { ResizeService } from '../../../services/resize.service';
-import { DashboardService } from '../../../services/dashboard.service';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'area-widget',
@@ -29,8 +28,8 @@ import { DashboardService } from '../../../services/dashboard.service';
 })
 export class AreaWidgetComponent extends BaseNgxLineChartsStreamPipesWidget implements OnInit, OnDestroy {
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
diff --git a/ui/src/app/dashboard/components/widgets/bar-race/bar-race-config.ts b/ui/src/app/dashboard/components/widgets/bar-race/bar-race-config.ts
index 58d48e3..e0ddd41 100644
--- a/ui/src/app/dashboard/components/widgets/bar-race/bar-race-config.ts
+++ b/ui/src/app/dashboard/components/widgets/bar-race/bar-race-config.ts
@@ -33,13 +33,13 @@ export class BarRaceConfig extends WidgetConfig {
 
   getConfig(): DashboardWidgetSettings {
     return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('bar-race', 'Bar Race Chart')
-        .withIcon('fas fa-chart-bar')
-        .withDescription('Renders ordered, separate bar charts based on a partition field (e.g., device id).')
-        .requiredSchema(SchemaRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(BarRaceConfig.PARTITION_KEY, 'Partition field', '', EpRequirements.stringReq())
-            .requiredPropertyWithUnaryMapping(BarRaceConfig.VALUE_KEY, 'Value field', '', EpRequirements.numberReq())
-            .build())
-        .build();
+      .withIcon('fas fa-chart-bar')
+      .withDescription('Renders ordered, separate bar charts based on a partition field (e.g., device id).')
+      .requiredSchema(SchemaRequirementsBuilder
+        .create()
+        .requiredPropertyWithUnaryMapping(BarRaceConfig.PARTITION_KEY, 'Partition field', '', EpRequirements.stringReq())
+        .requiredPropertyWithUnaryMapping(BarRaceConfig.VALUE_KEY, 'Value field', '', EpRequirements.numberReq())
+        .build())
+      .build();
   }
 }
diff --git a/ui/src/app/dashboard/components/widgets/bar-race/bar-race-widget.component.ts b/ui/src/app/dashboard/components/widgets/bar-race/bar-race-widget.component.ts
index 2ffd185..5bbcf60 100644
--- a/ui/src/app/dashboard/components/widgets/bar-race/bar-race-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/bar-race/bar-race-widget.component.ts
@@ -17,14 +17,13 @@
  */
 
 import { Component, OnDestroy, OnInit } from '@angular/core';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { BaseStreamPipesWidget } from '../base/base-widget';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
-import { DashboardService } from '../../../services/dashboard.service';
 import { ResizeService } from '../../../services/resize.service';
 import { ECharts } from 'echarts/core';
 import { EChartsOption } from 'echarts';
 import { BarRaceConfig } from './bar-race-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 
 @Component({
@@ -118,8 +117,8 @@ export class BarRaceWidgetComponent extends BaseStreamPipesWidget implements OnI
     animationEasingUpdate: 'linear',
   };
 
-  constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-    super(rxStompService, dashboardService, resizeService, false);
+  constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    super(dataLakeService, resizeService, false);
   }
 
   protected extractConfig(extractor: StaticPropertyExtractor) {
@@ -129,12 +128,12 @@ export class BarRaceWidgetComponent extends BaseStreamPipesWidget implements OnI
     this.chartOption.yAxis.axisLabel.textStyle.color = this.selectedPrimaryTextColor;
   }
 
-  protected onEvent(event: any) {
+  protected onEvent(events: any[]) {
     this.dynamicData = this.chartOption;
-    const partitionValue = event[this.partitionField];
-    const value = event[this.valueField];
-    if (this.dynamicData.series[0].data.some(d => d.name == partitionValue)) {
-      this.dynamicData.series[0].data.find(d => d.name == partitionValue).value = value;
+    const partitionValue = events[0][this.partitionField];
+    const value = events[0][this.valueField];
+    if (this.dynamicData.series[0].data.some(d => d.name === partitionValue)) {
+      this.dynamicData.series[0].data.find(d => d.name === partitionValue).value = value;
     } else {
       this.dynamicData.series[0].data.push({name: partitionValue, value});
       this.dynamicData.yAxis.data.push(partitionValue);
@@ -162,4 +161,8 @@ export class BarRaceWidgetComponent extends BaseStreamPipesWidget implements OnI
       this.eChartsInstance.resize({width, height});
     }
   }
+
+  protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+    return 1;
+  }
 }
diff --git a/ui/src/app/dashboard/components/widgets/base/base-echarts-widget.ts b/ui/src/app/dashboard/components/widgets/base/base-echarts-widget.ts
index 62b4d67..ff37734 100644
--- a/ui/src/app/dashboard/components/widgets/base/base-echarts-widget.ts
+++ b/ui/src/app/dashboard/components/widgets/base/base-echarts-widget.ts
@@ -17,11 +17,10 @@
  */
 
 import { BaseStreamPipesWidget } from './base-widget';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { ResizeService } from '../../../services/resize.service';
-import { DashboardService } from '../../../services/dashboard.service';
 import { Directive } from '@angular/core';
 import { ECharts } from 'echarts/core';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Directive()
 export abstract class BaseEchartsWidget extends BaseStreamPipesWidget {
@@ -34,8 +33,8 @@ export abstract class BaseEchartsWidget extends BaseStreamPipesWidget {
   eChartsInstance: ECharts;
   dynamicData: any;
 
-  constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-    super(rxStompService, dashboardService, resizeService, false);
+  constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    super(dataLakeService, resizeService, false);
   }
 
   protected onSizeChanged(width: number, height: number) {
diff --git a/ui/src/app/dashboard/components/widgets/base/base-ngx-charts-widget.ts b/ui/src/app/dashboard/components/widgets/base/base-ngx-charts-widget.ts
index c584e6c..6b6948f 100644
--- a/ui/src/app/dashboard/components/widgets/base/base-ngx-charts-widget.ts
+++ b/ui/src/app/dashboard/components/widgets/base/base-ngx-charts-widget.ts
@@ -17,10 +17,9 @@
  */
 
 import { BaseStreamPipesWidget } from './base-widget';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { ResizeService } from '../../../services/resize.service';
-import { DashboardService } from '../../../services/dashboard.service';
 import { Directive } from '@angular/core';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Directive()
 export abstract class BaseNgxChartsStreamPipesWidget extends BaseStreamPipesWidget {
@@ -30,8 +29,8 @@ export abstract class BaseNgxChartsStreamPipesWidget extends BaseStreamPipesWidg
 
     colorScheme: any;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, true);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService, true);
     }
 
     ngOnInit() {
diff --git a/ui/src/app/dashboard/components/widgets/base/base-ngx-line-charts-widget.ts b/ui/src/app/dashboard/components/widgets/base/base-ngx-line-charts-widget.ts
index 7343598..94fef22 100644
--- a/ui/src/app/dashboard/components/widgets/base/base-ngx-line-charts-widget.ts
+++ b/ui/src/app/dashboard/components/widgets/base/base-ngx-line-charts-widget.ts
@@ -16,13 +16,14 @@
  *
  */
 
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { ResizeService } from '../../../services/resize.service';
 import { BaseNgxChartsStreamPipesWidget } from './base-ngx-charts-widget';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
 import { LineConfig } from '../line/line-config';
-import { DashboardService } from '../../../services/dashboard.service';
 import { Directive } from '@angular/core';
+import { DatalakeRestService } from '@streampipes/platform-services';
+import { WidgetConfigBuilder } from '../../../registry/widget-config-builder';
+import { BaseStreamPipesWidget } from './base-widget';
 
 @Directive()
 export abstract class BaseNgxLineChartsStreamPipesWidget extends BaseNgxChartsStreamPipesWidget {
@@ -34,8 +35,8 @@ export abstract class BaseNgxLineChartsStreamPipesWidget extends BaseNgxChartsSt
     minYAxisRange: number;
     maxYAxisRange: number;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
@@ -50,28 +51,23 @@ export abstract class BaseNgxLineChartsStreamPipesWidget extends BaseNgxChartsSt
 
     protected extractConfig(extractor: StaticPropertyExtractor) {
         this.selectedNumberProperty = extractor.mappingPropertyValue(LineConfig.NUMBER_MAPPING_KEY);
-        this.selectedTimestampProperty = extractor.mappingPropertyValue(LineConfig.TIMESTAMP_MAPPING_KEY);
         this.minYAxisRange = extractor.integerParameter(LineConfig.MIN_Y_AXIS_KEY);
         this.maxYAxisRange = extractor.integerParameter(LineConfig.MAX_Y_AXIS_KEY);
     }
 
-    protected onEvent(event: any) {
-        const time = event[this.selectedTimestampProperty];
-        const value = event[this.selectedNumberProperty];
-        this.makeEvent(time, value);
-    }
-
-    makeEvent(time: any, value: any): void {
-        this.multi[0].series.push({'name': time, 'value': value});
-        if (this.multi[0].series.length > 10) {
-            this.multi[0].series.shift();
-        }
+    protected onEvent(events: any[]) {
+        this.multi[0].series = events.map(ev => {
+            return { 'name': ev[BaseStreamPipesWidget.TIMESTAMP_KEY], 'value': ev[this.selectedNumberProperty]};
+        });
         this.multi = [...this.multi];
     }
 
     timestampTickFormatting(timestamp: any): string {
         const date = new Date(timestamp);
-        const timeString = date.getHours() + ':' + date.getMinutes().toString().substr(-2) + ':' + date.getSeconds().toString().substr(-2);
-        return timeString;
+        return date.getHours() + ':' + date.getMinutes().toString().substr(-2) + ':' + date.getSeconds().toString().substr(-2);
+    }
+
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return extractor.integerParameter(WidgetConfigBuilder.QUERY_LIMIT_KEY);
     }
 }
diff --git a/ui/src/app/dashboard/components/widgets/base/base-ngx-line-config.ts b/ui/src/app/dashboard/components/widgets/base/base-ngx-line-config.ts
index da9ef28..1492c89 100644
--- a/ui/src/app/dashboard/components/widgets/base/base-ngx-line-config.ts
+++ b/ui/src/app/dashboard/components/widgets/base/base-ngx-line-config.ts
@@ -24,30 +24,30 @@ import { DashboardWidgetSettings } from '@streampipes/platform-services';
 
 export abstract class BaseNgxLineConfig extends WidgetConfig {
 
-    static readonly NUMBER_MAPPING_KEY: string = 'number-mapping';
-    static readonly TIMESTAMP_MAPPING_KEY: string = 'timestamp-mapping';
-    static readonly MIN_Y_AXIS_KEY: string = 'min-y-axis-key';
-    static readonly MAX_Y_AXIS_KEY: string = 'max-y-axis-key';
-
-    getConfig(): DashboardWidgetSettings {
-        return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel(this.getWidgetName(), this.getWidgetLabel())
-            .withDescription(this.getWidgetDescription())
-            .withIcon(this.getWidgetIcon())
-            .requiredSchema(SchemaRequirementsBuilder
-                .create()
-                .requiredPropertyWithUnaryMapping(BaseNgxLineConfig.TIMESTAMP_MAPPING_KEY, 'Timestamp field', '', EpRequirements.timestampReq())
-                .requiredPropertyWithUnaryMapping(BaseNgxLineConfig.NUMBER_MAPPING_KEY, 'Number field', '', EpRequirements.numberReq())
-                .build())
-            .requiredIntegerParameter(BaseNgxLineConfig.MIN_Y_AXIS_KEY, 'Y-axis range (min)', '')
-            .requiredIntegerParameter(BaseNgxLineConfig.MAX_Y_AXIS_KEY, 'Y-axis range (max)', '')
-            .build();
-    }
-
-    abstract getWidgetName(): string;
-
-    abstract getWidgetLabel(): string;
-
-    abstract getWidgetDescription(): string;
-
-    abstract getWidgetIcon(): string;
+  static readonly NUMBER_MAPPING_KEY: string = 'number-mapping';
+  static readonly TIMESTAMP_MAPPING_KEY: string = 'timestamp-mapping';
+  static readonly MIN_Y_AXIS_KEY: string = 'min-y-axis-key';
+  static readonly MAX_Y_AXIS_KEY: string = 'max-y-axis-key';
+
+  getConfig(): DashboardWidgetSettings {
+    return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel(this.getWidgetName(), this.getWidgetLabel())
+      .withDescription(this.getWidgetDescription())
+      .withIcon(this.getWidgetIcon())
+      .withNumberOfPastEvents()
+      .requiredSchema(SchemaRequirementsBuilder
+        .create()
+        .requiredPropertyWithUnaryMapping(BaseNgxLineConfig.NUMBER_MAPPING_KEY, 'Number field', '', EpRequirements.numberReq())
+        .build())
+      .requiredIntegerParameter(BaseNgxLineConfig.MIN_Y_AXIS_KEY, 'Y-axis range (min)', '')
+      .requiredIntegerParameter(BaseNgxLineConfig.MAX_Y_AXIS_KEY, 'Y-axis range (max)', '')
+      .build();
+  }
+
+  abstract getWidgetName(): string;
+
+  abstract getWidgetLabel(): string;
+
+  abstract getWidgetDescription(): string;
+
+  abstract getWidgetIcon(): string;
 }
diff --git a/ui/src/app/dashboard/components/widgets/base/base-widget.ts b/ui/src/app/dashboard/components/widgets/base/base-widget.ts
index a9fb3e8..33ee18a 100644
--- a/ui/src/app/dashboard/components/widgets/base/base-widget.ts
+++ b/ui/src/app/dashboard/components/widgets/base/base-widget.ts
@@ -16,127 +16,173 @@
  *
  */
 
-import { Input, OnChanges, SimpleChanges, Directive } from '@angular/core';
+import { Directive, Input, OnChanges, SimpleChanges } from '@angular/core';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
-import { RxStompService } from '@stomp/ng2-stompjs';
-import { Message } from '@stomp/stompjs';
-import { Subscription } from 'rxjs';
+import { BehaviorSubject, interval, Observable, Subscription } from 'rxjs';
 import { WidgetConfigBuilder } from '../../../registry/widget-config-builder';
 import { ResizeService } from '../../../services/resize.service';
 import { WidgetInfo } from '../../../models/gridster-info.model';
-import { DashboardService } from '../../../services/dashboard.service';
 import {
-    DashboardWidgetModel,
-    VisualizablePipeline
+  DashboardWidgetModel,
+  DataLakeMeasure,
+  DatalakeQueryParameterBuilder,
+  DatalakeRestService,
+  SpQueryResult
 } from '@streampipes/platform-services';
+import { map, switchMap } from 'rxjs/operators';
 
 @Directive()
 export abstract class BaseStreamPipesWidget implements OnChanges {
 
-
-    protected constructor(private rxStompService: RxStompService,
-                          protected dashboardService: DashboardService,
-                          protected resizeService: ResizeService,
-                          protected adjustPadding: boolean) {
+  protected constructor(protected dataLakeService: DatalakeRestService,
+                        protected resizeService: ResizeService,
+                        protected adjustPadding: boolean) {
+  }
+
+  static readonly PADDING: number = 20;
+  static readonly EDIT_HEADER_HEIGHT: number = 40;
+  static readonly TIMESTAMP_KEY = 'time';
+
+  @Input() widgetConfig: DashboardWidgetModel;
+  @Input() widgetDataConfig: DataLakeMeasure;
+  @Input() itemWidth: number;
+  @Input() itemHeight: number;
+  @Input() editMode: boolean;
+
+  subscription: Subscription;
+  intervalSubject: BehaviorSubject<number>;
+
+  hasSelectableColorSettings: boolean;
+  hasTitlePanelSettings: boolean;
+
+  selectedBackgroundColor: string;
+  selectedPrimaryTextColor: string;
+  selectedSecondaryTextColor: string;
+  selectedTitle: string;
+
+  defaultBackgroundColor = '#1B1464';
+  defaultPrimaryTextColor = '#FFFFFF';
+  defaultSecondaryTextColor = '#39B54A';
+
+  refreshIntervalInSeconds = 5;
+  queryLimit = 1;
+
+  ngOnInit(): void {
+    this.prepareConfigExtraction();
+    this.intervalSubject = new BehaviorSubject<number>(this.refreshIntervalInSeconds);
+    // switchMap re-arms the polling timer whenever a new interval (in seconds) is pushed.
+    this.subscription = this.intervalSubject.pipe(
+      switchMap(val => interval(val * 1000)))
+      .subscribe(() => {
+        // Register each query so a pending request is cancelled in ngOnDestroy().
+        this.subscription.add(
+          this.fireQuery().subscribe(result => this.processQueryResult(result)));
+      });
+    // Initial query and resize listener are torn down together with the polling loop.
+    this.subscription.add(
+      this.fireQuery().subscribe(result => this.processQueryResult(result)));
+    this.subscription.add(
+      this.resizeService.resizeSubject.subscribe(info => this.onResize(info)));
+  }
+
+  prepareConfigExtraction() { // reads colors, title, refresh interval and query limit from the widget config
+    const extractor: StaticPropertyExtractor = new StaticPropertyExtractor(this.widgetDataConfig.eventSchema, this.widgetConfig.dashboardWidgetSettings.config);
+    if (extractor.hasStaticProperty(WidgetConfigBuilder.BACKGROUND_COLOR_KEY)) {
+      this.hasSelectableColorSettings = true;
+      this.selectedBackgroundColor = extractor.selectedColor(WidgetConfigBuilder.BACKGROUND_COLOR_KEY);
+      this.selectedPrimaryTextColor = extractor.selectedColor(WidgetConfigBuilder.PRIMARY_TEXT_COLOR_KEY);
+      this.selectedSecondaryTextColor = extractor.selectedColor(WidgetConfigBuilder.SECONDARY_TEXT_COLOR_KEY);
+    } else { // no background-color property configured: use the default colors
+      this.selectedBackgroundColor = this.defaultBackgroundColor;
+      this.selectedPrimaryTextColor = this.defaultPrimaryTextColor;
+      this.selectedSecondaryTextColor = this.defaultSecondaryTextColor;
     }
+    if (extractor.hasStaticProperty(WidgetConfigBuilder.TITLE_KEY)) {
+      this.hasTitlePanelSettings = true;
+      this.selectedTitle = extractor.stringParameter(WidgetConfigBuilder.TITLE_KEY);
+    }
+    if (extractor.hasStaticProperty(WidgetConfigBuilder.REFRESH_INTERVAL_KEY)) {
+      this.refreshIntervalInSeconds = extractor.integerParameter(WidgetConfigBuilder.REFRESH_INTERVAL_KEY);
+      if (this.intervalSubject) { // only created in ngOnInit()
+        this.intervalSubject.next(this.refreshIntervalInSeconds); // pushes the new interval to the polling stream
+      }
+    }
+    this.queryLimit = this.getQueryLimit(extractor); // abstract: each widget supplies its own limit
+    this.extractConfig(extractor); // abstract hook for widget-specific settings
+  }
 
-    static readonly PADDING: number = 20;
-    static readonly EDIT_HEADER_HEIGHT: number = 40;
-
-    // @Input() widget: DashboardItem;
-    @Input() widgetConfig: DashboardWidgetModel;
-    @Input() widgetDataConfig: VisualizablePipeline;
-    @Input() itemWidth: number;
-    @Input() itemHeight: number;
-    // @Input() gridsterItem: GridsterItem;
-    // @Input() gridsterItemComponent: GridsterItemComponent;
-    @Input() editMode: boolean;
-
-    subscription: Subscription;
+  ngOnDestroy(): void { // stops polling; the subscription is unset if ngOnInit() failed before assigning it
+    if (this.subscription) { this.subscription.unsubscribe(); }
+  }
 
-    hasSelectableColorSettings: boolean;
-    hasTitlePanelSettings: boolean;
+  computeCurrentWidth(width: number): number {
+    // Twice the PADDING is removed, but only for widgets created with adjustPadding.
+    const doublePadding = BaseStreamPipesWidget.PADDING * 2;
+    return this.adjustPadding ? width - doublePadding : width;
+  }
 
-    selectedBackgroundColor: string;
-    selectedPrimaryTextColor: string;
-    selectedSecondaryTextColor: string;
-    selectedTitle: string;
+  computeCurrentHeight(height: number): number {
+    // Edit-header and title-panel offsets always apply; padding only with adjustPadding.
+    const padding = this.adjustPadding ? BaseStreamPipesWidget.PADDING * 2 : 0;
+    return height - padding - this.editModeOffset() - this.titlePanelOffset();
+  }
 
-    defaultBackgroundColor = '#1B1464';
-    defaultPrimaryTextColor = '#FFFFFF';
-    defaultSecondaryTextColor = '#39B54A';
+  editModeOffset(): number {
+    return this.editMode ? BaseStreamPipesWidget.EDIT_HEADER_HEIGHT : 0;
+  }
 
-    ngOnInit(): void {
-        this.prepareConfigExtraction();
-        this.resizeService.resizeSubject.subscribe(info => {
-            this.onResize(info);
-        });
-        this.subscription = this.rxStompService.watch('/topic/' + this.widgetDataConfig.topic).subscribe((message: Message) => {
-            this.onEvent(JSON.parse(message.body));
-        });
-    }
+  titlePanelOffset(): number {
+    return this.hasTitlePanelSettings ? 20 : 0;
+  }
 
-    prepareConfigExtraction() {
-        const extractor: StaticPropertyExtractor = new StaticPropertyExtractor(this.widgetDataConfig.schema, this.widgetConfig.dashboardWidgetSettings.config);
-        if (extractor.hasStaticProperty(WidgetConfigBuilder.BACKGROUND_COLOR_KEY)) {
-            this.hasSelectableColorSettings = true;
-            this.selectedBackgroundColor = extractor.selectedColor(WidgetConfigBuilder.BACKGROUND_COLOR_KEY);
-            this.selectedPrimaryTextColor = extractor.selectedColor(WidgetConfigBuilder.PRIMARY_TEXT_COLOR_KEY);
-            this.selectedSecondaryTextColor = extractor.selectedColor(WidgetConfigBuilder.SECONDARY_TEXT_COLOR_KEY);
-        } else {
-            this.selectedBackgroundColor = this.defaultBackgroundColor;
-            this.selectedPrimaryTextColor = this.defaultPrimaryTextColor;
-            this.selectedSecondaryTextColor = this.defaultSecondaryTextColor;
-        }
-        if (extractor.hasStaticProperty(WidgetConfigBuilder.TITLE_KEY)) {
-            this.hasTitlePanelSettings = true;
-            this.selectedTitle = extractor.stringParameter(WidgetConfigBuilder.TITLE_KEY);
-        }
-        this.extractConfig(extractor);
-    }
+  protected abstract extractConfig(extractor: StaticPropertyExtractor);
 
-    ngOnDestroy(): void {
-        this.subscription.unsubscribe();
-    }
+  protected abstract getQueryLimit(extractor: StaticPropertyExtractor): number;
 
-    computeCurrentWidth(width: number): number {
-        return this.adjustPadding ?
-            (width - (BaseStreamPipesWidget.PADDING * 2)) :
-            width;
-    }
+  protected abstract onEvent(events: any[]);
 
-    computeCurrentHeight(height: number): number {
-        return this.adjustPadding ?
-            (height - (BaseStreamPipesWidget.PADDING * 2) - this.editModeOffset() - this.titlePanelOffset()) :
-            height - this.editModeOffset() - this.titlePanelOffset();
-    }
+  protected abstract onSizeChanged(width: number, height: number);
 
-    editModeOffset(): number {
-        return this.editMode ? BaseStreamPipesWidget.EDIT_HEADER_HEIGHT : 0;
+  ngOnChanges(changes: SimpleChanges): void {
+    if (changes['widgetConfig']) {
+      this.prepareConfigExtraction();
     }
-
-    titlePanelOffset(): number {
-        return this.hasTitlePanelSettings ? 20 : 0;
+  }
+
+  onResize(info: WidgetInfo) {
+    if (info.id === this.widgetConfig._id) {
+      setTimeout(() => {
+        this.onSizeChanged(this.computeCurrentWidth(info.width),
+          this.computeCurrentHeight(info.height));
+      }, 100);
     }
-
-    protected abstract extractConfig(extractor: StaticPropertyExtractor);
-
-    protected abstract onEvent(event: any);
-
-    protected abstract onSizeChanged(width: number, height: number);
-
-    ngOnChanges(changes: SimpleChanges): void {
-        if (changes['widgetConfig']) {
-            this.prepareConfigExtraction();
-        }
+  }
+
+  fireQuery(): Observable<SpQueryResult> {
+    return this.dataLakeService
+      .getData(this.widgetDataConfig.measureName, this.buildQuery(), true)
+      .pipe(map(res => res as SpQueryResult));
+  }
+
+  processQueryResult(queryResult: SpQueryResult) {
+    // Only non-empty results that contain exactly one data series are forwarded.
+    if (!(queryResult.total > 0) || queryResult.allDataSeries.length !== 1) {
+      return;
     }
+    const { headers, rows } = queryResult.allDataSeries[0];
+    // Convert every positional row into an object keyed by the column headers.
+    const events = rows.map(row => {
+      const event = {};
+      headers.forEach((fieldName, index) => {
+        event[fieldName] = row[index];
+      });
+      return event;
+    });
+    this.onEvent(events.reverse());
+  }
 
-    onResize(info: WidgetInfo) {
-        if (info.id === this.widgetConfig._id) {
-            setTimeout(() => {
-                this.onSizeChanged(this.computeCurrentWidth(info.width),
-                    this.computeCurrentHeight(info.height));
-            }, 100);
-        }
-    }
+  buildQuery() { // builds the query parameters passed to getData() in fireQuery()
+    const queryBuilder = DatalakeQueryParameterBuilder.create();
+    return queryBuilder.withLimit(this.queryLimit).withOrdering('DESC').build(); // rows are reversed again in processQueryResult()
+  }
 }
diff --git a/ui/src/app/dashboard/components/widgets/gauge/gauge-widget.component.ts b/ui/src/app/dashboard/components/widgets/gauge/gauge-widget.component.ts
index f224350..f1993ed 100644
--- a/ui/src/app/dashboard/components/widgets/gauge/gauge-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/gauge/gauge-widget.component.ts
@@ -16,13 +16,12 @@
  *
  */
 
-import {Component, ElementRef, OnDestroy, OnInit} from "@angular/core";
-import {BaseNgxChartsStreamPipesWidget} from "../base/base-ngx-charts-widget";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {ResizeService} from "../../../services/resize.service";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {GaugeConfig} from "./gauge-config";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseNgxChartsStreamPipesWidget } from '../base/base-ngx-charts-widget';
+import { ResizeService } from '../../../services/resize.service';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { GaugeConfig } from './gauge-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 
 @Component({
@@ -38,8 +37,8 @@ export class GaugeWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
 
     selectedProperty: string;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService, private el: ElementRef) {
-        super(rxStompService, dashboardService, resizeService);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
@@ -60,9 +59,13 @@ export class GaugeWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
         return false;
     }
 
-    protected onEvent(event: any) {
-        this.data[0] = ({"name": "value", "value": event[this.selectedProperty]});
+    protected onEvent(events: any[]) {
+        this.data[0] = ({'name': 'value', 'value': events[0][this.selectedProperty]});
         this.data = [...this.data];
     }
 
-}
\ No newline at end of file
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return 1;
+    }
+
+}
diff --git a/ui/src/app/dashboard/components/widgets/html/html-widget.component.ts b/ui/src/app/dashboard/components/widgets/html/html-widget.component.ts
index fe07389..bce9eff 100644
--- a/ui/src/app/dashboard/components/widgets/html/html-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/html/html-widget.component.ts
@@ -15,13 +15,12 @@
  *   limitations under the License.
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {BaseStreamPipesWidget} from "../base/base-widget";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {ResizeService} from "../../../services/resize.service";
-import {HtmlConfig} from "./html-config";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseStreamPipesWidget } from '../base/base-widget';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { ResizeService } from '../../../services/resize.service';
+import { HtmlConfig } from './html-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'html-widget',
@@ -36,8 +35,8 @@ export class HtmlWidgetComponent extends BaseStreamPipesWidget implements OnInit
 
     selectedHtmlField: string;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, false);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService, false);
     }
 
     ngOnInit(): void {
@@ -51,11 +50,11 @@ export class HtmlWidgetComponent extends BaseStreamPipesWidget implements OnInit
     }
 
     extractConfig(extractor: StaticPropertyExtractor) {
-        this.selectedHtmlField = extractor.mappingPropertyValue(HtmlConfig.HTML_MAPPING_KEY)
+        this.selectedHtmlField = extractor.mappingPropertyValue(HtmlConfig.HTML_MAPPING_KEY);
     }
 
-    protected onEvent(event: any) {
-        this.item = event[this.selectedHtmlField];
+    protected onEvent(events: any[]) {
+        this.item = events[0][this.selectedHtmlField];
     }
 
     protected onSizeChanged(width: number, height: number) {
@@ -63,4 +62,8 @@ export class HtmlWidgetComponent extends BaseStreamPipesWidget implements OnInit
         this.height = height;
     }
 
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return 1;
+    }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts b/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
index 00d5b6f..e2ce556 100644
--- a/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
@@ -16,13 +16,12 @@
  *
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {BaseNgxChartsStreamPipesWidget} from "../base/base-ngx-charts-widget";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {ResizeService} from "../../../services/resize.service";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {GaugeConfig} from "../gauge/gauge-config";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseNgxChartsStreamPipesWidget } from '../base/base-ngx-charts-widget';
+import { ResizeService } from '../../../services/resize.service';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { GaugeConfig } from '../gauge/gauge-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'image-widget',
@@ -35,8 +34,8 @@ export class ImageWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
     title: string;
     selectedProperty: string;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
@@ -56,8 +55,12 @@ export class ImageWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
         return false;
     }
 
-    protected onEvent(event: any) {
-        this.item = event[this.selectedProperty];
+    protected onEvent(events: any[]) {
+        this.item = events[0][this.selectedProperty];
     }
 
-}
\ No newline at end of file
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return 1;
+    }
+
+}
diff --git a/ui/src/app/dashboard/components/widgets/line/line-widget.component.ts b/ui/src/app/dashboard/components/widgets/line/line-widget.component.ts
index 610691e..fa5cde3 100644
--- a/ui/src/app/dashboard/components/widgets/line/line-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/line/line-widget.component.ts
@@ -16,11 +16,10 @@
  *
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {ResizeService} from "../../../services/resize.service";
-import {BaseNgxLineChartsStreamPipesWidget} from "../base/base-ngx-line-charts-widget";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { ResizeService } from '../../../services/resize.service';
+import { BaseNgxLineChartsStreamPipesWidget } from '../base/base-ngx-line-charts-widget';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'line-widget',
@@ -29,8 +28,8 @@ import {DashboardService} from "../../../services/dashboard.service";
 })
 export class LineWidgetComponent extends BaseNgxLineChartsStreamPipesWidget implements OnInit, OnDestroy {
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
@@ -41,4 +40,4 @@ export class LineWidgetComponent extends BaseNgxLineChartsStreamPipesWidget impl
         super.ngOnDestroy();
     }
 
-}
\ No newline at end of file
+}
diff --git a/ui/src/app/dashboard/components/widgets/map/map-widget.component.ts b/ui/src/app/dashboard/components/widgets/map/map-widget.component.ts
index 72170c2..37fc542 100644
--- a/ui/src/app/dashboard/components/widgets/map/map-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/map/map-widget.component.ts
@@ -16,13 +16,12 @@
  */
 
 import { Component, OnDestroy, OnInit } from '@angular/core';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { BaseStreamPipesWidget } from '../base/base-widget';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
 import { MapConfig } from './map-config';
-import { latLng, marker, Marker, tileLayer, Map, LatLngExpression, LatLng, icon, Content } from 'leaflet';
+import { Content, icon, latLng, LatLng, LatLngExpression, Map, Marker, marker, tileLayer } from 'leaflet';
 import { ResizeService } from '../../../services/resize.service';
-import { DashboardService } from '../../../services/dashboard.service';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'map-widget',
@@ -56,8 +55,8 @@ export class MapWidgetComponent extends BaseStreamPipesWidget implements OnInit,
         center: latLng(46.879966, -121.726909)
     };
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, false);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService, false);
     }
 
     ngOnInit(): void {
@@ -93,18 +92,18 @@ export class MapWidgetComponent extends BaseStreamPipesWidget implements OnInit,
         this.map.invalidateSize();
     }
 
-    protected onEvent(event: any) {
+    protected onEvent(events: any[]) {
 
         // TODO handle when user selected id field
 
-        const tmpMarker = this.getMarker(event);
+        const tmpMarker = this.getMarker(events)[0];
 
         // Set one marker when no ids are selected
         if (this.idsToDisplay.length === 0) {
             this.markerLayers = [tmpMarker];
         } else {
 
-            const id = this.getId(event);
+            const id = this.getId(events[0]);
             const index = this.markerIds.indexOf(id);
             if (index > -1) {
                 this.markerLayers[index] = tmpMarker;
@@ -161,4 +160,8 @@ export class MapWidgetComponent extends BaseStreamPipesWidget implements OnInit,
         this.map.invalidateSize();
     }
 
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return 1;
+    }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/number/number-widget.component.ts b/ui/src/app/dashboard/components/widgets/number/number-widget.component.ts
index 3835cca..1c36326 100644
--- a/ui/src/app/dashboard/components/widgets/number/number-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/number/number-widget.component.ts
@@ -17,61 +17,66 @@
  */
 
 import { Component, OnDestroy, OnInit } from '@angular/core';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { BaseStreamPipesWidget } from '../base/base-widget';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
 import { NumberConfig } from './number-config';
 import { ResizeService } from '../../../services/resize.service';
 import { DashboardService } from '../../../services/dashboard.service';
-import { EventPropertyPrimitive } from '@streampipes/platform-services';
+import { DatalakeRestService, EventPropertyPrimitive } from '@streampipes/platform-services';
 
 @Component({
-    selector: 'number-widget',
-    templateUrl: './number-widget.component.html',
-    styleUrls: ['./number-widget.component.css']
+  selector: 'number-widget',
+  templateUrl: './number-widget.component.html',
+  styleUrls: ['./number-widget.component.css']
 })
 export class NumberWidgetComponent extends BaseStreamPipesWidget implements OnInit, OnDestroy {
 
-    item: any = '-';
+  item: any = '-';
 
-    selectedProperty: string;
-    measurementUnitAbbrev: string;
+  selectedProperty: string;
+  measurementUnitAbbrev: string;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, false);
-    }
+  constructor(dataLakeService: DatalakeRestService,
+              resizeService: ResizeService,
+              private dashboardService: DashboardService) {
+    super(dataLakeService, resizeService, false);
+  }
 
-    ngOnInit(): void {
-        super.ngOnInit();
-    }
+  ngOnInit(): void {
+    super.ngOnInit();
+  }
 
-    ngOnDestroy(): void {
-        super.ngOnDestroy();
-    }
+  ngOnDestroy(): void {
+    super.ngOnDestroy();
+  }
 
-    extractConfig(extractor: StaticPropertyExtractor) {
-        this.selectedProperty = extractor.mappingPropertyValue(NumberConfig.NUMBER_MAPPING_KEY);
-        const eventProperty: EventPropertyPrimitive = extractor.getEventPropertyByName(this.selectedProperty) as EventPropertyPrimitive;
-        if (eventProperty.measurementUnit) {
-            this.dashboardService.getMeasurementUnitInfo(eventProperty.measurementUnit).subscribe(unit => {
-                this.measurementUnitAbbrev = unit.abbreviation;
-            });
-        }
+  extractConfig(extractor: StaticPropertyExtractor) {
+    this.selectedProperty = extractor.mappingPropertyValue(NumberConfig.NUMBER_MAPPING_KEY);
+    const eventProperty: EventPropertyPrimitive = extractor.getEventPropertyByName(this.selectedProperty) as EventPropertyPrimitive;
+    if (eventProperty.measurementUnit) {
+      this.dashboardService.getMeasurementUnitInfo(eventProperty.measurementUnit).subscribe(unit => {
+        this.measurementUnitAbbrev = unit.abbreviation;
+      });
     }
+  }
 
-    isNumber(item: any): boolean {
-        return false;
-    }
+  isNumber(item: any): boolean {
+    return false;
+  }
 
-    protected onEvent(event: any) {
-        let value = event[this.selectedProperty];
-        if (!isNaN(value)) {
-            value = value.toFixed(2);
-        }
-        this.item = value;
+  protected onEvent(events: any[]) { // shows the mapped field of the first event
+    let value = events[0][this.selectedProperty];
+    if (typeof value === 'number' && !isNaN(value)) { // null, booleans and strings have no toFixed()
+      value = value.toFixed(2);
     }
+    this.item = value;
+  }
 
-    protected onSizeChanged(width: number, height: number) {
-    }
+  protected onSizeChanged(width: number, height: number) {
+  }
+
+  protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+    return 1;
+  }
 
 }
diff --git a/ui/src/app/dashboard/components/widgets/raw/raw-config.ts b/ui/src/app/dashboard/components/widgets/raw/raw-config.ts
index 989c2d4..aab1d64 100644
--- a/ui/src/app/dashboard/components/widgets/raw/raw-config.ts
+++ b/ui/src/app/dashboard/components/widgets/raw/raw-config.ts
@@ -22,18 +22,20 @@ import { DashboardWidgetSettings } from '@streampipes/platform-services';
 
 export class RawConfig extends WidgetConfig {
 
-    constructor() {
-        super();
-    }
+  constructor() {
+    super();
+  }
 
-    getConfig(): DashboardWidgetSettings {
-        return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('raw', 'Raw')
-            .withIcon('fas fa-terminal')
-            .withDescription('Displays the raw message as it comes in for testing purposes')
-            .requiredSchema(SchemaRequirementsBuilder
-                .create()
-                .build())
-            .build();
-    }
+  getConfig(): DashboardWidgetSettings {
+    return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('raw', 'Raw')
+      .withIcon('fas fa-terminal')
+      .withDescription('Displays the raw message as it comes in for testing purposes')
+      // Called exactly once; presumably backs the QUERY_LIMIT_KEY setting read by the raw widget (TODO confirm).
+      .withNumberOfPastEvents()
+      .requiredSchema(SchemaRequirementsBuilder
+        .create()
+        .build())
+      .build();
+  }
 
 }
diff --git a/ui/src/app/dashboard/components/widgets/raw/raw-widget.component.ts b/ui/src/app/dashboard/components/widgets/raw/raw-widget.component.ts
index d420b19..d4f6708 100644
--- a/ui/src/app/dashboard/components/widgets/raw/raw-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/raw/raw-widget.component.ts
@@ -15,12 +15,12 @@
  *   limitations under the License.
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {BaseStreamPipesWidget} from "../base/base-widget";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {ResizeService} from "../../../services/resize.service";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from "@angular/core";
+import { BaseStreamPipesWidget } from "../base/base-widget";
+import { StaticPropertyExtractor } from "../../../sdk/extractor/static-property-extractor";
+import { ResizeService } from "../../../services/resize.service";
+import { DatalakeRestService } from '@streampipes/platform-services';
+import { WidgetConfigBuilder } from "../../../registry/widget-config-builder";
 
 @Component({
     selector: 'raw-widget',
@@ -33,8 +33,8 @@ export class RawWidgetComponent extends BaseStreamPipesWidget implements OnInit,
     width: number;
     height: number;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, false);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService, false);
     }
 
     ngOnInit(): void {
@@ -52,12 +52,8 @@ export class RawWidgetComponent extends BaseStreamPipesWidget implements OnInit,
 
     }
 
-
-    protected onEvent(event: any) {
-        this.items.unshift(JSON.stringify(event));
-        if (this.items.length > 5) {
-            this.items.pop();
-        }
+    protected onEvent(events: any[]) {
+        this.items = events.map(ev => JSON.stringify(ev)).reverse();
     }
 
     protected onSizeChanged(width: number, height: number) {
@@ -65,4 +61,8 @@ export class RawWidgetComponent extends BaseStreamPipesWidget implements OnInit,
         this.height = height;
     }
 
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return extractor.integerParameter(WidgetConfigBuilder.QUERY_LIMIT_KEY);
+    }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-config.ts b/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-config.ts
index d2929ea..a4ed31e 100644
--- a/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-config.ts
+++ b/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-config.ts
@@ -25,7 +25,6 @@ import { DashboardWidgetSettings } from '@streampipes/platform-services';
 export class StackedLineChartConfig extends WidgetConfig {
 
   static readonly VALUE_KEY: string = 'value-key';
-  static readonly TIMESTAMP_KEY: string = 'timestamp-key';
 
   constructor() {
     super();
@@ -33,13 +32,13 @@ export class StackedLineChartConfig extends WidgetConfig {
 
   getConfig(): DashboardWidgetSettings {
     return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('stacked-line-chart', 'Stacked Line Chart')
-        .withIcon('fas fa-chart-line')
-        .withDescription('Shows a stacked line chart based on multiple measurements.')
-        .requiredSchema(SchemaRequirementsBuilder
-            .create()
-            .requiredPropertyWithUnaryMapping(StackedLineChartConfig.TIMESTAMP_KEY, 'Timestamp field', '', EpRequirements.timestampReq())
-            .requiredPropertyWithNaryMapping(StackedLineChartConfig.VALUE_KEY, 'Measurement fields', '', EpRequirements.numberReq())
-            .build())
-        .build();
+      .withIcon('fas fa-chart-line')
+      .withDescription('Shows a stacked line chart based on multiple measurements.')
+      .withNumberOfPastEvents()
+      .requiredSchema(SchemaRequirementsBuilder
+        .create()
+        .requiredPropertyWithNaryMapping(StackedLineChartConfig.VALUE_KEY, 'Measurement fields', '', EpRequirements.numberReq())
+        .build())
+      .build();
   }
 }
diff --git a/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-widget.component.ts b/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-widget.component.ts
index 4d92c6f..d7cd1da 100644
--- a/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/stacked-line-chart/stacked-line-chart-widget.component.ts
@@ -16,14 +16,15 @@
  *
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {DashboardService} from "../../../services/dashboard.service";
-import {ResizeService} from "../../../services/resize.service";
-import {BaseEchartsWidget} from "../base/base-echarts-widget";
-import {StackedLineChartConfig} from "./stacked-line-chart-config";
-import {EChartsOption} from "echarts";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { ResizeService } from '../../../services/resize.service';
+import { BaseEchartsWidget } from '../base/base-echarts-widget';
+import { StackedLineChartConfig } from './stacked-line-chart-config';
+import { EChartsOption } from 'echarts';
+import { DatalakeRestService } from '@streampipes/platform-services';
+import { BaseNgxLineChartsStreamPipesWidget } from '../base/base-ngx-line-charts-widget';
+import { WidgetConfigBuilder } from "../../../registry/widget-config-builder";
 
 
 @Component({
@@ -34,15 +35,15 @@ import {EChartsOption} from "echarts";
 export class StackedLineChartWidgetComponent extends BaseEchartsWidget implements OnInit, OnDestroy {
 
   partitionField: string;
-  valueFields: Array<string>;
+  valueFields: string[];
   timestampField: string;
 
   chartOption = {
     tooltip: {
       trigger: 'axis',
-      formatter: function (params) {
+      formatter (params) {
         params = params[0];
-        var date = new Date(params.value[0]);
+        const date = new Date(params.value[0]);
         return date.getHours() + ':' + (date.getMinutes() + 1) + ':' + date.getSeconds() + ' : ' + params.value[1];
       },
       axisPointer: {
@@ -53,8 +54,8 @@ export class StackedLineChartWidgetComponent extends BaseEchartsWidget implement
       type: 'time',
       axisLabel: {
         formatter: params => {
-          let date = new Date(params);
-          return date.getHours() + ":" + date.getMinutes() + ":" + date.getSeconds();
+          const date = new Date(params);
+          return date.getHours() + ':' + date.getMinutes() + ':' + date.getSeconds();
         },
         textStyle: {
           color: this.selectedPrimaryTextColor
@@ -73,12 +74,11 @@ export class StackedLineChartWidgetComponent extends BaseEchartsWidget implement
     animationDuration: 500
   };
 
-  constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-    super(rxStompService, dashboardService, resizeService);
+  constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    super(dataLakeService, resizeService);
   }
 
   protected extractConfig(extractor: StaticPropertyExtractor) {
-    this.timestampField = extractor.mappingPropertyValue(StackedLineChartConfig.TIMESTAMP_KEY);
     this.valueFields = extractor.mappingPropertyValues(StackedLineChartConfig.VALUE_KEY);
     this.chartOption.xAxis.axisLabel.textStyle.color = this.selectedPrimaryTextColor;
     this.chartOption.yAxis.axisLabel.textStyle.color = this.selectedPrimaryTextColor;
@@ -86,15 +86,15 @@ export class StackedLineChartWidgetComponent extends BaseEchartsWidget implement
 
   protected onEvent(event: any) {
     this.dynamicData = this.chartOption;
-    let timestamp = event[this.timestampField];
+    const timestamp = event[BaseNgxLineChartsStreamPipesWidget.TIMESTAMP_KEY];
     this.valueFields.forEach(field => {
-      if (this.dynamicData.series.some(d => d.name == field)) {
-        let date = new Date(timestamp);
-        this.dynamicData.series.find(d => d.name == field).data.push(
-            {"name": date.toString(), value: [timestamp, event[field]]}
+      if (this.dynamicData.series.some(d => d.name === field)) {
+        const date = new Date(timestamp);
+        this.dynamicData.series.find(d => d.name === field).data.push(
+            {'name': date.toString(), value: [timestamp, event[field]]}
         );
-        if (this.dynamicData.series.find(d => d.name == field).data.length > 5) {
-          this.dynamicData.series.find(d => d.name == field).data.shift();
+        if (this.dynamicData.series.find(d => d.name === field).data.length > 5) {
+          this.dynamicData.series.find(d => d.name === field).data.shift();
         }
       } else {
         this.dynamicData.series.push(this.makeNewSeries(field, timestamp, event[field]));
@@ -107,16 +107,20 @@ export class StackedLineChartWidgetComponent extends BaseEchartsWidget implement
   }
 
   makeNewSeries(seriesName, timestamp, value) {
-    let date = new Date(timestamp);
+    const date = new Date(timestamp);
     return {
       type: 'line',
       smooth: true,
       name: seriesName,
       data: [{
-        "name": date.toString(),
+        'name': date.toString(),
         value: [timestamp, value]
       }],
-    }
+    };
+  }
+
+  protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+    return extractor.integerParameter(WidgetConfigBuilder.QUERY_LIMIT_KEY);
   }
 
 }
diff --git a/ui/src/app/dashboard/components/widgets/status/status-widget.component.ts b/ui/src/app/dashboard/components/widgets/status/status-widget.component.ts
index 0fdcbc5..6d08ddf 100644
--- a/ui/src/app/dashboard/components/widgets/status/status-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/status/status-widget.component.ts
@@ -16,13 +16,12 @@
  *
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {BaseStreamPipesWidget} from "../base/base-widget";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {DashboardService} from "../../../services/dashboard.service";
-import {ResizeService} from "../../../services/resize.service";
-import {StatusWidgetConfig} from "./status-config";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseStreamPipesWidget } from '../base/base-widget';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { ResizeService } from '../../../services/resize.service';
+import { StatusWidgetConfig } from './status-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 
 @Component({
@@ -33,14 +32,14 @@ import {StatusWidgetConfig} from "./status-config";
 export class StatusWidgetComponent extends BaseStreamPipesWidget implements OnInit, OnDestroy {
 
   interval: number;
-  active: boolean = false;
-  lastTimestamp: number = 0;
+  active = false;
+  lastTimestamp = 0;
 
   statusLightWidth: string;
   statusLightHeight: string;
 
-  constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-    super(rxStompService, dashboardService, resizeService, false);
+  constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    super(dataLakeService, resizeService, false);
   }
 
   ngOnInit(): void {
@@ -52,9 +51,9 @@ export class StatusWidgetComponent extends BaseStreamPipesWidget implements OnIn
     this.interval = extractor.integerParameter(StatusWidgetConfig.INTERVAL_KEY);
   }
 
-  protected onEvent(event: any) {
+  protected onEvent(events: any[]) {
     this.active = true;
-    let timestamp = new Date().getTime();
+    const timestamp = new Date().getTime();
     this.lastTimestamp = timestamp;
     setTimeout(() => {
       if (this.lastTimestamp <= timestamp) {
@@ -64,9 +63,13 @@ export class StatusWidgetComponent extends BaseStreamPipesWidget implements OnIn
   }
 
   protected onSizeChanged(width: number, height: number) {
-    let size: string = Math.min(width, height) * 0.6 + "px";
+    const size: string = Math.min(width, height) * 0.6 + 'px';
     this.statusLightWidth = size;
     this.statusLightHeight = size;
   }
 
+  protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+    return 1;
+  }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/table/table-config.ts b/ui/src/app/dashboard/components/widgets/table/table-config.ts
index 84a1123..2a4782e 100644
--- a/ui/src/app/dashboard/components/widgets/table/table-config.ts
+++ b/ui/src/app/dashboard/components/widgets/table/table-config.ts
@@ -24,22 +24,23 @@ import { DashboardWidgetSettings } from '@streampipes/platform-services';
 
 export class TableConfig extends WidgetConfig {
 
-    static readonly TITLE_KEY: string = 'title-key';
-    static readonly SELECTED_PROPERTIES_KEYS: string = 'selected-fields-key';
+  static readonly TITLE_KEY: string = 'title-key';
+  static readonly SELECTED_PROPERTIES_KEYS: string = 'selected-fields-key';
 
-    constructor() {
-        super();
-    }
+  constructor() {
+    super();
+  }
 
-    getConfig(): DashboardWidgetSettings {
-        return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('table', 'Table')
-            .withIcon('fas fa-table')
-            .withDescription('A table visualization with customizable columns')
-            .requiredSchema(SchemaRequirementsBuilder
-                .create()
-                .requiredPropertyWithNaryMapping(TableConfig.SELECTED_PROPERTIES_KEYS, 'Fields to display', '', EpRequirements.anyProperty())
-                .build())
-            .build();
-    }
+  getConfig(): DashboardWidgetSettings {
+    return WidgetConfigBuilder.createWithSelectableColorsAndTitlePanel('table', 'Table')
+      .withIcon('fas fa-table')
+      .withDescription('A table visualization with customizable columns')
+      .withNumberOfPastEvents()
+      .requiredSchema(SchemaRequirementsBuilder
+        .create()
+        .requiredPropertyWithNaryMapping(TableConfig.SELECTED_PROPERTIES_KEYS, 'Fields to display', '', EpRequirements.anyProperty())
+        .build())
+      .build();
+  }
 
 }
diff --git a/ui/src/app/dashboard/components/widgets/table/table-widget.component.ts b/ui/src/app/dashboard/components/widgets/table/table-widget.component.ts
index 706ada0..d291899 100644
--- a/ui/src/app/dashboard/components/widgets/table/table-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/table/table-widget.component.ts
@@ -16,15 +16,15 @@
  *
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {BaseStreamPipesWidget} from "../base/base-widget";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {MatTableDataSource} from "@angular/material/table";
-import {TableConfig} from "./table-config";
-import {SemanticTypeUtilsService} from "../../../../core-services/semantic-type/semantic-type-utils.service";
-import {ResizeService} from "../../../services/resize.service";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseStreamPipesWidget } from '../base/base-widget';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { MatTableDataSource } from '@angular/material/table';
+import { TableConfig } from './table-config';
+import { SemanticTypeUtilsService } from '../../../../core-services/semantic-type/semantic-type-utils.service';
+import { ResizeService } from '../../../services/resize.service';
+import { DatalakeRestService } from '@streampipes/platform-services';
+import { WidgetConfigBuilder } from '../../../registry/widget-config-builder';
 
 @Component({
     selector: 'table-widget',
@@ -33,23 +33,26 @@ import {DashboardService} from "../../../services/dashboard.service";
 })
 export class TableWidgetComponent extends BaseStreamPipesWidget implements OnInit, OnDestroy {
 
-    selectedProperties: Array<string>;
+    selectedProperties: string[];
 
-    displayedColumns: String[] = [];
+    displayedColumns: string[] = [];
     dataSource = new MatTableDataSource();
     semanticTypes: { [key: string]: string; } = {};
-    tableDisplayed: boolean = false;
+    tableDisplayed = false;
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService, private semanticTypeUtils: SemanticTypeUtilsService) {
-        super(rxStompService, dashboardService, resizeService, false);
+    constructor(dataLakeService: DatalakeRestService,
+                resizeService: ResizeService,
+                private semanticTypeUtils: SemanticTypeUtilsService) {
+        super(dataLakeService, resizeService, false);
     }
 
     ngOnInit(): void {
         super.ngOnInit();
 
-        this.widgetDataConfig.schema.eventProperties.forEach((key, index) => {
-            this.semanticTypes[key.runtimeName] = key.domainProperties[0]
+        this.widgetDataConfig.eventSchema.eventProperties.forEach((key, index) => {
+            this.semanticTypes[key.runtimeName] = key.domainProperties[0];
         });
+        this.semanticTypes[BaseStreamPipesWidget.TIMESTAMP_KEY] = this.semanticTypeUtils.TIMESTAMP;
     }
 
     ngOnDestroy(): void {
@@ -58,18 +61,16 @@ export class TableWidgetComponent extends BaseStreamPipesWidget implements OnIni
 
     extractConfig(extractor: StaticPropertyExtractor) {
         this.selectedProperties = extractor.mappingPropertyValues(TableConfig.SELECTED_PROPERTIES_KEYS);
+        this.selectedProperties.push(BaseStreamPipesWidget.TIMESTAMP_KEY);
     }
 
-    protected onEvent(event: any) {
-        this.dataSource.data.unshift(this.createTableObject(event));
-        if (this.dataSource.data.length > 10) {
-            this.dataSource.data.pop();
-        }
+    protected onEvent(events: any[]) {
+        this.dataSource.data = events.map(ev => this.createTableObject(ev)).reverse();
         this.dataSource.data = [...this.dataSource.data];
     }
 
     createTableObject(event: any) {
-        let object = {};
+        const object = {};
         this.selectedProperties.forEach((key, index) => {
             event[key] = this.semanticTypeUtils.getValue(event[key], this.semanticTypes[key]);
             object[key] = event[key];
@@ -80,4 +81,8 @@ export class TableWidgetComponent extends BaseStreamPipesWidget implements OnIni
     protected onSizeChanged(width: number, height: number) {
     }
 
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return extractor.integerParameter(WidgetConfigBuilder.QUERY_LIMIT_KEY);
+    }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/trafficlight/traffic-light-widget.component.ts b/ui/src/app/dashboard/components/widgets/trafficlight/traffic-light-widget.component.ts
index 49eeeff..8681357 100644
--- a/ui/src/app/dashboard/components/widgets/trafficlight/traffic-light-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/trafficlight/traffic-light-widget.component.ts
@@ -15,13 +15,12 @@
  *   limitations under the License.
  */
 
-import {Component, OnDestroy, OnInit} from "@angular/core";
-import {RxStompService} from "@stomp/ng2-stompjs";
-import {BaseStreamPipesWidget} from "../base/base-widget";
-import {StaticPropertyExtractor} from "../../../sdk/extractor/static-property-extractor";
-import {ResizeService} from "../../../services/resize.service";
-import {TrafficLightConfig} from "./traffic-light-config";
-import {DashboardService} from "../../../services/dashboard.service";
+import { Component, OnDestroy, OnInit } from '@angular/core';
+import { BaseStreamPipesWidget } from '../base/base-widget';
+import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
+import { ResizeService } from '../../../services/resize.service';
+import { TrafficLightConfig } from './traffic-light-config';
+import { DatalakeRestService } from '@streampipes/platform-services';
 
 @Component({
     selector: 'traffic-light-widget',
@@ -30,25 +29,25 @@ import {DashboardService} from "../../../services/dashboard.service";
 })
 export class TrafficLightWidgetComponent extends BaseStreamPipesWidget implements OnInit, OnDestroy {
 
-    items: Array<string>;
+    items: string[];
     width: number;
     height: number;
-    
+
     containerWidth: number;
     containerHeight: number;
-    
+
     lightWidth: number;
     lightHeight: number;
 
     selectedWarningRange: number;
     selectedFieldToObserve: string;
-    selectedLimitGreaterThan: boolean
+    selectedLimitGreaterThan: boolean;
     selectedThreshold: number;
 
     activeClass = 'red';
 
-    constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-        super(rxStompService, dashboardService, resizeService, false);
+    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+        super(dataLakeService, resizeService, false);
     }
 
     ngOnInit(): void {
@@ -78,18 +77,18 @@ export class TrafficLightWidgetComponent extends BaseStreamPipesWidget implement
         this.selectedThreshold = extractor.integerParameter(TrafficLightConfig.CRITICAL_VALUE_KEY);
         this.selectedFieldToObserve = extractor.mappingPropertyValue(TrafficLightConfig.NUMBER_MAPPING_KEY);
         this.selectedWarningRange = extractor.integerParameter(TrafficLightConfig.WARNING_RANGE_KEY);
-        this.selectedLimitGreaterThan = extractor.selectedSingleValue(TrafficLightConfig.CRITICAL_VALUE_LIMIT) === "Upper Limit";
+        this.selectedLimitGreaterThan = extractor.selectedSingleValue(TrafficLightConfig.CRITICAL_VALUE_LIMIT) === 'Upper Limit';
     }
 
 
-    protected onEvent(event: any) {
-        let item = event[this.selectedFieldToObserve];
+    protected onEvent(events: any[]) {
+        const item = events[0][this.selectedFieldToObserve];
         if (this.isInOkRange(item)) {
-            this.activeClass = "green";
+            this.activeClass = 'green';
         } else if (this.isInWarningRange(item)) {
-            this.activeClass = "yellow";
+            this.activeClass = 'yellow';
         } else {
-            this.activeClass = "red";
+            this.activeClass = 'red';
         }
     }
 
@@ -102,12 +101,12 @@ export class TrafficLightWidgetComponent extends BaseStreamPipesWidget implement
     }
 
     isInWarningRange(value) {
-        if (this.exceedsThreshold(value)) return false;
+        if (this.exceedsThreshold(value)) { return false; }
         else {
             if (this.selectedLimitGreaterThan) {
-                return value >= (this.selectedThreshold - this.selectedThreshold*(this.selectedWarningRange/100));
+                return value >= (this.selectedThreshold - this.selectedThreshold * (this.selectedWarningRange / 100));
             } else {
-                return value <= (this.selectedThreshold + this.selectedThreshold*(this.selectedWarningRange/100));
+                return value <= (this.selectedThreshold + this.selectedThreshold * (this.selectedWarningRange / 100));
             }
         }
     }
@@ -122,4 +121,8 @@ export class TrafficLightWidgetComponent extends BaseStreamPipesWidget implement
         this.updateLightSize();
     }
 
+    protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+        return 1;
+    }
+
 }
diff --git a/ui/src/app/dashboard/components/widgets/wordcloud/wordcloud-widget.component.ts b/ui/src/app/dashboard/components/widgets/wordcloud/wordcloud-widget.component.ts
index b56ffad..0a08634 100644
--- a/ui/src/app/dashboard/components/widgets/wordcloud/wordcloud-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/wordcloud/wordcloud-widget.component.ts
@@ -17,18 +17,16 @@
  */
 
 import { Component, OnDestroy, OnInit } from '@angular/core';
-import { RxStompService } from '@stomp/ng2-stompjs';
 import { BaseStreamPipesWidget } from '../base/base-widget';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
 import { ResizeService } from '../../../services/resize.service';
-import { DashboardService } from '../../../services/dashboard.service';
-import { EventPropertyList } from '@streampipes/platform-services';
+import { DatalakeRestService, EventPropertyList } from '@streampipes/platform-services';
 import { WordCloudConfig } from './wordcloud-config';
 
 import { EChartsOption } from 'echarts';
 import 'echarts-wordcloud';
 import { ECharts } from 'echarts/core';
-
+import { WidgetConfigBuilder } from '../../../registry/widget-config-builder';
 
 
 @Component({
@@ -93,8 +91,8 @@ export class WordcloudWidgetComponent extends BaseStreamPipesWidget implements O
     }]
   };
 
-  constructor(rxStompService: RxStompService, dashboardService: DashboardService, resizeService: ResizeService) {
-    super(rxStompService, dashboardService, resizeService, false);
+  constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    super(dataLakeService, resizeService, false);
   }
 
   ngOnInit(): void {
@@ -143,4 +141,8 @@ export class WordcloudWidgetComponent extends BaseStreamPipesWidget implements O
     }
   }
 
+  protected getQueryLimit(extractor: StaticPropertyExtractor): number {
+    return extractor.integerParameter(WidgetConfigBuilder.QUERY_LIMIT_KEY);
+  }
+
 }
diff --git a/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.html b/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.html
index af78fcf..45c650a 100644
--- a/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.html
+++ b/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.html
@@ -28,10 +28,10 @@
                                        class="list-item"
                                         [attr.data-cy]="'dashboard-visualize-pipeline-' + pipeline.pipelineName">
                             <div mat-list-avatar
-                                 class="pipeline-avatar sp-accent-bg">{{iconText(pipeline.visualizationName)}}
+                                 class="pipeline-avatar sp-accent-bg">{{iconText(pipeline.measureName)}}
                             </div>
                             <h4 mat-line>{{pipeline.pipelineName}}</h4>
-                            <p mat-line>{{pipeline.visualizationName}} </p>
+                            <p mat-line>{{pipeline.measureName}} </p>
                         </mat-list-item>
                     </mat-list>
                 </div>
@@ -61,7 +61,7 @@
                     <div fxFlex="100" fxLayout="column">
                         <app-static-property *ngFor="let config of selectedWidget.config" [staticProperty]="config"
                                              [staticProperties]="selectedWidget.config"
-                                             [eventSchemas]="[selectedPipeline.schema]"
+                                             [eventSchemas]="[selectedPipeline.eventSchema]"
                                              [fieldName]="config.internalName"
                                              [parentForm]="parentForm">
                         </app-static-property>
diff --git a/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.ts b/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.ts
index 0b7bf74..9ec6155 100644
--- a/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.ts
+++ b/ui/src/app/dashboard/dialogs/add-widget/add-visualization-dialog.component.ts
@@ -23,211 +23,227 @@ import { WidgetRegistry } from '../../registry/widget-registry';
 import { MappingPropertyGenerator } from '../../sdk/matching/mapping-property-generator';
 import { DashboardService } from '../../services/dashboard.service';
 import {
-    Dashboard,
-    DashboardWidgetModel,
-    DashboardWidgetSettings,
-    EventPropertyUnion,
-    EventSchema,
-    FreeTextStaticProperty,
-    MappingPropertyNary,
-    MappingPropertyUnary,
-    VisualizablePipeline,
-    PipelineService
+  Dashboard,
+  DashboardWidgetModel,
+  DashboardWidgetSettings,
+  DataLakeMeasure, DatalakeRestService,
+  DataViewDataExplorerService,
+  EventPropertyUnion,
+  EventSchema,
+  FreeTextStaticProperty,
+  MappingPropertyNary,
+  MappingPropertyUnary,
+  PipelineService
 } from '@streampipes/platform-services';
 import { FormBuilder, FormGroup } from '@angular/forms';
 import { DialogRef } from '../../../core-ui/dialog/base-dialog/dialog-ref';
+import { zip } from "rxjs";
 
 @Component({
-    selector: 'add-visualization-dialog-component',
-    templateUrl: './add-visualization-dialog.component.html',
-    styleUrls: ['./add-visualization-dialog.component.scss']
+  selector: 'add-visualization-dialog-component',
+  templateUrl: './add-visualization-dialog.component.html',
+  styleUrls: ['./add-visualization-dialog.component.scss']
 })
 export class AddVisualizationDialogComponent implements OnInit, AfterViewInit {
 
-    pages = [{
-        type: 'select-pipeline',
-        title: 'Select Pipeline',
-        description: 'Select a pipeline you\'d like to visualize'
-    }, {
-        type: 'select-widget',
-        title: 'Select Widget',
-        description: 'Select widget'
-    }, {
-        type: 'configure-widget',
-        title: 'Configure Widget',
-        description: 'Configure widget'
-    }];
-
-    visualizablePipelines: VisualizablePipeline[] = [];
-    availableWidgets: DashboardWidgetSettings[];
-
-    selectedPipeline: VisualizablePipeline;
-    selectedWidget: DashboardWidgetSettings;
-
-    dashboard: Dashboard;
-
-    selectedType: any;
-    page: any = 'select-pipeline';
-    dialogTitle: string;
-
-    parentForm: FormGroup;
-
-    formValid = false;
-    viewInitialized = false;
-
-    @Input()
-    pipeline: VisualizablePipeline;
-
-    @Input()
-    widget: DashboardWidgetModel;
-
-    @Input()
-    editMode: boolean;
-
-    @Input()
-    startPage: string;
-
-
-    constructor(
-        private dialogRef: DialogRef<AddVisualizationDialogComponent>,
-        private dashboardService: DashboardService,
-        private pipelineService: PipelineService,
-        public elementIconText: ElementIconText,
-        private fb: FormBuilder,
-        private changeDetectorRef: ChangeDetectorRef) {
-    }
-
-    ngOnInit() {
-        this.parentForm = this.fb.group({});
-        this.parentForm.statusChanges.subscribe(status => {
-           this.formValid = this.viewInitialized && this.parentForm.valid;
-        });
-        if (!this.editMode) {
-            this.dialogTitle = 'Add widget';
-            this.loadVisualizablePipelines();
-        } else {
-            this.loadVisualizablePipelines();
-            this.dialogTitle = 'Edit widget';
-            this.selectedPipeline = this.pipeline;
-            this.selectedWidget = this.widget.dashboardWidgetSettings;
-            this.page = this.startPage;
-        }
-    }
-
-    ngAfterViewInit() {
-        this.viewInitialized = true;
-        this.formValid = this.viewInitialized && this.parentForm.valid;
-        this.changeDetectorRef.detectChanges();
-    }
-
-    loadVisualizablePipelines() {
-        this.dashboardService.getVisualizablePipelines().subscribe(visualizations => {
-            this.visualizablePipelines = this.sortPipeline(visualizations);
-        });
-    }
-
-    sortPipeline(visualizations: VisualizablePipeline[]): VisualizablePipeline[] {
-        return visualizations.sort((a, b) => {
-            if (a.pipelineName === b.pipelineName) {
-                return a.visualizationName.toLowerCase() < b.visualizationName.toLowerCase() ? -1 : 1;
-            } else {
-                return a.pipelineName.toLowerCase() < b.pipelineName.toLowerCase() ? -1 : 1;
-            }
-        });
-    }
-
-    onCancel(): void {
-        this.dialogRef.close();
-    }
-
-    getSelectedPipelineCss(vis) {
-        return this.getSelectedCss(this.selectedPipeline, vis);
-    }
-
-    getSelectedVisTypeCss(type) {
-        return this.getSelectedCss(this.selectedType, type);
-    }
-
-    getSelectedCss(selected, current) {
-        if (selected === current) {
-            return 'wizard-preview wizard-preview-selected';
-        } else {
-            return 'wizard-preview';
-        }
-    }
-
-    getTabCss(page) {
-        if (page === this.page) { return 'md-fab md-accent'; } else { return 'md-fab md-accent wizard-inactive'; }
-    }
-
-    selectPipeline(vis) {
-        this.selectedPipeline = vis;
-        this.next();
-    }
-
-    selectWidget(widget) {
-        this.selectedWidget = widget;
-        this.selectedWidget.config.forEach(sp => {
-            if (sp instanceof MappingPropertyUnary || sp instanceof MappingPropertyNary) {
-                const requirement: EventPropertyUnion = this.findRequirement(this.selectedWidget.requiredSchema, sp.internalName);
-                sp.mapsFromOptions = new MappingPropertyGenerator(requirement,
-                    this.selectedPipeline.schema.eventProperties).computeMatchingProperties();
-            }
-            if (sp instanceof FreeTextStaticProperty && sp.internalName === WidgetConfigBuilder.TITLE_KEY) {
-                sp.value = this.selectedPipeline.visualizationName;
-            }
+  pages = [{
+    type: 'select-pipeline',
+    title: 'Select Pipeline',
+    description: 'Select a pipeline you\'d like to visualize'
+  }, {
+    type: 'select-widget',
+    title: 'Select Widget',
+    description: 'Select widget'
+  }, {
+    type: 'configure-widget',
+    title: 'Configure Widget',
+    description: 'Configure widget'
+  }];
+
+  visualizablePipelines: DataLakeMeasure[] = [];
+  availableWidgets: DashboardWidgetSettings[];
+
+  selectedPipeline: DataLakeMeasure;
+  selectedWidget: DashboardWidgetSettings;
+
+  dashboard: Dashboard;
+
+  selectedType: any;
+  page: any = 'select-pipeline';
+  dialogTitle: string;
+
+  parentForm: FormGroup;
+
+  formValid = false;
+  viewInitialized = false;
+
+  @Input()
+  pipeline: DataLakeMeasure;
+
+  @Input()
+  widget: DashboardWidgetModel;
+
+  @Input()
+  editMode: boolean;
+
+  @Input()
+  startPage: string;
+
+
+  constructor(
+    private dialogRef: DialogRef<AddVisualizationDialogComponent>,
+    private dashboardService: DashboardService,
+    private dataLakeRestService: DatalakeRestService,
+    private dataExplorerService: DataViewDataExplorerService,
+    private pipelineService: PipelineService,
+    public elementIconText: ElementIconText,
+    private fb: FormBuilder,
+    private changeDetectorRef: ChangeDetectorRef) {
+  }
+
+  ngOnInit() {
+    this.parentForm = this.fb.group({});
+    this.parentForm.statusChanges.subscribe(status => {
+      this.formValid = this.viewInitialized && this.parentForm.valid;
+    });
+    if (!this.editMode) {
+      this.dialogTitle = 'Add widget';
+      this.loadVisualizablePipelines();
+    } else {
+      this.loadVisualizablePipelines();
+      this.dialogTitle = 'Edit widget';
+      this.selectedPipeline = this.pipeline;
+      this.selectedWidget = this.widget.dashboardWidgetSettings;
+      this.page = this.startPage;
+    }
+  }
+
+  ngAfterViewInit() {
+    this.viewInitialized = true;
+    this.formValid = this.viewInitialized && this.parentForm.valid;
+    this.changeDetectorRef.detectChanges();
+  }
+
+  loadVisualizablePipelines() {
+    zip(this.dataExplorerService.getAllPersistedDataStreams(), this.dataLakeRestService.getAllMeasurementSeries())
+      .subscribe(res => {
+        const visualizablePipelines = res[0];
+        visualizablePipelines.forEach(p => {
+          const measurement = res[1].find(m => {
+            return m.measureName === p.measureName;
+          });
+          p.eventSchema = measurement.eventSchema;
         });
-        this.next();
-    }
-
-    findRequirement(requiredSchema: EventSchema, internalName: string) {
-        return requiredSchema.eventProperties.find(ep => ep.runtimeName === internalName);
-    }
-
-    loadAvailableWidgets() {
-        this.availableWidgets = WidgetRegistry.getCompatibleWidgetTemplates(this.selectedPipeline);
-        this.availableWidgets.sort((a, b) => {
-            return a.widgetLabel < b.widgetLabel ? -1 : 1;
+        this.visualizablePipelines = this.sortPipeline(visualizablePipelines);
+      });
+  }
+
+  sortPipeline(visualizations: DataLakeMeasure[]): DataLakeMeasure[] {
+    return visualizations.sort((a, b) => {
+      if (a.pipelineName === b.pipelineName) {
+        return a.measureName.toLowerCase() < b.measureName.toLowerCase() ? -1 : 1;
+      } else {
+        return a.pipelineName.toLowerCase() < b.pipelineName.toLowerCase() ? -1 : 1;
+      }
+    });
+  }
+
+  onCancel(): void {
+    this.dialogRef.close();
+  }
+
+  getSelectedPipelineCss(vis) {
+    return this.getSelectedCss(this.selectedPipeline, vis);
+  }
+
+  getSelectedVisTypeCss(type) {
+    return this.getSelectedCss(this.selectedType, type);
+  }
+
+  getSelectedCss(selected, current) {
+    if (selected === current) {
+      return 'wizard-preview wizard-preview-selected';
+    } else {
+      return 'wizard-preview';
+    }
+  }
+
+  getTabCss(page) {
+    if (page === this.page) {
+      return 'md-fab md-accent';
+    } else {
+      return 'md-fab md-accent wizard-inactive';
+    }
+  }
+
+  selectPipeline(vis) {
+    this.selectedPipeline = vis;
+    this.next();
+  }
+
+  selectWidget(widget) {
+    this.selectedWidget = widget;
+    this.selectedWidget.config.forEach(sp => {
+      if (sp instanceof MappingPropertyUnary || sp instanceof MappingPropertyNary) {
+        const requirement: EventPropertyUnion = this.findRequirement(this.selectedWidget.requiredSchema, sp.internalName);
+        sp.mapsFromOptions = new MappingPropertyGenerator(requirement,
+          this.selectedPipeline.eventSchema.eventProperties).computeMatchingProperties();
+      }
+      if (sp instanceof FreeTextStaticProperty && sp.internalName === WidgetConfigBuilder.TITLE_KEY) {
+        sp.value = this.selectedPipeline.measureName;
+      }
+    });
+    this.next();
+  }
+
+  findRequirement(requiredSchema: EventSchema, internalName: string) {
+    return requiredSchema.eventProperties.find(ep => ep.runtimeName === internalName);
+  }
+
+  loadAvailableWidgets() {
+    this.availableWidgets = WidgetRegistry.getCompatibleWidgetTemplates(this.selectedPipeline);
+    this.availableWidgets.sort((a, b) => {
+      return a.widgetLabel < b.widgetLabel ? -1 : 1;
+    });
+  }
+
+  next() {
+    if (this.page === 'select-pipeline') {
+      this.loadAvailableWidgets();
+      this.page = 'select-widget';
+    } else if (this.page === 'select-widget') {
+      this.page = 'configure-widget';
+    } else {
+      const configuredWidget: DashboardWidgetModel = new DashboardWidgetModel();
+      configuredWidget['@class'] = 'org.apache.streampipes.model.dashboard.DashboardWidgetModel';
+      configuredWidget.dashboardWidgetSettings = this.selectedWidget;
+      configuredWidget.dashboardWidgetSettings['@class'] = 'org.apache.streampipes.model.dashboard.DashboardWidgetSettings';
+      configuredWidget.visualizationName = this.selectedPipeline.measureName;
+      configuredWidget.pipelineId = this.selectedPipeline.pipelineId;
+      configuredWidget.widgetType = configuredWidget.dashboardWidgetSettings.widgetName;
+      if (!this.editMode) {
+        this.dashboardService.saveWidget(configuredWidget).subscribe(response => {
+          this.dialogRef.close(response);
         });
-    }
-
-    next() {
-        if (this.page === 'select-pipeline') {
-            this.loadAvailableWidgets();
-            this.page = 'select-widget';
-        } else if (this.page === 'select-widget') {
-            this.page = 'configure-widget';
-        } else {
-            const configuredWidget: DashboardWidgetModel = new DashboardWidgetModel();
-            configuredWidget['@class'] = 'org.apache.streampipes.model.dashboard.DashboardWidgetModel';
-            configuredWidget.dashboardWidgetSettings = this.selectedWidget;
-            configuredWidget.dashboardWidgetSettings['@class'] = 'org.apache.streampipes.model.dashboard.DashboardWidgetSettings';
-            configuredWidget.visualizationName = this.selectedPipeline.visualizationName;
-            configuredWidget.pipelineId = this.selectedPipeline.pipelineId;
-            configuredWidget.widgetType = configuredWidget.dashboardWidgetSettings.widgetName;
-            if (!this.editMode) {
-                this.dashboardService.saveWidget(configuredWidget).subscribe(response => {
-                    this.dialogRef.close(response);
-                });
-            } else {
-                configuredWidget._id = this.widget._id;
-                configuredWidget._rev = this.widget._rev;
-                configuredWidget.widgetId = this.widget.widgetId;
-                this.dialogRef.close(configuredWidget);
-            }
-        }
-    }
-
-    back() {
-        if (this.page === 'select-widget') {
-            this.page = 'select-pipeline';
-        } else if (this.page === 'configure-widget') {
-            this.loadAvailableWidgets();
-            this.page = 'select-widget';
-        }
-    }
-
-    iconText(s) {
-        return this.elementIconText.getElementIconText(s);
-    }
+      } else {
+        configuredWidget._id = this.widget._id;
+        configuredWidget._rev = this.widget._rev;
+        configuredWidget.widgetId = this.widget.widgetId;
+        this.dialogRef.close(configuredWidget);
+      }
+    }
+  }
+
+  back() {
+    if (this.page === 'select-widget') {
+      this.page = 'select-pipeline';
+    } else if (this.page === 'configure-widget') {
+      this.loadAvailableWidgets();
+      this.page = 'select-widget';
+    }
+  }
+
+  iconText(s) {
+    return this.elementIconText.getElementIconText(s);
+  }
 }
diff --git a/ui/src/app/dashboard/registry/widget-config-builder.ts b/ui/src/app/dashboard/registry/widget-config-builder.ts
index f37a894..d32a4b8 100644
--- a/ui/src/app/dashboard/registry/widget-config-builder.ts
+++ b/ui/src/app/dashboard/registry/widget-config-builder.ts
@@ -30,6 +30,8 @@ export class WidgetConfigBuilder {
     static readonly BACKGROUND_COLOR_KEY: string = 'spi-background-color-key';
     static readonly PRIMARY_TEXT_COLOR_KEY: string = 'spi-primary-text-color-key';
     static readonly SECONDARY_TEXT_COLOR_KEY: string = 'spi-secondary-text-color-key';
+    static readonly REFRESH_INTERVAL_KEY: string = 'spi-refresh-interval-key';
+    static readonly QUERY_LIMIT_KEY: string = 'spi-query-limit-key';
 
     static readonly TITLE_KEY: string = 'spi-title-key';
 
@@ -52,6 +54,11 @@ export class WidgetConfigBuilder {
             ' secondary text' +
             ' color', '#39B54A');
         }
+        this.requiredIntegerParameter(
+          WidgetConfigBuilder.REFRESH_INTERVAL_KEY,
+          'Refresh interval [seconds]',
+          'The interval in seconds in which new data should be fetched',
+          5);
     }
 
     static create(widgetName: string, widgetLabel: string): WidgetConfigBuilder {
@@ -76,6 +83,14 @@ export class WidgetConfigBuilder {
         return this;
     }
 
+    withNumberOfPastEvents(): WidgetConfigBuilder {
+        const fst: FreeTextStaticProperty = this.prepareFreeTextStaticProperty(WidgetConfigBuilder.QUERY_LIMIT_KEY, 'Past data', 'The number of historic events that should be shown', Datatypes.Integer.toUri());
+        fst.value = '50';
+        this.widget.config.push(fst);
+
+        return this;
+    }
+
     requiredTextParameter(id: string, label: string, description: string): WidgetConfigBuilder {
         const fst: FreeTextStaticProperty = this.prepareFreeTextStaticProperty(id, label, description, Datatypes.String.toUri());
         this.widget.config.push(fst);
@@ -96,8 +111,11 @@ export class WidgetConfigBuilder {
     }
 
 
-    requiredIntegerParameter(id: string, label: string, description: string): WidgetConfigBuilder {
+    requiredIntegerParameter(id: string, label: string, description: string, defaultValue?: number): WidgetConfigBuilder {
         const fst: FreeTextStaticProperty = this.prepareFreeTextStaticProperty(id, label, description, Datatypes.Integer.toUri());
+        if (defaultValue) {
+            fst.value = defaultValue.toString();
+        }
         this.widget.config.push(fst);
         return this;
     }
diff --git a/ui/src/app/dashboard/registry/widget-registry.ts b/ui/src/app/dashboard/registry/widget-registry.ts
index 3687a6f..56e9012 100644
--- a/ui/src/app/dashboard/registry/widget-registry.ts
+++ b/ui/src/app/dashboard/registry/widget-registry.ts
@@ -29,7 +29,7 @@ import { TableConfig } from '../components/widgets/table/table-config';
 import { TrafficLightConfig } from '../components/widgets/trafficlight/traffic-light-config';
 import { SchemaMatch } from '../sdk/matching/schema-match';
 import {
-    DashboardWidgetSettings,
+    DashboardWidgetSettings, DataLakeMeasure,
     EventSchema,
     VisualizablePipeline
 } from '@streampipes/platform-services';
@@ -63,8 +63,8 @@ export class WidgetRegistry {
         return widgetTemplates;
     }
 
-    static getCompatibleWidgetTemplates(pipeline: VisualizablePipeline) {
-        const inputSchema: EventSchema = pipeline.schema;
+    static getCompatibleWidgetTemplates(dataLakeMeasure: DataLakeMeasure) {
+        const inputSchema: EventSchema = dataLakeMeasure.eventSchema;
         return this.getAvailableWidgetTemplates().filter(widget => WidgetRegistry.isCompatible(widget, inputSchema));
     }
 

[incubator-streampipes] 05/07: [STREAMPIPES-509] Refactor asset dashboard to receive data from data lake

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 7781208d87147b51291076f1ca2a21410bfbfbc4
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 19:19:21 2022 +0100

    [STREAMPIPES-509] Refactor asset dashboard to receive data from data lake
---
 .../app-asset-monitoring.module.ts                 | 51 ++++++++-----------
 .../components/view-asset/view-asset.component.ts  | 57 +++++++++++++++++-----
 .../add-pipeline-dialog.component.html             |  6 +--
 .../add-pipeline/add-pipeline-dialog.component.ts  | 32 ++++++++----
 .../model/selected-visualization-data.model.ts     |  3 +-
 .../app-asset-monitoring/services/shape.service.ts |  3 +-
 .../services/websocket.service.ts                  | 40 ---------------
 7 files changed, 90 insertions(+), 102 deletions(-)

diff --git a/ui/src/app/app-asset-monitoring/app-asset-monitoring.module.ts b/ui/src/app/app-asset-monitoring/app-asset-monitoring.module.ts
index f4de74d..c3fd643 100644
--- a/ui/src/app/app-asset-monitoring/app-asset-monitoring.module.ts
+++ b/ui/src/app/app-asset-monitoring/app-asset-monitoring.module.ts
@@ -16,32 +16,29 @@
  *
  */
 
-import {NgModule} from '@angular/core';
-import {FlexLayoutModule} from '@angular/flex-layout';
-import {CommonModule} from '@angular/common';
+import { NgModule } from '@angular/core';
+import { FlexLayoutModule } from '@angular/flex-layout';
+import { CommonModule } from '@angular/common';
 
-import {AppAssetMonitoringComponent} from './app-asset-monitoring.component';
+import { AppAssetMonitoringComponent } from './app-asset-monitoring.component';
 
-import {CustomMaterialModule} from '../CustomMaterial/custom-material.module';
+import { CustomMaterialModule } from '../CustomMaterial/custom-material.module';
 
-import {ViewAssetComponent} from "./components/view-asset/view-asset.component";
-import {CreateAssetComponent} from "./components/create-asset/create-asset.component";
-import {WebsocketService} from "./services/websocket.service";
-import {AddPipelineDialogComponent} from "./dialog/add-pipeline/add-pipeline-dialog.component";
-import {RestService} from './services/rest.service';
-import {MatFormFieldModule} from "@angular/material/form-field";
-import {MatGridListModule} from "@angular/material/grid-list";
-import {MatInputModule} from "@angular/material/input";
-import {ElementIconText} from "../services/get-element-icon-text.service";
-import {FormsModule} from "@angular/forms";
-import {ColorPickerModule} from "ngx-color-picker";
-import {ShapeService} from "./services/shape.service";
-import {SaveDashboardDialogComponent} from "./dialog/save-dashboard/save-dashboard-dialog.component";
-import {AssetDashboardOverviewComponent} from "./components/dashboard-overview/dashboard-overview.component";
-import {InjectableRxStompConfig, RxStompService, rxStompServiceFactory} from "@stomp/ng2-stompjs";
-import {streamPipesStompConfig} from "../dashboard/services/websocket.config";
+import { ViewAssetComponent } from './components/view-asset/view-asset.component';
+import { CreateAssetComponent } from './components/create-asset/create-asset.component';
+import { AddPipelineDialogComponent } from './dialog/add-pipeline/add-pipeline-dialog.component';
+import { RestService } from './services/rest.service';
+import { MatFormFieldModule } from '@angular/material/form-field';
+import { MatGridListModule } from '@angular/material/grid-list';
+import { MatInputModule } from '@angular/material/input';
+import { ElementIconText } from '../services/get-element-icon-text.service';
+import { FormsModule } from '@angular/forms';
+import { ColorPickerModule } from 'ngx-color-picker';
+import { ShapeService } from './services/shape.service';
+import { SaveDashboardDialogComponent } from './dialog/save-dashboard/save-dashboard-dialog.component';
+import { AssetDashboardOverviewComponent } from './components/dashboard-overview/dashboard-overview.component';
 import { AddLinkDialogComponent } from './dialog/add-link/add-link-dialog.component';
-import {DashboardModule} from "../dashboard/dashboard.module";
+import { DashboardModule } from '../dashboard/dashboard.module';
 
 @NgModule({
     imports: [
@@ -65,18 +62,8 @@ import {DashboardModule} from "../dashboard/dashboard.module";
         AssetDashboardOverviewComponent
     ],
     providers: [
-        WebsocketService,
         RestService,
         ShapeService,
-        {
-            provide: InjectableRxStompConfig,
-            useValue: streamPipesStompConfig
-        },
-        {
-            provide: RxStompService,
-            useFactory: rxStompServiceFactory,
-            deps: [InjectableRxStompConfig]
-        },
         ElementIconText
     ],
     entryComponents: [
diff --git a/ui/src/app/app-asset-monitoring/components/view-asset/view-asset.component.ts b/ui/src/app/app-asset-monitoring/components/view-asset/view-asset.component.ts
index 1fe5319..8a88fbf 100644
--- a/ui/src/app/app-asset-monitoring/components/view-asset/view-asset.component.ts
+++ b/ui/src/app/app-asset-monitoring/components/view-asset/view-asset.component.ts
@@ -16,14 +16,15 @@
  *
  */
 
-import { AfterViewInit, Component, EventEmitter, Input, OnInit, Output } from '@angular/core';
+import { AfterViewInit, Component, EventEmitter, Input, OnDestroy, OnInit, Output } from '@angular/core';
 
 import Konva from 'konva';
-import { WebsocketService } from '../../services/websocket.service';
 import { DashboardConfiguration } from '../../model/dashboard-configuration.model';
 import { RestService } from '../../services/rest.service';
 import { DashboardService } from '../../../dashboard/services/dashboard.service';
-import { DashboardItem } from '@streampipes/platform-services';
+import { DatalakeQueryParameterBuilder, DatalakeRestService, SpQueryResult } from '@streampipes/platform-services';
+import { Subscription, timer } from 'rxjs';
+import { switchMap } from 'rxjs/operators';
 
 interface Window {
     Image: any;
@@ -36,7 +37,7 @@ declare const window: Window;
     templateUrl: './view-asset.component.html',
     styleUrls: ['./view-asset.component.css']
 })
-export class ViewAssetComponent implements OnInit, AfterViewInit {
+export class ViewAssetComponent implements OnInit, AfterViewInit, OnDestroy {
 
     @Input() dashboardConfig: DashboardConfiguration;
     @Output() dashboardClosed = new EventEmitter<boolean>();
@@ -46,12 +47,11 @@ export class ViewAssetComponent implements OnInit, AfterViewInit {
     mainLayer: any;
     backgroundImageLayer: any;
 
-    dashboardItem: DashboardItem;
-    widgetLoaded = false;
+    subscriptions: Subscription[] = [];
 
-    constructor(private websocketService: WebsocketService,
-                private restService: RestService,
-                private dashboardService: DashboardService) {
+    constructor(private restService: RestService,
+                private dashboardService: DashboardService,
+                private dataLakeRestService: DatalakeRestService) {
 
     }
 
@@ -97,13 +97,40 @@ export class ViewAssetComponent implements OnInit, AfterViewInit {
         const dynamicShapes = this.mainCanvasStage.find('.dynamic-text');
         dynamicShapes.forEach(ds => {
             const monitoredField = ds.text();
-           this.websocketService.connect(ds.attrs.brokerUrl, ds.attrs.topic).subscribe(msg => {
-               ds.text(msg[monitoredField]);
-               this.mainCanvasStage.draw();
-           });
+            const measurement = ds.attrs.dataLakeMeasure;
+            const subscription = timer(0, 2000).pipe(
+              switchMap(() => this.dataLakeRestService.getData(measurement, this.buildQuery())))
+              .subscribe(queryResult => {
+                  this.handleResponse(ds, monitoredField, queryResult);
+              });
+            this.subscriptions.push(subscription);
         });
     }
 
+    handleResponse(ds: any,
+                   monitoredField: string,
+                   queryResult: SpQueryResult): void {
+        if (queryResult.total > 0) {
+            if (queryResult.allDataSeries.length === 1) {
+                const series = queryResult.allDataSeries[0];
+                if (series.rows.length > 0) {
+                    const row = series.rows[0];
+                    const event = {};
+                    series.headers.forEach((fieldName, index) => {
+                        event[fieldName] = row[index];
+                    });
+                    ds.text(event[monitoredField]);
+                    this.mainCanvasStage.draw();
+                }
+            }
+        }
+    }
+
+    buildQuery() {
+        const queryBuilder = DatalakeQueryParameterBuilder.create();
+        return queryBuilder.withLimit(1).withOrdering('DESC').build();
+    }
+
     showImage() {
         const image = new window.Image();
         image.src = this.restService.getImageUrl(this.dashboardConfig.imageInfo.imageName);
@@ -123,4 +150,8 @@ export class ViewAssetComponent implements OnInit, AfterViewInit {
         this.editDashboardEmitter.emit(this.dashboardConfig);
     }
 
+    ngOnDestroy(): void {
+        this.subscriptions.forEach(s => s.unsubscribe());
+    }
+
 }
diff --git a/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.html b/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.html
index e5748ce..4df8b47 100644
--- a/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.html
+++ b/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.html
@@ -27,10 +27,10 @@
                             <mat-list-item *ngFor="let pipeline of visualizablePipelines"
                                            (click)="selectPipeline(pipeline)" class="list-item">
                                 <div mat-list-avatar
-                                     class="pipeline-avatar sp-accent-bg">{{iconText(pipeline.visualizationName)}}
+                                     class="pipeline-avatar sp-accent-bg">{{iconText(pipeline.pipelineName)}}
                                 </div>
                                 <h4 mat-line>{{pipeline.pipelineName}}</h4>
-                                <p mat-line>{{pipeline.visualizationName}} </p>
+                                <p mat-line>{{pipeline.measureName}} </p>
                             </mat-list-item>
                         </mat-list>
                     </div>
@@ -43,7 +43,7 @@
                         <mat-radio-group fxLayout="column" class="example-radio-group"
                                          [(ngModel)]="selectedMeasurement">
                             <mat-radio-button class="example-radio-button"
-                                              *ngFor="let prop of selectedVisualization.schema.eventProperties"
+                                              *ngFor="let prop of selectedVisualization.eventSchema.eventProperties"
                                               [value]="prop.runtimeName">
                                 {{prop.runtimeName}}
                             </mat-radio-button>
diff --git a/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.ts b/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.ts
index 294f79d..5803c06 100644
--- a/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.ts
+++ b/ui/src/app/app-asset-monitoring/dialog/add-pipeline/add-pipeline-dialog.component.ts
@@ -21,9 +21,9 @@ import { RestApi } from '../../../services/rest-api.service';
 import { RestService } from '../../services/rest.service';
 import { ElementIconText } from '../../../services/get-element-icon-text.service';
 import { SelectedVisualizationData } from '../../model/selected-visualization-data.model';
-import { DashboardService } from '../../../dashboard/services/dashboard.service';
 import { DialogRef } from '../../../core-ui/dialog/base-dialog/dialog-ref';
-import { VisualizablePipeline } from '@streampipes/platform-services';
+import { DataLakeMeasure, DatalakeRestService, DataViewDataExplorerService } from '@streampipes/platform-services';
+import { zip } from 'rxjs';
 
 @Component({
     selector: 'sp-add-pipeline-dialog-component',
@@ -46,9 +46,9 @@ export class AddPipelineDialogComponent implements OnInit {
         description: 'Choose label'
     }];
 
-    visualizablePipelines: VisualizablePipeline[] = [];
+    visualizablePipelines: DataLakeMeasure[] = [];
 
-    selectedVisualization: VisualizablePipeline;
+    selectedVisualization: DataLakeMeasure;
     selectedType: any;
     selectedMeasurement: any;
     page: any = 'select-pipeline';
@@ -64,14 +64,27 @@ export class AddPipelineDialogComponent implements OnInit {
         private dialogRef: DialogRef<AddPipelineDialogComponent>,
         private restApi: RestApi,
         private restService: RestService,
-        private dashboardService: DashboardService,
+        private dataLakeRestService: DatalakeRestService,
+        private dataExplorerService: DataViewDataExplorerService,
         public elementIconText: ElementIconText) {
     }
 
     ngOnInit() {
-        this.dashboardService.getVisualizablePipelines().subscribe(visualizations => {
-            this.visualizablePipelines = visualizations;
-        });
+        this.loadVisualizablePipelines();
+    }
+
+    loadVisualizablePipelines() {
+        zip(this.dataExplorerService.getAllPersistedDataStreams(), this.dataLakeRestService.getAllMeasurementSeries())
+          .subscribe(res => {
+              const visualizablePipelines = res[0];
+              visualizablePipelines.forEach(p => {
+                  const measurement = res[1].find(m => {
+                      return m.measureName === p.measureName;
+                  });
+                  p.eventSchema = measurement.eventSchema;
+              });
+              this.visualizablePipelines = visualizablePipelines;
+          });
     }
 
     onCancel(): void {
@@ -122,8 +135,7 @@ export class AddPipelineDialogComponent implements OnInit {
             selectedConfig.measurement = this.selectedMeasurement;
             selectedConfig.visualizationId = this.selectedVisualization.pipelineId;
             selectedConfig.label = this.selectedLabel;
-            selectedConfig.brokerUrl = (this.selectedVisualization as any).broker;
-            selectedConfig.topic = this.selectedVisualization.topic;
+            selectedConfig.dataLakeMeasure = this.selectedVisualization.measureName;
 
             this.dialogRef.close(selectedConfig);
         }
diff --git a/ui/src/app/app-asset-monitoring/model/selected-visualization-data.model.ts b/ui/src/app/app-asset-monitoring/model/selected-visualization-data.model.ts
index b79cb68..611bc0d 100644
--- a/ui/src/app/app-asset-monitoring/model/selected-visualization-data.model.ts
+++ b/ui/src/app/app-asset-monitoring/model/selected-visualization-data.model.ts
@@ -24,8 +24,7 @@ export interface SelectedVisualizationData {
     visualizationId: string;
     measurement: string;
     label: string;
-    brokerUrl: string;
-    topic: string;
+    dataLakeMeasure: string;
 }
 
 export interface HyperlinkConfig {
diff --git a/ui/src/app/app-asset-monitoring/services/shape.service.ts b/ui/src/app/app-asset-monitoring/services/shape.service.ts
index 3e61250..01c2c16 100644
--- a/ui/src/app/app-asset-monitoring/services/shape.service.ts
+++ b/ui/src/app/app-asset-monitoring/services/shape.service.ts
@@ -125,8 +125,7 @@ export class ShapeService {
 
         if (dynamicContent) {
             textSettings.name = 'dynamic-text';
-            textSettings.brokerUrl = config.brokerUrl;
-            textSettings.topic = config.topic;
+            textSettings.dataLakeMeasure = config.dataLakeMeasure;
             textSettings.fontSize = '30';
         }
 
diff --git a/ui/src/app/app-asset-monitoring/services/websocket.service.ts b/ui/src/app/app-asset-monitoring/services/websocket.service.ts
deleted file mode 100644
index 4a3de23..0000000
--- a/ui/src/app/app-asset-monitoring/services/websocket.service.ts
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import {Injectable} from "@angular/core";
-import {Observable, Subscription} from "rxjs";
-import {Message} from "@stomp/stompjs";
-import {RxStompService} from "@stomp/ng2-stompjs";
-
-@Injectable()
-export class WebsocketService {
-
-    subscription: Subscription;
-
-    constructor(private rxStompService: RxStompService) {
-    }
-
-    connect(url, topic): Observable<any> {
-        return new Observable<any>(observable => {
-            this.subscription = this.rxStompService.watch("/topic/" +topic).subscribe((message: Message) => {
-                observable.next(JSON.parse(message.body));
-            });
-        });
-    }
-
-}
\ No newline at end of file

[incubator-streampipes] 04/07: [STREAMPIPES-509] Remove dashboard sink

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 74f9c724c620b1aa409e5cc656eb1bef644d431b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 17:13:33 2022 +0100

    [STREAMPIPES-509] Remove dashboard sink
---
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |  2 -
 .../sinks/internal/jvm/dashboard/Dashboard.java    | 62 ----------------------
 .../jvm/dashboard/DashboardController.java         | 55 -------------------
 .../jvm/dashboard/DashboardParameters.java         | 36 -------------
 4 files changed, 155 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 8110058..53e3db7 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -29,7 +29,6 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
-import org.apache.streampipes.sinks.internal.jvm.dashboard.DashboardController;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeController;
 import org.apache.streampipes.sinks.internal.jvm.notification.NotificationController;
 
@@ -46,7 +45,6 @@ public class SinksInternalJvmInit extends StandaloneModelSubmitter {
             "",
             8090)
             .registerPipelineElements(
-                    new DashboardController(),
                     new DataLakeController(),
                     new NotificationController())
             .registerMessagingFormats(
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
deleted file mode 100644
index f4d5840..0000000
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.dashboard;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
-import org.apache.streampipes.wrapper.runtime.EventSink;
-
-public class Dashboard implements EventSink<DashboardParameters> {
-
-    private JsonDataFormatDefinition jsonDataFormatDefinition;
-
-    public Dashboard() {
-        this.jsonDataFormatDefinition = new JsonDataFormatDefinition();
-    }
-
-    @Override
-    public void onInvocation(DashboardParameters parameters,
-                             EventSinkRuntimeContext context) throws SpRuntimeException {
-    }
-
-    private String makeTopic(SpDataStream inputStream, String visualizationName) {
-        return extractTopic(inputStream)
-                + "-"
-                + visualizationName.replaceAll(" ", "").toLowerCase();
-    }
-
-    private String extractTopic(SpDataStream inputStream) {
-        return inputStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
-    }
-
-    @Override
-    public void onEvent(Event event) {
-        try {
-        } catch (SpRuntimeException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public void onDetach() throws SpRuntimeException {
-    }
-}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
deleted file mode 100644
index d30ce78..0000000
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardController.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.dashboard;
-
-import org.apache.streampipes.model.DataSinkType;
-import org.apache.streampipes.model.graph.DataSinkDescription;
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.sdk.builder.DataSinkBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventSink;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventSinkDeclarer;
-
-public class DashboardController extends StandaloneEventSinkDeclarer<DashboardParameters> {
-
-  private static final String VISUALIZATION_NAME_KEY = "visualization-name";
-
-  @Override
-  public DataSinkDescription declareModel() {
-    return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.dashboard")
-            .withLocales(Locales.EN)
-            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-            .category(DataSinkType.VISUALIZATION_CHART)
-            .requiredStream(StreamRequirementsBuilder.any())
-            .requiredTextParameter(Labels.withId(VISUALIZATION_NAME_KEY))
-            .build();
-  }
-
-  @Override
-  public ConfiguredEventSink<DashboardParameters> onInvocation(DataSinkInvocation invocationGraph,
-                                                               DataSinkParameterExtractor extractor) {
-    String visualizationName = extractor.singleValueParameter(VISUALIZATION_NAME_KEY, String.class);
-    return new ConfiguredEventSink<>(new DashboardParameters(invocationGraph, visualizationName), Dashboard::new);
-  }
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
deleted file mode 100644
index feb61b7..0000000
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/DashboardParameters.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.dashboard;
-
-import org.apache.streampipes.model.graph.DataSinkInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventSinkBindingParams;
-
-public class DashboardParameters extends EventSinkBindingParams {
-    private String visualizationName;
-
-    public DashboardParameters(DataSinkInvocation invocationGraph,
-                               String visualizationName) {
-        super(invocationGraph);
-        this.visualizationName = visualizationName;
-    }
-
-    public String getVisualizationName() {
-        return visualizationName;
-    }
-}

[incubator-streampipes] 07/07: [STREAMPIPES-509] Remove ActiveMQ dependency from installer files

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6080ce8cf50b1bfff0582cbd0f47e6a52158175b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 19:38:16 2022 +0100

    [STREAMPIPES-509] Remove ActiveMQ dependency from installer files
---
 installer/cli/environments/adapter                 |  1 -
 installer/cli/environments/backend                 |  1 -
 installer/cli/environments/basic                   |  1 -
 installer/cli/environments/full                    |  1 -
 installer/cli/environments/lite                    |  1 -
 installer/cli/environments/minimal                 |  4 +--
 installer/cli/environments/pipeline-element        |  4 +--
 installer/cli/environments/ui                      |  4 +--
 installer/compose/docker-compose.full.yml          |  9 -----
 installer/compose/docker-compose.yml               |  9 -----
 .../external/activemq/activemq-deployment.yaml     | 39 ----------------------
 .../external/activemq/activemq-service.yaml        | 39 ----------------------
 installer/k8s/values.yaml                          |  1 -
 13 files changed, 3 insertions(+), 111 deletions(-)

diff --git a/installer/cli/environments/adapter b/installer/cli/environments/adapter
index 5d4eb44..8511657 100644
--- a/installer/cli/environments/adapter
+++ b/installer/cli/environments/adapter
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 [environment:adapter]
-activemq
 backend
 consul
 couchdb
diff --git a/installer/cli/environments/backend b/installer/cli/environments/backend
index 5094860..536f185 100644
--- a/installer/cli/environments/backend
+++ b/installer/cli/environments/backend
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 [environment:backend]
-activemq
 kafka
 consul
 zookeeper
diff --git a/installer/cli/environments/basic b/installer/cli/environments/basic
index 6957fd3..ce25e8e 100644
--- a/installer/cli/environments/basic
+++ b/installer/cli/environments/basic
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 [environment:basic]
-activemq
 kafka
 consul
 zookeeper
diff --git a/installer/cli/environments/full b/installer/cli/environments/full
index 2f50bb8..f595613 100644
--- a/installer/cli/environments/full
+++ b/installer/cli/environments/full
@@ -18,7 +18,6 @@ ui
 backend
 connect-adapters
 connect-adapters-iiot
-activemq
 consul
 couchdb
 jobmanager
diff --git a/installer/cli/environments/lite b/installer/cli/environments/lite
index d8737b0..3e6053a 100644
--- a/installer/cli/environments/lite
+++ b/installer/cli/environments/lite
@@ -17,7 +17,6 @@
 ui
 backend
 connect-adapters-iiot
-activemq
 consul
 couchdb
 kafka
diff --git a/installer/cli/environments/minimal b/installer/cli/environments/minimal
index d31a4fe..3f51549 100644
--- a/installer/cli/environments/minimal
+++ b/installer/cli/environments/minimal
@@ -16,10 +16,8 @@
 [environment:minimal]
 ui
 backend
-connect-adapters-iiot
-activemq
 consul
 couchdb
 influxdb
 mosquitto
-pipeline-elements-all-jvm
+extensions-all-jvm
diff --git a/installer/cli/environments/pipeline-element b/installer/cli/environments/pipeline-element
index 625c075..2f6f35c 100644
--- a/installer/cli/environments/pipeline-element
+++ b/installer/cli/environments/pipeline-element
@@ -14,14 +14,12 @@
 # limitations under the License.
 
 [environment:pipeline-element]
-activemq
 backend
 consul
-connect-adapters-iiot
+extensions-all-jvm
 couchdb
 kafka
 ui
 sources-watertank-simulator
 zookeeper
-pipeline-elements-all-jvm
 influxdb
diff --git a/installer/cli/environments/ui b/installer/cli/environments/ui
index 9f73304..6a246ef 100644
--- a/installer/cli/environments/ui
+++ b/installer/cli/environments/ui
@@ -14,13 +14,11 @@
 # limitations under the License.
 
 [environment:ui]
-activemq
 backend
 consul
-connect-adapters-iiot
+extensions-all-jvm
 couchdb
 kafka
 sources-watertank-simulator
 zookeeper
-pipeline-elements-all-jvm
 influxdb
diff --git a/installer/compose/docker-compose.full.yml b/installer/compose/docker-compose.full.yml
index 38ae812..92a4ca5 100644
--- a/installer/compose/docker-compose.full.yml
+++ b/installer/compose/docker-compose.full.yml
@@ -72,7 +72,6 @@ services:
       - "80:80"
     depends_on:
       - couchdb
-      - activemq
       - consul
       - backend
     logging: *default-logging
@@ -80,14 +79,6 @@ services:
     networks:
       spnet:
 
-  # Mandatory external services
-  activemq:
-    image: fogsyio/activemq:5.15.9
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
   consul:
     image: fogsyio/consul:1.9.6
     environment:
diff --git a/installer/compose/docker-compose.yml b/installer/compose/docker-compose.yml
index 99fa7d5..358493c 100644
--- a/installer/compose/docker-compose.yml
+++ b/installer/compose/docker-compose.yml
@@ -44,7 +44,6 @@ services:
       - "80:80"
     depends_on:
       - couchdb
-      - activemq
       - consul
       - backend
     logging: *default-logging
@@ -52,14 +51,6 @@ services:
     networks:
       spnet:
 
-  # Mandatory external services
-  activemq:
-    image: fogsyio/activemq:5.15.9
-    logging: *default-logging
-    restart: unless-stopped
-    networks:
-      spnet:
-
   consul:
     image: fogsyio/consul:1.9.6
     environment:
diff --git a/installer/k8s/templates/external/activemq/activemq-deployment.yaml b/installer/k8s/templates/external/activemq/activemq-deployment.yaml
deleted file mode 100644
index 979cef3..0000000
--- a/installer/k8s/templates/external/activemq/activemq-deployment.yaml
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
----
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: activemq
-spec:
-  selector:
-    matchLabels:
-      app: activemq
-  replicas: 1
-  template:
-    metadata:
-      labels:
-        app: activemq
-    spec:
-      containers:
-        - name: activemq
-          image: fogsyio/activemq:{{ .Values.external.activemqVersion }}
-          imagePullPolicy: {{ .Values.pullPolicy }}
-          ports:
-            - containerPort: 61616
-            - containerPort: 61614
-            - containerPort: 8161
-            - containerPort: 1883
diff --git a/installer/k8s/templates/external/activemq/activemq-service.yaml b/installer/k8s/templates/external/activemq/activemq-service.yaml
deleted file mode 100644
index 8b8c874..0000000
--- a/installer/k8s/templates/external/activemq/activemq-service.yaml
+++ /dev/null
@@ -1,39 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-apiVersion: v1
-kind: Service
-metadata:
-  name: activemq
-spec:
-  selector:
-    app: activemq
-  ports:
-    - name: main
-      protocol: TCP
-      port: 61616
-      targetPort: 61616
-    - name: websocket
-      protocol: TCP
-      port: 61614
-      targetPort: 61614
-    - name: ui
-      protocol: TCP
-      port: 8161
-      targetPort: 8161
-    - name: mqtt
-      protocol: TCP
-      port: 1883
-      targetPort: 1883
\ No newline at end of file
diff --git a/installer/k8s/values.yaml b/installer/k8s/values.yaml
index 382f41e..55577f9 100644
--- a/installer/k8s/values.yaml
+++ b/installer/k8s/values.yaml
@@ -24,7 +24,6 @@ streampipes:
   registry: "apachestreampipes"
 
 external:
-  activemqVersion: 5.15.9
   consulVersion: 1.9.6
   couchdbVersion: 2.3.1
   flinkVersion: 1.13.5-scala_2.11

[incubator-streampipes] 06/07: [STREAMPIPES-509] Remove websocket dependencies

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6f5e65fc11d240c1712bd8edce5aeff74507b537
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 19:33:10 2022 +0100

    [STREAMPIPES-509] Remove websocket dependencies
---
 ui/deployment/appng5.module.mst                    |  7 -----
 ui/package.json                                    |  2 +-
 ui/src/app/dashboard/services/websocket.config.ts  | 36 ----------------------
 .../app/dashboard/services/websocket.settings.ts   | 32 -------------------
 4 files changed, 1 insertion(+), 76 deletions(-)

diff --git a/ui/deployment/appng5.module.mst b/ui/deployment/appng5.module.mst
index 1fb4219..f0d2471 100644
--- a/ui/deployment/appng5.module.mst
+++ b/ui/deployment/appng5.module.mst
@@ -38,8 +38,6 @@ import { AuthService } from './services/auth.service';
 import { LoadingBarHttpClientModule } from '@ngx-loading-bar/http-client';
 import { LOADING_BAR_CONFIG } from '@ngx-loading-bar/core';
 
-import {InjectableRxStompConfig, RxStompService, rxStompServiceFactory} from '@stomp/ng2-stompjs';
-
 {{#modulesActive}}
 {{#ng5}}
 import { {{{ng5_moduleName}}} } from '{{{path}}}';
@@ -93,11 +91,6 @@ import * as $ from 'jquery';
         NotificationCountService,
         { provide: HTTP_INTERCEPTORS, useClass: HttpInterceptorProvider, multi: true },
         { provide: LOADING_BAR_CONFIG, useValue: { latencyThreshold: 100 }},
-        {
-        			provide: RxStompService,
-        			useFactory: rxStompServiceFactory,
-        			deps: [InjectableRxStompConfig]
-        }
     ],
     bootstrap: [AppComponent]
 })
diff --git a/ui/package.json b/ui/package.json
index 93ed9b2..3e6f495 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -49,7 +49,7 @@
     "@ngx-loading-bar/core": "5.1.2",
     "@ngx-loading-bar/http-client": "5.1.2",
     "@panzoom/panzoom": "^4.4.3",
-    "@stomp/ng2-stompjs": "7.2.0",
+    "@streampipes/platform-services": "file:dist/streampipes/platform-services",
     "@swimlane/ngx-charts": "16.0.0",
     "angular-datatables": "^12.0.2",
     "angular-gridster2": "12.1.1",
diff --git a/ui/src/app/dashboard/services/websocket.config.ts b/ui/src/app/dashboard/services/websocket.config.ts
deleted file mode 100644
index de4ce61..0000000
--- a/ui/src/app/dashboard/services/websocket.config.ts
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-import { InjectableRxStompConfig } from '@stomp/ng2-stompjs';
-import { WebsocketSettings } from './websocket.settings';
-
-export const streamPipesStompConfig: InjectableRxStompConfig = {
-
-    brokerURL: new WebsocketSettings().getBrokerUrl(),
-
-    connectHeaders: {
-        login: 'admin',
-        passcode: 'admin'
-    },
-
-    heartbeatIncoming: 0,
-    heartbeatOutgoing: 20000,
-
-    reconnectDelay: 200,
-
-};
diff --git a/ui/src/app/dashboard/services/websocket.settings.ts b/ui/src/app/dashboard/services/websocket.settings.ts
deleted file mode 100644
index 4f16e1b..0000000
--- a/ui/src/app/dashboard/services/websocket.settings.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-export class WebsocketSettings {
-
-    getBrokerUrl(): string {
-        return this.getWebsocketScheme() + '//' + location.host + '/streampipes/ws';
-    }
-
-    getWebsocketScheme(): string {
-        if (location.protocol === 'https:') {
-            return 'wss:';
-        } else {
-            return 'ws:';
-        }
-    }
-}

[incubator-streampipes] 03/07: [STREAMPIPES-509] Store image blobs in database

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6301f2a0aaa5eaa5e6303a73c36daa824514d8aa
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 17:12:46 2022 +0100

    [STREAMPIPES-509] Store image blobs in database
---
 .../backend/StreamPipesResourceConfig.java         |   2 +
 .../dataexplorer/DataLakeManagementV3.java         | 272 ---------------------
 .../streampipes-sinks-internal-jvm/development/env |   5 +-
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |   6 +-
 .../sinks/internal/jvm/config/ConfigKeys.java      |   7 +-
 .../sinks/internal/jvm/dashboard/Dashboard.java    |   9 -
 .../sinks/internal/jvm/datalake/DataLake.java      |  34 +--
 .../sinks/internal/jvm/datalake/ImageStore.java    |  54 ++++
 .../kafka/config/ConsumerConfigFactory.java        |   5 +-
 .../streampipes/ps/DataLakeImageResource.java      |  29 ++-
 .../streampipes/storage/api/IImageStorage.java     |  18 +-
 .../streampipes/storage/api/INoSqlStorage.java     |   2 +
 .../storage/couchdb/CouchDbStorageManager.java     |   5 +
 .../storage/couchdb/impl/ImageStorageImpl.java     |  30 ++-
 .../streampipes/storage/couchdb/utils/Utils.java   |   4 +
 .../src/lib/apis/datalake-rest.service.ts          |   2 +-
 .../components/widgets/image/image-config.ts       |   4 +-
 .../widgets/image/image-widget.component.html      |   2 +-
 .../widgets/image/image-widget.component.ts        |  25 +-
 ui/src/app/dashboard/dashboard.module.ts           | 150 ++++++------
 .../src/app/services/secure.pipe.ts                |  33 ++-
 ui/src/app/services/services.module.ts             |   7 +-
 22 files changed, 256 insertions(+), 449 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index a57ad7d..3b1741f 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.backend;
 
 import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
+import org.apache.streampipes.ps.DataLakeImageResource;
 import org.apache.streampipes.ps.DataLakeResourceV3;
 import org.apache.streampipes.ps.DataLakeResourceV4;
 import org.apache.streampipes.ps.PipelineElementTemplateResource;
@@ -62,6 +63,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
         register(ContainerProvidedOptions.class);
         register(DashboardWidget.class);
         register(Dashboard.class);
+        register(DataLakeImageResource.class);
         register(DataLakeResourceV3.class);
         register(DataLakeMeasureResourceV3.class);
         register(DataStream.class);
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV3.java
deleted file mode 100644
index 9fc117c..0000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV3.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer;
-
-public class DataLakeManagementV3 {
-
-//  public DataSeries getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue) {
-//    return new GetAggregatedEventsQuery(AggregatedTimeBoundQueryParams.from(index, startDate, endDate, aggregationUnit, aggregationValue))
-//            .executeQuery();
-//  }
-//
-//  public SPQueryResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue,
-//                                 String groupingTag) {
-//    return new GetGroupedAggregatedEventsQuery(GroupedAggregatedTimeBoundQueryParams.from(index,
-//            startDate, endDate, aggregationUnit, aggregationValue, groupingTag)).executeQuery();
-//  }
-
-//  public DataSeries getEvents(String index, long startDate, long endDate) {
-//    return new GetEventsQuery(TimeBoundQueryParams.from(index, startDate, endDate)).executeQuery();
-//  }
-
-//    public SPQueryResult getEvents(String index, long startDate, long endDate, String groupingTag) {
-//      return new GetGroupedEventsQuery(GroupedQueryParams.from(index, startDate, endDate, groupingTag)).executeQuery();
-//    }
-//
-//  public DataSeries getEventsAutoAggregation(String index, long startDate, long endDate) {
-//    return new TimeBoundAutoAggregationQuery(TimeBoundQueryParams.from(index, startDate, endDate)).executeQuery();
-//  }
-//
-//  public SPQueryResult getEventsAutoAggregation(String index, long startDate, long endDate, String groupingTag) {
-//    return new GroupedAutoAggregationQuery(GroupedQueryParams.from(index, startDate, endDate, groupingTag)).executeQuery();
-//  }
-//
-//  public DataSeries getEventsFromNow(String index, String timeunit, int value,
-//                                     String aggregationUnit, int aggregationValue) {
-//    return new GetAggregatedEventsFromNowQuery(AggregatedTimeUnitQueryParams
-//            .from(index, timeunit, value, aggregationUnit, aggregationValue)).executeQuery();
-//  }
-//
-//  public DataSeries getEventsFromNowAutoAggregation(String index, String timeunit, int value) {
-//    return new FromNowAutoAggregationQuery(TimeUnitQueryParams.from(index, timeunit, value)).executeQuery();
-//  }
-//
-//  public PageResult getEvents(String index, int itemsPerPage, int page) {
-//    return new GetPagingEventsQuery(PagingQueryParams.from(index, itemsPerPage, page)).executeQuery();
-//  }
-//
-//  public PageResult getEvents(String index, int itemsPerPage) throws IOException {
-//    int page = getMaxPage(index, itemsPerPage);
-//
-//    if (page > 0) {
-//      page = page - 1;
-//    }
-//
-//    return getEvents(index, itemsPerPage, page);
-//  }
-//
-//  public void getAllEvents(String index, String outputFormat, OutputStream outputStream) throws IOException {
-//    getAllEvents(index, outputFormat, null, null, outputStream);
-//  }
-//
-//  public void getAllEvents(String index, String outputFormat, @Nullable Long startDate,
-//                           @Nullable Long endDate, OutputStream outputStream) throws IOException {
-//    int itemsPerRequest = 500000;
-//
-//    PageResult dataResult;
-//    //JSON
-//    if (outputFormat.equals("json")) {
-//
-//      Gson gson = new Gson();
-//      int i = 0;
-//      boolean isFirstDataObject = true;
-//
-//      outputStream.write(toBytes("["));
-//      do {
-//        dataResult = new GetPagingEventsQuery(PagingQueryParams.from(index, itemsPerRequest, i, startDate, endDate), TimeUnit.MILLISECONDS).executeQuery();
-//
-//        if (dataResult.getTotal() > 0) {
-//          for (List<Object> row : dataResult.getRows()) {
-//            if (!isFirstDataObject) {
-//              outputStream.write(toBytes(","));
-//            }
-//
-//            //produce one json object
-//            boolean isFirstElementInRow = true;
-//            outputStream.write(toBytes("{"));
-//            for (int i1 = 0; i1 < row.size(); i1++) {
-//              Object element = row.get(i1);
-//              if (!isFirstElementInRow) {
-//                outputStream.write(toBytes(","));
-//              }
-//              isFirstElementInRow = false;
-//              if (i1 == 0) {
-//                element = ((Double) element).longValue();
-//              }
-//              //produce json e.g. "name": "Pipes" or "load": 42
-//              outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
-//                      + gson.toJson(element)));
-//            }
-//            outputStream.write(toBytes("}"));
-//            isFirstDataObject = false;
-//          }
-//
-//          i++;
-//        }
-//      } while (dataResult.getTotal() > 0);
-//      outputStream.write(toBytes("]"));
-//
-//      //CSV
-//    } else if (outputFormat.equals("csv")) {
-//      int i = 0;
-//
-//      boolean isFirstDataObject = true;
-//
-//      do {
-//        dataResult = new GetPagingEventsQuery(PagingQueryParams.from(index, itemsPerRequest, i, startDate, endDate), TimeUnit.MILLISECONDS).executeQuery();
-//        //Send first header
-//        if (dataResult.getTotal() > 0) {
-//          if (isFirstDataObject) {
-//            boolean isFirst = true;
-//            for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
-//              if (!isFirst) {
-//                outputStream.write(toBytes(";"));
-//              }
-//              isFirst = false;
-//              outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
-//            }
-//          }
-//          outputStream.write(toBytes("\n"));
-//          isFirstDataObject = false;
-//        }
-//
-//        if (dataResult.getTotal() > 0) {
-//          for (List<Object> row : dataResult.getRows()) {
-//            boolean isFirstInRow = true;
-//            for (int i1 = 0; i1 < row.size(); i1++) {
-//              Object element = row.get(i1);
-//              if (!isFirstInRow) {
-//                outputStream.write(toBytes(";"));
-//              }
-//              isFirstInRow = false;
-//              if (i1 == 0) {
-//                element = ((Double) element).longValue();
-//              }
-//              if (element == null) {
-//                outputStream.write(toBytes(""));
-//              } else {
-//                outputStream.write(toBytes(element.toString()));
-//              }
-//            }
-//            outputStream.write(toBytes("\n"));
-//          }
-//        }
-//        i++;
-//      } while (dataResult.getTotal() > 0);
-//    }
-//  }
-//
-//  public boolean removeAllDataFromDataLake() {
-//    List<DataLakeMeasure> allMeasurements = DataExplorerUtils.getInfos();
-//
-//    // Remove data from influxdb
-//    for (DataLakeMeasure measure : allMeasurements) {
-//      org.influxdb.dto.QueryResult influxResult = new DeleteDataQuery(measure).executeQuery();
-//      if (influxResult.hasError() || influxResult.getResults().get(0).getError() != null) {
-//        return false;
-//      }
-//    }
-//    return true;
-//  }
-//
-//  private byte[] toBytes(String value) {
-//    return value.getBytes();
-//  }
-//
-//  private int getMaxPage(String index, int itemsPerPage) {
-//    return new GetMaxPagesQuery(PagingQueryParams.from(index, itemsPerPage)).executeQuery();
-//  }
-//
-//  public byte[] getImage(String fileRoute) throws IOException {
-//    fileRoute = getImageFileRoute(fileRoute);
-//    File file = new File(fileRoute);
-//    return FileUtils.readFileToByteArray(file);
-//  }
-//
-//  public String getImageCoco(String fileRoute) throws IOException {
-//    fileRoute = getImageFileRoute(fileRoute);
-//    String cocoRoute = getCocoFileRoute(fileRoute);
-//
-//    File file = new File(cocoRoute);
-//    if (!file.exists()) {
-//      return "";
-//    } else {
-//      return FileUtils.readFileToString(file, "UTF-8");
-//    }
-//  }
-//
-//  public void saveImageCoco(String fileRoute, String data) throws IOException {
-//    fileRoute = getImageFileRoute(fileRoute);
-//    String cocoRoute = getCocoFileRoute(fileRoute);
-//
-//    File file = new File(cocoRoute);
-//    file.getParentFile().mkdirs();
-//    FileUtils.writeStringToFile(file, data, "UTF-8");
-//
-//  }
-//
-//  private String getImageFileRoute(String fileRoute) {
-//    fileRoute = fileRoute.replace("_", "/");
-//    fileRoute = fileRoute.replace("/png", ".png");
-//    return fileRoute;
-//  }
-//
-//  private String getCocoFileRoute(String imageRoute) {
-//    String[] splitedRoute = imageRoute.split("/");
-//    String route = "";
-//    for (int i = 0; splitedRoute.length - 2 >= i; i++) {
-//      route += "/" + splitedRoute[i];
-//    }
-//    route += "Coco";
-//    route += "/" + splitedRoute[splitedRoute.length - 1];
-//    route = route.replace(".png", ".json");
-//    return route;
-//  }
-//
-//  public void updateLabels(String index, String labelColumn, long startdate, long enddate, String label, String timestampColumn) {
-//    DataSeries queryResult = getEvents(index, startdate, enddate);
-//    Map<String, String> headerWithTypes = new GetHeadersWithTypesQuery(QueryParams.from(index)).executeQuery();
-//    List<String> headers = queryResult.getHeaders();
-//
-//    InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
-//    influxDB.setDatabase(BackendConfig.INSTANCE.getInfluxDatabaseName());
-//
-//    for (List<Object> row : queryResult.getRows()) {
-//      long timestampValue = Math.round((double) row.get(headers.indexOf(timestampColumn)));
-//
-//      Point.Builder p = Point.measurement(index).time(timestampValue, TimeUnit.MILLISECONDS);
-//
-//      for (int i = 1; i < row.size(); i++) {
-//        String selected_header = headers.get(i);
-//        if (!selected_header.equals(labelColumn)) {
-//          if (headerWithTypes.get(selected_header).equals("integer")) {
-//            p.addField(selected_header, Math.round((double) row.get(i)));
-//          } else if (headerWithTypes.get(selected_header).equals("string")) {
-//            p.addField(selected_header, row.get(i).toString());
-//          }
-//        } else {
-//          p.addField(selected_header, label);
-//        }
-//      }
-//      influxDB.write(p.build());
-//    }
-//    influxDB.close();
-//  }
-
-}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/development/env b/streampipes-extensions/streampipes-sinks-internal-jvm/development/env
index 4e666ad..3ade8ae 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/development/env
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/development/env
@@ -18,10 +18,9 @@ SP_PORT=7020
 #SP_HOST=host.docker.internal
 SP_DEBUG=true
 SP_COUCHDB_HOST=localhost
-SP_JMS_HOST=localhost
-SP_JMS_PORT=61616
+SP_COUCHDB_PORT=5984
+SP_COUCHDB_PROTOCOL=http
 SP_DATA_LAKE_HOST=localhost
 SP_DATA_LAKE_PORT=8086
 SP_BACKEND_HOST=localhost
 SP_BACKEND_PORT=8030
-SP_IMAGE_STORAGE_LOCATION=
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 61123b2..8110058 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -58,15 +58,15 @@ public class SinksInternalJvmInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
-            .addConfig(ConfigKeys.JMS_HOST, "activemq", "")
-            .addConfig(ConfigKeys.JMS_PORT, 61616, "")
+            .addConfig(ConfigKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs")
+            .addConfig(ConfigKeys.COUCHDB_PORT, 5984, "")
+            .addConfig(ConfigKeys.COUCHDB_PROTOCOL, "http", "")
             .addConfig(ConfigKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database")
             .addConfig(ConfigKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database")
             .addConfig(ConfigKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database")
             .addConfig(ConfigKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database")
             .addConfig(ConfigKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database")
             .addConfig(ConfigKeys.DATA_LAKE_DATABASE_NAME, "sp", "Database name for the StreamPipes data lake database")
-            .addConfig(ConfigKeys.IMAGE_STORAGE_LOCATION, "/spImages/", "Storage location of the data lake images")
             .build();
 
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
index fd8bd65..4d6f0d9 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
@@ -19,13 +19,14 @@
 package org.apache.streampipes.sinks.internal.jvm.config;
 
 public class ConfigKeys {
-    public final static String JMS_HOST = "SP_JMS_HOST";
-    public final static String JMS_PORT = "SP_JMS_PORT";
     public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
     public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
     public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
     public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
     public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
     public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
+
+    public final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
+    public final static String COUCHDB_PORT = "SP_COUCHDB_PORT";
+    public final static String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
index e826b98..f4d5840 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/dashboard/Dashboard.java
@@ -20,16 +20,13 @@ package org.apache.streampipes.sinks.internal.jvm.dashboard;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataformat.json.JsonDataFormatDefinition;
-import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.SpDataStream;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
 public class Dashboard implements EventSink<DashboardParameters> {
 
-    private ActiveMQPublisher publisher;
     private JsonDataFormatDefinition jsonDataFormatDefinition;
 
     public Dashboard() {
@@ -39,10 +36,6 @@ public class Dashboard implements EventSink<DashboardParameters> {
     @Override
     public void onInvocation(DashboardParameters parameters,
                              EventSinkRuntimeContext context) throws SpRuntimeException {
-        this.publisher = new ActiveMQPublisher(
-                context.getConfigStore().getConfig().getString(ConfigKeys.JMS_HOST),
-                context.getConfigStore().getConfig().getInteger(ConfigKeys.JMS_PORT),
-                makeTopic(parameters.getGraph().getInputStreams().get(0), parameters.getVisualizationName()));
     }
 
     private String makeTopic(SpDataStream inputStream, String visualizationName) {
@@ -58,7 +51,6 @@ public class Dashboard implements EventSink<DashboardParameters> {
     @Override
     public void onEvent(Event event) {
         try {
-            publisher.publish(jsonDataFormatDefinition.fromMap(event.getRaw()));
         } catch (SpRuntimeException e) {
             e.printStackTrace();
         }
@@ -66,6 +58,5 @@ public class Dashboard implements EventSink<DashboardParameters> {
 
     @Override
     public void onDetach() throws SpRuntimeException {
-        this.publisher.disconnect();
     }
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index aa2ae39..52d4c4b 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -31,8 +31,8 @@ import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
-import java.io.*;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -46,12 +46,10 @@ public class DataLake implements EventSink<DataLakeParameters> {
   private static Logger LOG;
 
   private List<EventProperty> imageProperties;
-
-  private String imageDirectory;
-
   private String timestampField;
 
   private EventSchema eventSchema;
+  private ImageStore imageStore;
 
   @Override
   public void onInvocation(DataLakeParameters parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
@@ -67,6 +65,12 @@ public class DataLake implements EventSink<DataLakeParameters> {
     String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
     String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
 
+    String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
+    String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
+    int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
+
+    this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
+
     EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
     // Remove the timestamp field from the event schema
     List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
@@ -88,8 +92,6 @@ public class DataLake implements EventSink<DataLakeParameters> {
                     eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
             .collect(Collectors.toList());
 
-    imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
-
     InfluxDbConnectionSettings settings = InfluxDbConnectionSettings.from(
             influxHost, influxPort, databaseName, parameters.getMeasurementName(), user, password);
 
@@ -108,14 +110,11 @@ public class DataLake implements EventSink<DataLakeParameters> {
     try {
 
       this.imageProperties.forEach(eventProperty -> {
-        String eventTimestamp = Long.toString(event.getFieldBySelector(this.timestampField).getAsPrimitive().getAsLong());
-        String fileRoute = this.imageDirectory + eventProperty.getRuntimeName() + "/" + eventTimestamp + ".png";
+        String imageDocId = UUID.randomUUID().toString();
         String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
 
-        this.writeToImageFile(image, fileRoute);
-        fileRoute = fileRoute.replace("/", "_");
-        fileRoute = fileRoute.replace("." , "_");
-        event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), fileRoute);
+        this.writeToImageFile(image, imageDocId);
+        event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
       });
 
       influxDbClient.save(event, this.eventSchema);
@@ -129,16 +128,9 @@ public class DataLake implements EventSink<DataLakeParameters> {
     influxDbClient.stop();
   }
 
-  private void writeToImageFile(String image, String fileRoute) {
+  private void writeToImageFile(String image, String imageDocId) {
     byte[] data = Base64.decodeBase64(image);
-    try {
-      File file = new File(fileRoute);
-      file.getParentFile().mkdirs();
-      OutputStream stream = new FileOutputStream(file, false);
-      stream.write(data);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+    this.imageStore.storeImage(data, imageDocId);
   }
 
   /**
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/ImageStore.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/ImageStore.java
new file mode 100644
index 0000000..cd946b4
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/ImageStore.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+import org.lightcouch.CouchDbClient;
+import org.lightcouch.CouchDbProperties;
+
+import java.io.ByteArrayInputStream;
+
+public class ImageStore {
+
+  private static final String DB_NAME = "images";
+
+  private CouchDbClient couchDbClient;
+
+  public ImageStore(String couchDbProtocol,
+                    String couchDbHost,
+                    int couchDbPort) {
+    this.couchDbClient = new CouchDbClient(props(couchDbProtocol, couchDbHost, couchDbPort));
+  }
+
+  public void storeImage(byte[] imageBytes,
+                         String imageDocId) {
+    this.couchDbClient.saveAttachment(
+            new ByteArrayInputStream(imageBytes),
+            imageDocId,
+            "image/jpeg",
+            imageDocId,
+            null);
+  }
+
+  private static CouchDbProperties props(String couchDbProtocol,
+                                         String couchDbHost,
+                                         int couchDbPort) {
+    return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
+            couchDbHost, couchDbPort, null, null);
+  }
+}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 6dd7922..ad19534 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -42,8 +42,9 @@ public class ConsumerConfigFactory extends AbstractConfigFactory {
   public Properties makeProperties() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
-            UUID.randomUUID().toString()));
+//    props.put(ConsumerConfig.GROUP_ID_CONFIG, getConfigOrDefault(protocol::getGroupId,
+//            UUID.randomUUID().toString()));
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG_DEFAULT);
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
             AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT);
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeImageResource.java
similarity index 52%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
copy to streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeImageResource.java
index fd8bd65..f8a73e0 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeImageResource.java
@@ -1,3 +1,4 @@
+package org.apache.streampipes.ps;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,16 +17,22 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.config;
+import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.storage.management.StorageDispatcher;
 
-public class ConfigKeys {
-    public final static String JMS_HOST = "SP_JMS_HOST";
-    public final static String JMS_PORT = "SP_JMS_PORT";
-    public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-    public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-    public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-    public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-    public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-    public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+@Path("v4/datalake/images")
+public class DataLakeImageResource extends AbstractAuthGuardedRestResource {
+
+  @GET
+  @Path("{imageId}")
+  @Produces("image/jpeg")
+  public Response getImage(@PathParam("imageId") String imageId) {
+    return ok(StorageDispatcher.INSTANCE.getNoSqlStore().getImageStorage().getImageBytes(imageId));
+  }
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IImageStorage.java
similarity index 52%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
copy to streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IImageStorage.java
index fd8bd65..737c846 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IImageStorage.java
@@ -16,16 +16,12 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.config;
+package org.apache.streampipes.storage.api;
+
+import java.io.InputStream;
+
+public interface IImageStorage {
+
+  InputStream getImageBytes(String documentId);
 
-public class ConfigKeys {
-    public final static String JMS_HOST = "SP_JMS_HOST";
-    public final static String JMS_PORT = "SP_JMS_PORT";
-    public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-    public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-    public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-    public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-    public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-    public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
index 6e5c278..5c52dbc 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INoSqlStorage.java
@@ -25,6 +25,8 @@ public interface INoSqlStorage {
 
   ICategoryStorage getCategoryStorageAPI();
 
+  IImageStorage getImageStorage();
+
   IUserGroupStorage getUserGroupStorage();
 
   ILabelStorage getLabelStorageAPI();
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index 2b80297..4a8b835 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -39,6 +39,11 @@ public enum CouchDbStorageManager implements INoSqlStorage {
   public ICategoryStorage getCategoryStorageAPI() { return new CategoryStorageImpl(); }
 
   @Override
+  public IImageStorage getImageStorage() {
+    return new ImageStorageImpl();
+  }
+
+  @Override
   public IUserGroupStorage getUserGroupStorage() {
     return new UserGroupStorageImpl();
   }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ImageStorageImpl.java
similarity index 52%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
copy to streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ImageStorageImpl.java
index fd8bd65..04783b8 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/ImageStorageImpl.java
@@ -16,16 +16,24 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.config;
+package org.apache.streampipes.storage.couchdb.impl;
 
-public class ConfigKeys {
-    public final static String JMS_HOST = "SP_JMS_HOST";
-    public final static String JMS_PORT = "SP_JMS_PORT";
-    public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-    public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-    public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-    public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-    public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-    public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
+import org.apache.streampipes.storage.api.IImageStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.lightcouch.CouchDbClient;
+
+import java.io.InputStream;
+
+public class ImageStorageImpl implements IImageStorage {
+
+  private final CouchDbClient couchDbClient;
+
+  public ImageStorageImpl() {
+    this.couchDbClient = Utils.getCouchDbImageClient();
+  }
+
+  @Override
+  public InputStream getImageBytes(String documentId) {
+    return couchDbClient.find(documentId + "/" + documentId);
+  }
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
index 8cdfc82..024107d 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java
@@ -36,6 +36,10 @@ public class Utils {
     return getCouchDbGsonClient("data-sink");
   }
 
+  public static CouchDbClient getCouchDbImageClient() {
+    return getCouchDbGsonClient("images");
+  }
+
   public static CouchDbClient getCouchDbPipelineElementTemplateClient() {
     return getCouchDbGsonClient("pipelineelementtemplate");
   }
diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
index 4a5a2d1..249c9f5 100644
--- a/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts
@@ -34,7 +34,7 @@ export class DatalakeRestService {
     return '/streampipes-backend';
   }
 
-  private get dataLakeUrl() {
+  public get dataLakeUrl() {
     return this.baseUrl + '/api/v4' + '/datalake';
   }
 
diff --git a/ui/src/app/dashboard/components/widgets/image/image-config.ts b/ui/src/app/dashboard/components/widgets/image/image-config.ts
index 0a2c9e3..bd14ffd 100644
--- a/ui/src/app/dashboard/components/widgets/image/image-config.ts
+++ b/ui/src/app/dashboard/components/widgets/image/image-config.ts
@@ -25,7 +25,7 @@ import { DashboardWidgetSettings } from '@streampipes/platform-services';
 export class ImageConfig extends WidgetConfig {
 
     static readonly TITLE_KEY: string = 'title-key';
-    static readonly NUMBER_MAPPING_KEY: string = 'number-mapping';
+    static readonly IMAGE_MAPPING_KEY: string = 'image-mapping';
 
     constructor() {
         super();
@@ -37,7 +37,7 @@ export class ImageConfig extends WidgetConfig {
             .withIcon('fas fa-image')
             .requiredSchema(SchemaRequirementsBuilder
                 .create()
-                .requiredPropertyWithUnaryMapping(ImageConfig.NUMBER_MAPPING_KEY, 'Select property', '', EpRequirements.imageReq())
+                .requiredPropertyWithUnaryMapping(ImageConfig.IMAGE_MAPPING_KEY, 'Select image field', '', EpRequirements.imageReq())
                 .build())
             .requiredTextParameter(ImageConfig.TITLE_KEY, 'Title', 'The title')
             .build();
diff --git a/ui/src/app/dashboard/components/widgets/image/image-widget.component.html b/ui/src/app/dashboard/components/widgets/image/image-widget.component.html
index aab9dc6..2c1c1ef 100644
--- a/ui/src/app/dashboard/components/widgets/image/image-widget.component.html
+++ b/ui/src/app/dashboard/components/widgets/image/image-widget.component.html
@@ -21,6 +21,6 @@
         {{title}}
     </div>
     <div class="imageContent">
-        <img class="w-100" src="data:image/jpg;base64,{{item}}" *ngIf="item" />
+        <img class="w-100" [src]="currentImageUrl" *ngIf="imageReady" />
     </div>
 </div>
diff --git a/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts b/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
index e2ce556..f2129ad 100644
--- a/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
+++ b/ui/src/app/dashboard/components/widgets/image/image-widget.component.ts
@@ -22,6 +22,9 @@ import { ResizeService } from '../../../services/resize.service';
 import { StaticPropertyExtractor } from '../../../sdk/extractor/static-property-extractor';
 import { GaugeConfig } from '../gauge/gauge-config';
 import { DatalakeRestService } from '@streampipes/platform-services';
+import { SafeUrl } from '@angular/platform-browser';
+import { SecurePipe } from '../../../../services/secure.pipe';
+import { ImageConfig } from './image-config';
 
 @Component({
     selector: 'image-widget',
@@ -33,13 +36,19 @@ export class ImageWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
     item: any;
     title: string;
     selectedProperty: string;
+    imageBaseUrl: string;
+    currentImageUrl: SafeUrl;
+    imageReady = false;
 
-    constructor(dataLakeService: DatalakeRestService, resizeService: ResizeService) {
+    constructor(dataLakeService: DatalakeRestService,
+                resizeService: ResizeService,
+                private securePipe: SecurePipe) {
         super(dataLakeService, resizeService);
     }
 
     ngOnInit(): void {
         super.ngOnInit();
+        this.imageBaseUrl = this.dataLakeService.dataLakeUrl + '/images/';
     }
 
     ngOnDestroy(): void {
@@ -48,15 +57,17 @@ export class ImageWidgetComponent extends BaseNgxChartsStreamPipesWidget impleme
 
     extractConfig(extractor: StaticPropertyExtractor) {
         this.title = extractor.singleValueParameter(GaugeConfig.TITLE_KEY);
-        this.selectedProperty = extractor.mappingPropertyValue(GaugeConfig.NUMBER_MAPPING_KEY);
-    }
-
-    isNumber(item: any): boolean {
-        return false;
+        this.selectedProperty = extractor.mappingPropertyValue(ImageConfig.IMAGE_MAPPING_KEY);
     }
 
     protected onEvent(events: any[]) {
-        this.item = events[0][this.selectedProperty];
+        const url = this.imageBaseUrl + events[0][this.selectedProperty];
+        this.securePipe.transform(url).subscribe(res => {
+           this.currentImageUrl = res;
+            if (!this.imageReady) {
+                this.imageReady = true;
+            }
+        });
     }
 
     protected getQueryLimit(extractor: StaticPropertyExtractor): number {
diff --git a/ui/src/app/dashboard/dashboard.module.ts b/ui/src/app/dashboard/dashboard.module.ts
index d97a75f..15b347d 100644
--- a/ui/src/app/dashboard/dashboard.module.ts
+++ b/ui/src/app/dashboard/dashboard.module.ts
@@ -24,15 +24,13 @@ import { DashboardComponent } from './dashboard.component';
 import { DashboardPanelComponent } from './components/panel/dashboard-panel.component';
 import { MatTabsModule } from '@angular/material/tabs';
 import { DashboardWidgetComponent } from './components/widget/dashboard-widget.component';
-import { CustomMaterialModule } from '../CustomMaterial/custom-material.module';
+// import { CustomMaterialModule } from '../CustomMaterial/custom-material.module';
 import { FormsModule } from '@angular/forms';
 import { ColorPickerModule } from 'ngx-color-picker';
 import { AddVisualizationDialogComponent } from './dialogs/add-widget/add-visualization-dialog.component';
 import { MatGridListModule } from '@angular/material/grid-list';
 import { DashboardService } from './services/dashboard.service';
 import { NumberWidgetComponent } from './components/widgets/number/number-widget.component';
-import { streamPipesStompConfig } from './services/websocket.config';
-import { InjectableRxStompConfig, RxStompService, rxStompServiceFactory } from '@stomp/ng2-stompjs';
 import { DashboardOverviewComponent } from './components/overview/dashboard-overview.component';
 import { EditDashboardDialogComponent } from './dialogs/edit-dashboard/edit-dashboard-dialog.component';
 import { DashboardGridComponent } from './components/grid/dashboard-grid.component';
@@ -61,86 +59,80 @@ import { StackedLineChartWidgetComponent } from './components/widgets/stacked-li
 import { EditModeService } from './services/edit-mode.service';
 import { ReloadPipelineService } from './services/reload-pipeline.service';
 import { PlatformServicesModule } from '@streampipes/platform-services';
+import { CustomMaterialModule } from '../CustomMaterial/custom-material.module';
+import { ServicesModule } from "../services/services.module";
 
 @NgModule({
-    imports: [
-        NgxEchartsModule.forRoot({
-            /**
-             * This will import all modules from echarts.
-             * If you only need custom modules,
-             * please refer to [Custom Build] section.
-             */
-            echarts: () => import('echarts'),
-        }),
-        CommonModule,
-        CoreUiModule,
-        MatTabsModule,
-        GridsterModule,
-        FlexLayoutModule,
-        CustomMaterialModule,
-        FormsModule,
-        ColorPickerModule,
-        MatGridListModule,
-        NgxChartsModule,
-        CdkTableModule,
-        LeafletModule,
-        PlatformServicesModule
-    ],
-    declarations: [
-        BarRaceWidgetComponent,
-        DashboardComponent,
-        DashboardGridComponent,
-        DashboardOverviewComponent,
-        DashboardPanelComponent,
-        DashboardWidgetComponent,
-        AddVisualizationDialogComponent,
-        EditDashboardDialogComponent,
-        AreaWidgetComponent,
-        LineWidgetComponent,
-        NumberWidgetComponent,
-        TableWidgetComponent,
-        GaugeWidgetComponent,
-        ImageWidgetComponent,
-        MapWidgetComponent,
-        RawWidgetComponent,
-        StackedLineChartWidgetComponent,
-        HtmlWidgetComponent,
-        StatusWidgetComponent,
-        TrafficLightWidgetComponent,
-        WordcloudWidgetComponent,
-        StandaloneDashboardComponent
-    ],
-    providers: [
-        DashboardService,
-        EditModeService,
-        ReloadPipelineService,
-        ResizeService,
-        RefreshDashboardService,
-        SemanticTypeUtilsService,
-        {
-            provide: InjectableRxStompConfig,
-            useValue: streamPipesStompConfig
-        },
-        {
-            provide: RxStompService,
-            useFactory: rxStompServiceFactory,
-            deps: [InjectableRxStompConfig]
-        },
-    ],
-    exports: [
-        DashboardComponent,
-        DashboardWidgetComponent
-    ],
-    entryComponents: [
-        DashboardComponent,
-        AddVisualizationDialogComponent,
-        EditDashboardDialogComponent,
-        StandaloneDashboardComponent
-    ]
+  imports: [
+    NgxEchartsModule.forRoot({
+      /**
+       * This will import all modules from echarts.
+       * If you only need custom modules,
+       * please refer to [Custom Build] section.
+       */
+      echarts: () => import('echarts'),
+    }),
+    CommonModule,
+    CoreUiModule,
+    MatTabsModule,
+    GridsterModule,
+    FlexLayoutModule,
+    CustomMaterialModule,
+    FormsModule,
+    ColorPickerModule,
+    MatGridListModule,
+    NgxChartsModule,
+    CdkTableModule,
+    LeafletModule,
+    PlatformServicesModule,
+    ServicesModule
+  ],
+  declarations: [
+    BarRaceWidgetComponent,
+    DashboardComponent,
+    DashboardGridComponent,
+    DashboardOverviewComponent,
+    DashboardPanelComponent,
+    DashboardWidgetComponent,
+    AddVisualizationDialogComponent,
+    EditDashboardDialogComponent,
+    AreaWidgetComponent,
+    LineWidgetComponent,
+    NumberWidgetComponent,
+    TableWidgetComponent,
+    GaugeWidgetComponent,
+    ImageWidgetComponent,
+    MapWidgetComponent,
+    RawWidgetComponent,
+    StackedLineChartWidgetComponent,
+    HtmlWidgetComponent,
+    StatusWidgetComponent,
+    TrafficLightWidgetComponent,
+    WordcloudWidgetComponent,
+    StandaloneDashboardComponent
+  ],
+  providers: [
+    DashboardService,
+    EditModeService,
+    ReloadPipelineService,
+    ResizeService,
+    RefreshDashboardService,
+    SemanticTypeUtilsService
+  ],
+  exports: [
+    DashboardComponent,
+    DashboardWidgetComponent
+  ],
+  entryComponents: [
+    DashboardComponent,
+    AddVisualizationDialogComponent,
+    EditDashboardDialogComponent,
+    StandaloneDashboardComponent
+  ]
 })
 export class DashboardModule {
 
-    constructor() {
-    }
+  constructor() {
+  }
 
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/ui/src/app/services/secure.pipe.ts
similarity index 50%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
copy to ui/src/app/services/secure.pipe.ts
index fd8bd65..11538bd 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/ui/src/app/services/secure.pipe.ts
@@ -16,16 +16,27 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.config;
 
-public class ConfigKeys {
-    public final static String JMS_HOST = "SP_JMS_HOST";
-    public final static String JMS_PORT = "SP_JMS_PORT";
-    public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-    public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-    public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-    public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-    public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-    public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-    public final static String IMAGE_STORAGE_LOCATION = "SP_IMAGE_STORAGE_LOCATION";
+import { Pipe, PipeTransform } from '@angular/core';
+import { HttpClient } from '@angular/common/http';
+import { DomSanitizer, SafeUrl } from '@angular/platform-browser';
+import { AuthService } from './auth.service';
+import { map } from 'rxjs/operators';
+import { Observable } from 'rxjs';
+
+@Pipe({
+  name: 'secure'
+})
+export class SecurePipe implements PipeTransform {
+
+  constructor(private http: HttpClient,
+              private sanitizer: DomSanitizer,
+              private authService: AuthService) { }
+
+  transform(url): Observable<SafeUrl> {
+    return this.http
+      .get(url, { responseType: 'blob', headers: {'Authorization': 'Bearer ' + this.authService.authToken$.value}})
+      .pipe(map(val => this.sanitizer.bypassSecurityTrustUrl(URL.createObjectURL(val))));
+  }
+
 }
diff --git a/ui/src/app/services/services.module.ts b/ui/src/app/services/services.module.ts
index 43f8351..7e41210 100644
--- a/ui/src/app/services/services.module.ts
+++ b/ui/src/app/services/services.module.ts
@@ -27,10 +27,11 @@ import { ElementIconText } from './get-element-icon-text.service';
 import { AppConstants } from './app.constants';
 import { JwtTokenStorageService } from './jwt-token-storage.service';
 import { PlatformServicesModule } from '@streampipes/platform-services';
+import { SecurePipe } from "./secure.pipe";
 
 @NgModule({
   imports: [],
-  declarations: [],
+  declarations: [SecurePipe],
   providers: [
     AppConstants,
     RestApi,
@@ -42,8 +43,10 @@ import { PlatformServicesModule } from '@streampipes/platform-services';
     NotificationCountService,
     PropertySelectorService,
     PlatformServicesModule,
+    SecurePipe,
   ],
-  entryComponents: []
+  entryComponents: [],
+  exports: [SecurePipe]
 })
 export class ServicesModule {
 }

[incubator-streampipes] 02/07: [STREAMPIPES-509] Load notifications over REST instead of websockets

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch experimental-module-federation-494
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 42405c7eb3d72318dc7971da4b2d55ebd89294ca
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Feb 6 14:38:21 2022 +0100

    [STREAMPIPES-509] Load notifications over REST instead of websockets
---
 .../streampipes/client/StreamPipesClient.java      |   4 +
 .../streampipes/client/api/NotificationsApi.java   |  22 ++
 .../jvm/notification/NotificationProducer.java     |  19 +-
 .../org/apache/streampipes/model/Notification.java |   6 +-
 .../manager/execution/http/PipelineExecutor.java   |   1 +
 .../apache/streampipes/rest/impl/Notification.java |  24 +-
 .../storage/api/INotificationStorage.java          |   2 +
 .../couchdb/impl/NotificationStorageImpl.java      |  18 +-
 .../src/lib/model/gen/streampipes-model.ts         |  10 +-
 .../core/components/iconbar/iconbar.component.html |   4 +-
 .../core/components/iconbar/iconbar.component.ts   |  50 ++--
 ui/src/app/home/components/status.component.html   |   2 +-
 ui/src/app/home/components/status.component.ts     |   3 +
 .../components/notification-item.component.html    |   4 +-
 .../app/notifications/notifications.component.html |   4 +-
 .../app/notifications/notifications.component.ts   | 326 +++++++++++----------
 .../notifications/service/notifications.service.ts |  15 +-
 ui/src/app/services/notification-count-service.ts  |  17 +-
 ui/src/app/services/rest-api.service.ts            |   2 +-
 19 files changed, 302 insertions(+), 231 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 9dd2da1..5daa809 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -185,6 +185,10 @@ public class StreamPipesClient implements SupportsPipelineApi,
     return new AdminApi(config);
   }
 
+  public NotificationsApi notificationsApi() {
+    return new NotificationsApi(config);
+  }
+
   public void deliverEmail(SpEmail email) {
     CustomRequestApi api = customRequest();
     api.sendPost(ApiPath.EMAIL_RESOURCE, email);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/NotificationsApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/NotificationsApi.java
new file mode 100644
index 0000000..5733b5a
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/NotificationsApi.java
@@ -0,0 +1,22 @@
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.Notification;
+
+public class NotificationsApi extends AbstractTypedClientApi<Notification> {
+
+  public NotificationsApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, Notification.class);
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromBaseApiPath()
+            .addToPath("notifications");
+  }
+
+  public void add(Notification notification) {
+    post(getBaseResourcePath(), notification);
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
index a18692c..c7bcad0 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/notification/NotificationProducer.java
@@ -19,14 +19,11 @@
 package org.apache.streampipes.sinks.internal.jvm.notification;
 
 
-import com.google.gson.Gson;
+import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.messaging.jms.ActiveMQPublisher;
 import org.apache.streampipes.model.Notification;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.pe.shared.PlaceholderExtractor;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventSink;
 
@@ -42,23 +39,17 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
   private String correspondingPipelineId;
   private String correspondingUser;
 
-  private ActiveMQPublisher publisher;
-  private Gson gson;
+  private StreamPipesClient client;
 
 
   @Override
   public void onInvocation(NotificationParameters parameters, EventSinkRuntimeContext context) throws
           SpRuntimeException {
-    this.gson = new Gson();
     this.title = parameters.getTitle();
     this.content = parameters.getContent();
     this.correspondingPipelineId = parameters.getGraph().getCorrespondingPipeline();
     this.correspondingUser = parameters.getGraph().getCorrespondingUser();
-    SpConfig configStore = context.getConfigStore().getConfig();
-    this.publisher = new ActiveMQPublisher(
-            configStore.getString(ConfigKeys.JMS_HOST),
-            configStore.getInteger(ConfigKeys.JMS_PORT),
-            "org.apache.streampipes.notifications." + this.correspondingUser);
+    this.client = context.getStreamPipesClient();
   }
 
   @Override
@@ -76,12 +67,12 @@ public class NotificationProducer implements EventSink<NotificationParameters> {
 
     // TODO add targeted user to notification object
 
-    publisher.publish(gson.toJson(notification).getBytes());
+    client.notificationsApi().add(notification);
   }
 
   @Override
   public void onDetach() throws SpRuntimeException {
-    this.publisher.disconnect();
+
   }
 
 }
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/Notification.java b/streampipes-model/src/main/java/org/apache/streampipes/model/Notification.java
index af4f5cf..3b9f47f 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/Notification.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/Notification.java
@@ -18,14 +18,16 @@
 
 package org.apache.streampipes.model;
 
+
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.gson.annotations.SerializedName;
 
 import java.util.Date;
 
 public class Notification {
 
-  private @SerializedName("_id") String id;
-  private @SerializedName("_rev") String rev;
+  private @JsonProperty("_id") @SerializedName("_id") String id;
+  private @JsonProperty("_rev") @SerializedName("_rev") String rev;
 
   private String title;
   private Date createdAt;
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index 8953b77..ed6dda9 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -108,6 +108,7 @@ public class PipelineExecutor {
       graphs.forEach(g -> {
         try {
           g.setSelectedEndpointUrl(findSelectedEndpoint(g));
+          g.setCorrespondingPipeline(pipeline.getPipelineId());
         } catch (NoServiceEndpointsAvailableException e) {
           failedServices.add(g);
         }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Notification.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Notification.java
index ea0c02b..14925bd 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Notification.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/Notification.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.rest.impl;
 
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.rest.shared.annotation.GsonWithIds;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -29,9 +29,18 @@ import javax.ws.rs.core.Response;
 @Path("/v2/notifications")
 public class Notification extends AbstractAuthGuardedRestResource {
 
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @JacksonSerialized
+    public Response addNotification(org.apache.streampipes.model.Notification notification) {
+        getNotificationStorage().addNotification(notification);
+        return ok();
+    }
+
     @GET
     @Produces(MediaType.APPLICATION_JSON)
-    @GsonWithIds
+    @JacksonSerialized
+    @Path("/offset")
     public Response getNotifications(@QueryParam("notificationType") String notificationTypeId,
                                      @QueryParam("offset") Integer offset,
                                      @QueryParam("count") Integer count) {
@@ -41,10 +50,19 @@ public class Notification extends AbstractAuthGuardedRestResource {
 
     @GET
     @Produces(MediaType.APPLICATION_JSON)
+    @JacksonSerialized
+    @Path("/time")
+    public Response getNotifications(@QueryParam("startTime") long startTime) {
+        return ok(getNotificationStorage()
+                .getAllNotificationsFromTimestamp(startTime));
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/count")
     public Response getUnreadNotificationsCount() {
         return ok(getNotificationStorage()
-                .getUnreadNotificationsCount(getAuthenticatedUsername()));
+                .getUnreadNotificationsCount(getAuthenticatedUserSid()));
     }
 
     @GET
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INotificationStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INotificationStorage.java
index 17fb587..e052037 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INotificationStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/INotificationStorage.java
@@ -29,6 +29,8 @@ public interface INotificationStorage {
 
   List<Notification> getAllNotifications(String notificationTypeId, Integer offset, Integer count);
 
+  List<Notification> getAllNotificationsFromTimestamp(long startTime);
+
   List<Notification> getUnreadNotifications();
 
   NotificationCount getUnreadNotificationsCount(String username);
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NotificationStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NotificationStorageImpl.java
index 4665143..7d8da4c 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NotificationStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/NotificationStorageImpl.java
@@ -50,20 +50,32 @@ public class NotificationStorageImpl extends AbstractDao<Notification> implement
   public List<Notification> getAllNotifications(String notificationTypeId,
                                                 Integer offset,
                                                 Integer count) {
-    Gson gson = couchDbClientSupplier.get().getGson();
     List<JsonObject> notifications =
             couchDbClientSupplier
                     .get()
                     .view("notificationtypes/notificationtypes")
                     .startKey(Arrays.asList(notificationTypeId, "\ufff0"))
-                    .endKey("\ufff0")
+                    .endKey(Arrays.asList(notificationTypeId, 0))
                     .descending(true)
                     .includeDocs(true)
                     .skip(offset)
                     .limit(count)
                     .query(JsonObject.class);
 
-    return notifications
+    return map(notifications);
+  }
+
+  @Override
+  public List<Notification> getAllNotificationsFromTimestamp(long startTime) {
+
+    return couchDbClientSupplier
+            .get()
+            .findDocs("{\"selector\": {\"createdAtTimestamp\": {\"$gt\": " +startTime + "}}}", Notification.class);
+  }
+
+  private List<Notification> map(List<JsonObject> jsonObjects) {
+    Gson gson = couchDbClientSupplier.get().getGson();
+    return jsonObjects
             .stream()
             .map(notification -> gson.fromJson(notification, Notification.class))
             .collect(Collectors.toList());
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index 4e1cf41..a68c527 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -18,7 +18,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-02-02 13:43:14.
+// Generated using typescript-generator version 2.27.744 on 2022-02-05 22:40:52.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
@@ -1666,9 +1666,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -1685,9 +1685,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -2499,8 +2499,8 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
         super.fromData(data, instance);
         instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
         instance.pipelineTemplateName = data.pipelineTemplateName;
-        instance.pipelineTemplateId = data.pipelineTemplateId;
         instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
+        instance.pipelineTemplateId = data.pipelineTemplateId;
         return instance;
     }
 }
@@ -2901,8 +2901,8 @@ export class SpDataSet extends SpDataStream {
         instance.datasetInvocationId = data.datasetInvocationId;
         instance.correspondingPipeline = data.correspondingPipeline;
         instance.selectedEndpointUrl = data.selectedEndpointUrl;
-        instance.actualTopicName = data.actualTopicName;
         instance.brokerHostname = data.brokerHostname;
+        instance.actualTopicName = data.actualTopicName;
         return instance;
     }
 }
diff --git a/ui/src/app/core/components/iconbar/iconbar.component.html b/ui/src/app/core/components/iconbar/iconbar.component.html
index b975f72..a04b784 100644
--- a/ui/src/app/core/components/iconbar/iconbar.component.html
+++ b/ui/src/app/core/components/iconbar/iconbar.component.html
@@ -33,8 +33,8 @@
         <button mat-button mat-icon-button class="md-icon-button button-margin-iconbar iconbar-size"
                 (click)="go('notifications')"
                 matTooltip="Notifications" matTooltipPosition="right">
-            <mat-icon [matBadge]="notificationCountService.unreadNotificationCount"
-                      [matBadgeHidden]="notificationCountService.unreadNotificationCount == 0"
+            <mat-icon [matBadge]="unreadNotificationCount"
+                      [matBadgeHidden]="unreadNotificationCount == 0"
                       [ngClass]="'notifications' === activePage ?'sp-navbar-icon-selected' : 'sp-navbar-icon'"
                       data-cy="navigation-icon">chat
 
diff --git a/ui/src/app/core/components/iconbar/iconbar.component.ts b/ui/src/app/core/components/iconbar/iconbar.component.ts
index 958519a..c775ce9 100644
--- a/ui/src/app/core/components/iconbar/iconbar.component.ts
+++ b/ui/src/app/core/components/iconbar/iconbar.component.ts
@@ -16,58 +16,46 @@
  *
  */
 
-import { Component, OnInit } from '@angular/core';
+import { Component, OnDestroy, OnInit } from '@angular/core';
 import { BaseNavigationComponent } from '../base-navigation.component';
-import { Client } from '@stomp/stompjs';
-import { NotificationItem } from '../../../notifications/model/notifications.model';
 import { Router } from '@angular/router';
 import { NotificationCountService } from '../../../services/notification-count-service';
 import { AuthService } from '../../../services/auth.service';
+import { Subscription, timer } from "rxjs";
+import { exhaustMap, mergeMap } from "rxjs/operators";
+import { RestApi } from "../../../services/rest-api.service";
 
 @Component({
   selector: 'iconbar',
   templateUrl: './iconbar.component.html',
   styleUrls: ['./iconbar.component.scss']
 })
-export class IconbarComponent extends BaseNavigationComponent implements OnInit {
+export class IconbarComponent extends BaseNavigationComponent implements OnInit, OnDestroy {
 
-  unreadNotifications = 0;
+  unreadNotificationCount = 0;
+  unreadNotificationsSubscription: Subscription;
 
   constructor(router: Router,
               authService: AuthService,
-              public notificationCountService: NotificationCountService) {
+              public notificationCountService: NotificationCountService,
+              private restApi: RestApi) {
     super(authService, router);
   }
 
   ngOnInit(): void {
     super.onInit();
-    this.connectToBroker();
-    this.notificationCountService.loadUnreadNotifications();
-  }
-
-  connectToBroker() {
-    const login = 'admin';
-    const passcode = 'admin';
-    const websocketProtocol = window.location.protocol === 'http:' ? 'ws' : 'wss';
-    const brokerUrl = websocketProtocol + '://' + window.location.hostname + ':' + window.location.port + '/streampipes/ws';
-    const inputTopic = '/topic/org.apache.streampipes.notifications.' + this.authService.getCurrentUser().username;
+    this.unreadNotificationsSubscription = timer(0, 10000).pipe(
+      exhaustMap(() => this.restApi.getUnreadNotificationsCount()))
+      .subscribe(response => {
+        this.notificationCountService.unreadNotificationCount$.next(response.count);
+      });
 
-    const stompClient = new Client({
-      brokerURL: brokerUrl,
-      connectHeaders: {
-        login,
-        passcode
-      },
-      reconnectDelay: 5000
+    this.notificationCountService.unreadNotificationCount$.subscribe(count => {
+      this.unreadNotificationCount = count;
     });
+  }
 
-    stompClient.onConnect = (frame) => {
-
-      stompClient.subscribe(inputTopic, message => {
-        this.notificationCountService.increaseNotificationCount(JSON.parse(message.body) as NotificationItem);
-      });
-    };
-
-    stompClient.activate();
+  ngOnDestroy() {
+    this.unreadNotificationsSubscription.unsubscribe();
   }
 }
diff --git a/ui/src/app/home/components/status.component.html b/ui/src/app/home/components/status.component.html
index 1441e63..f1cf5cc 100644
--- a/ui/src/app/home/components/status.component.html
+++ b/ui/src/app/home/components/status.component.html
@@ -31,7 +31,7 @@
     </div>
     <div fxFlex="25" class="status-container notifications mr-20" (click)="navigate('notifications')">
         <div fxFill="100" fxLayout="column" fxLayoutAlign="center center">
-            <div class="status-container-number" fxLayoutAlign="start center">{{notificationCountService.unreadNotificationCount}}</div>
+            <div class="status-container-number" fxLayoutAlign="start center">{{unreadNotificationCount}}</div>
             <div class="status-container-text">unread notifications</div>
         </div>
     </div>
diff --git a/ui/src/app/home/components/status.component.ts b/ui/src/app/home/components/status.component.ts
index bb499f2..c863eaa 100644
--- a/ui/src/app/home/components/status.component.ts
+++ b/ui/src/app/home/components/status.component.ts
@@ -31,6 +31,7 @@ export class StatusComponent implements OnInit {
     pipelines = 0;
     runningPipelines = 0;
     installedPipelineElements = 0;
+    unreadNotificationCount = 0;
 
     constructor(private pipelineElementService: PipelineElementService,
                 private router: Router,
@@ -42,6 +43,8 @@ export class StatusComponent implements OnInit {
         this.getStreams();
         this.getProcessors();
         this.getSinks();
+        this.notificationCountService.unreadNotificationCount$
+          .subscribe(count => this.unreadNotificationCount = count);
     }
 
     getPipelines() {
diff --git a/ui/src/app/notifications/components/notification-item.component.html b/ui/src/app/notifications/components/notification-item.component.html
index 7a31381..1f8786d 100644
--- a/ui/src/app/notifications/components/notification-item.component.html
+++ b/ui/src/app/notifications/components/notification-item.component.html
@@ -18,11 +18,11 @@
 <div class="notification-item">
     <div class="i">
         <div class="head">
-            <span class="time">{{notification.createdAt}}</span>
+            <span class="time">{{notification.createdAt | date: 'yyyy-MM-dd HH:mm:ss'}}</span>
         </div>
         <div class="notification-box sp-accent-bg">
             <div [innerHTML]="sanitizedMessage"></div>
         </div>
     </div>
 
-</div>
\ No newline at end of file
+</div>
diff --git a/ui/src/app/notifications/notifications.component.html b/ui/src/app/notifications/notifications.component.html
index 26a27f8..e91ac59 100644
--- a/ui/src/app/notifications/notifications.component.html
+++ b/ui/src/app/notifications/notifications.component.html
@@ -65,8 +65,8 @@
                     <div class="notification-header-notification-name">{{currentlySelectedNotification.notificationTitle}}</div>
                     <hr class="header-divider"/>
                 </div>
-                <div #notificationPane class="notification-pane" (scroll)="onScroll($event)">
-                    <notification-item [notification]="notification" *ngFor="let notification of notifications"></notification-item>
+                <div #notificationPane class="notification-pane" (scroll)="onScroll()" *ngIf="currentlySelectedNotificationId">
+                    <notification-item [notification]="notification" *ngFor="let notification of allNotifications.get(currentlySelectedNotificationId)"></notification-item>
                 </div>
             </div>
 
diff --git a/ui/src/app/notifications/notifications.component.ts b/ui/src/app/notifications/notifications.component.ts
index dc623d7..33f9ce6 100644
--- a/ui/src/app/notifications/notifications.component.ts
+++ b/ui/src/app/notifications/notifications.component.ts
@@ -20,174 +20,204 @@ import { Component, ElementRef, OnDestroy, OnInit, ViewChild } from '@angular/co
 import { ExistingNotification, NotificationItem } from './model/notifications.model';
 import { ElementIconText } from '../services/get-element-icon-text.service';
 import { NotificationsService } from './service/notifications.service';
-import { Message } from '@stomp/stompjs';
-import { Subscription } from 'rxjs';
-import { RxStompService } from '@stomp/ng2-stompjs';
+import { Subscription, timer } from 'rxjs';
 import { NotificationUtils } from './utils/notifications.utils';
 import { NotificationCountService } from '../services/notification-count-service';
 import { FreeTextStaticProperty, Pipeline, PipelineService } from '@streampipes/platform-services';
 import { AuthService } from '../services/auth.service';
+import { filter, switchMap } from 'rxjs/operators';
 
 @Component({
-    selector: 'notifications',
-    templateUrl: './notifications.component.html',
-    styleUrls: ['./notifications.component.scss']
+  selector: 'notifications',
+  templateUrl: './notifications.component.html',
+  styleUrls: ['./notifications.component.scss']
 })
 export class NotificationsComponent implements OnInit, OnDestroy {
 
-    static readonly NOTIFICATIONS_APP_ID = 'org.apache.streampipes.sinks.internal.jvm.notification';
-    static readonly NOTIFICATION_TOPIC_PREFIX = 'org.apache.streampipes.notifications.';
-    static readonly NOTIFICATION_TITLE_KEY = 'title';
-
-    @ViewChild('notificationPane') private notificationContainer: ElementRef;
-
-    notifications: NotificationItem[] = [];
-    unreadNotifications: any;
-    existingNotifications: ExistingNotification[] = [];
-    currentlySelectedNotification: ExistingNotification;
-    currentlySelectedNotificationId: string;
-
-    pipelinesWithNotificationsPresent = false;
-    notificationsLoading = false;
-
-    currentOffset = 0;
-    liveOffset = 0;
-    previousScrollHeight: number;
-
-    subscription: Subscription;
-    notificationTopic: string;
-
-    newNotificationInfo: boolean[] = [];
-
-    newEventArriving = false;
-
-    constructor(private authService: AuthService,
-                private pipelineService: PipelineService,
-                public elementIconText: ElementIconText,
-                private notificationService: NotificationsService,
-                private rxStompService: RxStompService,
-                private notificationCountService: NotificationCountService) {
-        this.notifications = [];
-        this.unreadNotifications = [];
-        this.notificationTopic = NotificationsComponent.NOTIFICATION_TOPIC_PREFIX + this.authService.getCurrentUser().username;
-    }
-
-    ngOnInit() {
-        this.getPipelinesWithNotifications();
-    }
-
-    createSubscription() {
-        this.subscription = this.rxStompService.watch('/topic/' + this.notificationTopic).subscribe((message: Message) => {
-            let scrollToBottom = false;
-            if ((this.notificationContainer.nativeElement.scrollHeight - this.notificationContainer.nativeElement.scrollTop) <=
-                (this.notificationContainer.nativeElement.clientHeight + 10) &&
-                (this.notificationContainer.nativeElement.scrollHeight - this.notificationContainer.nativeElement.scrollTop) >=
-                (this.notificationContainer.nativeElement.clientHeight - 10)) {
-                scrollToBottom = true;
-            }
-            this.newEventArriving = true;
-            const notification: NotificationItem = JSON.parse(message.body) as NotificationItem;
+  static readonly NOTIFICATIONS_APP_ID = 'org.apache.streampipes.sinks.internal.jvm.notification';
+  static readonly NOTIFICATION_TITLE_KEY = 'title';
+
+  @ViewChild('notificationPane') private notificationContainer: ElementRef;
+
+  allNotifications: Map<string, NotificationItem[]> = new Map();
+  unreadNotifications: any;
+  existingNotifications: ExistingNotification[] = [];
+  currentlySelectedNotification: ExistingNotification;
+  currentlySelectedNotificationId: string;
+
+  pipelinesWithNotificationsPresent = false;
+  notificationsLoading = false;
+
+  currentOffset = 0;
+  liveOffset = 0;
+  previousScrollHeight: number;
+
+  subscription: Subscription;
+  notificationTopic: string;
+
+  newNotificationInfo: boolean[] = [];
+
+  newEventArriving = false;
+  lastFetchTime = new Date().getTime();
+
+  constructor(private authService: AuthService,
+              private pipelineService: PipelineService,
+              public elementIconText: ElementIconText,
+              private notificationService: NotificationsService,
+              private notificationCountService: NotificationCountService) {
+    this.unreadNotifications = [];
+  }
+
+  ngOnInit() {
+    this.getPipelinesWithNotifications();
+    this.subscription = timer(0, 5000).pipe(
+      filter(() => (this.currentlySelectedNotification !== undefined && this.allNotifications.size > 0)),
+      switchMap(() => this.notificationService.getNotificationsFromTime(
+        this.lastFetchTime)))
+      .subscribe((notifications) => {
+        let scrollToBottom = false;
+        if (notifications.length > 0) {
+          if ((this.notificationContainer.nativeElement.scrollHeight - this.notificationContainer.nativeElement.scrollTop) <=
+            (this.notificationContainer.nativeElement.clientHeight + 10) &&
+            (this.notificationContainer.nativeElement.scrollHeight - this.notificationContainer.nativeElement.scrollTop) >=
+            (this.notificationContainer.nativeElement.clientHeight - 10)) {
+            scrollToBottom = true;
+          }
+          this.newEventArriving = true;
+          notifications.forEach(notification => {
             const notificationId = NotificationUtils.makeNotificationId(notification.correspondingPipelineId, notification.title);
+            const existingNots = this.allNotifications.get(notificationId);
+            existingNots.push(notification);
+            this.allNotifications.set(notificationId, existingNots);
             if (this.currentlySelectedNotificationId === notificationId) {
-                this.notifications.push(notification);
-                this.liveOffset++;
-                notification.read = true;
-                setTimeout(() => {
-                    this.notificationService.updateNotification(notification).subscribe();
-                }, 500);
-
+              this.liveOffset++;
+              notification.read = true;
+              setTimeout(() => {
+                this.notificationService.updateNotification(notification).subscribe();
+              }, 500);
             } else {
-                this.newNotificationInfo[notificationId] = true;
-            }
-            if (scrollToBottom) {
-                setTimeout(() => {
-                    this.notificationContainer.nativeElement.scrollTop = this.notificationContainer.nativeElement.scrollHeight + 100;
-                });
+              this.newNotificationInfo[notificationId] = true;
             }
+          });
 
-            this.newEventArriving = false;
-        });
-    }
-
-    getPipelinesWithNotifications() {
-        this.notificationsLoading = true;
-        this.pipelineService.getOwnPipelines().subscribe(pipelines => {
-            this.filterForNotifications(pipelines);
-            this.notificationsLoading = false;
-            if (this.existingNotifications.length > 0) {
-                this.pipelinesWithNotificationsPresent = true;
-                this.selectNotification(this.existingNotifications[0]);
-                this.createSubscription();
-            }
+          if (scrollToBottom) {
+            setTimeout(() => {
+              this.scrollToBottom();
+            });
+          }
+        }
+        this.lastFetchTime = new Date().getTime();
+        this.newEventArriving = false;
+      });
+  }
+
+  scrollToBottom() {
+   setTimeout(() => {
+     this.notificationContainer.nativeElement.scrollTop = this.notificationContainer.nativeElement.scrollHeight + 100;
+   }, 200);
+  }
+
+  getPipelinesWithNotifications() {
+    this.notificationsLoading = true;
+    this.pipelineService.getOwnPipelines().subscribe(pipelines => {
+      this.existingNotifications = this.getAllExistingNotifications(pipelines);
+      this.notificationsLoading = false;
+      if (this.existingNotifications.length > 0) {
+        this.pipelinesWithNotificationsPresent = true;
+        this.selectNotification(this.existingNotifications[0]);
+        this.existingNotifications.forEach(notification => {
+          this.getNotifications(notification, this.currentOffset, 10, true);
         });
-    }
-
-    filterForNotifications(pipelines: Pipeline[]) {
-        pipelines.forEach(pipeline => {
-           const notificationActions = pipeline.actions.filter(sink => sink.appId === NotificationsComponent.NOTIFICATIONS_APP_ID);
-             notificationActions.forEach(notificationAction => {
-                const notificationName = notificationAction
-                    .staticProperties
-                    .filter(sp => sp.internalName === NotificationsComponent.NOTIFICATION_TITLE_KEY)
-                    .map(sp => (sp as FreeTextStaticProperty).value)[0];
-                const pipelineName = pipeline.name;
-                this.existingNotifications.push({
-                    notificationTitle: notificationName,
-                    pipelineName,
-                    pipelineId: pipeline._id,
-                    notificationId: NotificationUtils.makeNotificationId(pipeline._id, notificationName)});
-             });
+      }
+    });
+  }
+
+  getAllExistingNotifications(pipelines: Pipeline[]): ExistingNotification[] {
+    const existingNotifications = [];
+    pipelines.forEach(pipeline => {
+      const notificationActions = pipeline.actions.filter(sink => sink.appId === NotificationsComponent.NOTIFICATIONS_APP_ID);
+      notificationActions.forEach(notificationAction => {
+        const notificationName = notificationAction
+          .staticProperties
+          .filter(sp => sp.internalName === NotificationsComponent.NOTIFICATION_TITLE_KEY)
+          .map(sp => (sp as FreeTextStaticProperty).value)[0];
+        const pipelineName = pipeline.name;
+        existingNotifications.push({
+          notificationTitle: notificationName,
+          pipelineName,
+          pipelineId: pipeline._id,
+          notificationId: NotificationUtils.makeNotificationId(pipeline._id, notificationName)
         });
-    }
-
-    getNotifications(notification: ExistingNotification, offset: number, count: number, scrollToBottom: boolean) {
-        this.notificationService.getNotifications(notification, offset, count).subscribe(notifications => {
-            notifications.sort((a, b) => {
-                return (a.createdAtTimestamp - b.createdAtTimestamp);
-            });
-            this.notifications.unshift(...notifications);
-            if (scrollToBottom) {
-                setTimeout(() => {
-                    this.notificationContainer.nativeElement.scrollTop = this.notificationContainer.nativeElement.scrollHeight;
-                });
-            } else {
-                setTimeout(() => {
-                    this.notificationContainer.nativeElement.scrollTop =
-                        this.notificationContainer.nativeElement.scrollHeight - this.previousScrollHeight;
-                });
-            }
-            notifications.forEach(n => {
-                if (!n.read) {
-                    n.read = true;
-                    this.notificationCountService.decreaseNotificationCount();
-                    this.notificationService.updateNotification(n).subscribe();
-                }
-            });
+      });
+    });
+    return existingNotifications;
+  }
+
+  getNotifications(notification: ExistingNotification,
+                   offset: number,
+                   count: number,
+                   scrollToBottom: boolean) {
+    this.notificationService.getNotifications(notification, offset, count).subscribe(notifications => {
+      notifications.sort((a, b) => {
+        return (a.createdAtTimestamp - b.createdAtTimestamp);
+      });
+      const notificationId = NotificationUtils.makeNotificationId(notification.pipelineId, notification.notificationTitle);
+      if (!this.allNotifications.has(notificationId)) {
+        this.allNotifications.set(notificationId, []);
+      }
+      this.allNotifications.get(notificationId).unshift(...notifications);
+      if (scrollToBottom) {
+        setTimeout(() => {
+          this.scrollToBottom();
+        }, 500);
+      } else {
+        setTimeout(() => {
+          this.notificationContainer.nativeElement.scrollTop =
+            this.notificationContainer.nativeElement.scrollHeight - this.previousScrollHeight;
         });
+      }
+      this.updateUnreadNotifications(notifications);
+    });
+  }
+
+  updateUnreadNotifications(notifications: NotificationItem[]) {
+    notifications.forEach(n => {
+      if (!n.read) {
+        n.read = true;
+        this.notificationCountService.decreaseNotificationCount();
+        this.notificationService.updateNotification(n).subscribe();
+      }
+    });
+  }
+
+  selectNotification(notification: ExistingNotification) {
+    this.currentOffset = 0;
+    this.liveOffset = 0;
+    this.currentlySelectedNotification = notification;
+    this.currentlySelectedNotificationId = NotificationUtils.makeNotificationIdFromNotification(notification);
+    if (this.allNotifications.has(this.currentlySelectedNotificationId)) {
+      this.updateUnreadNotifications(this.allNotifications.get(this.currentlySelectedNotificationId));
     }
-
-    selectNotification(notification: ExistingNotification) {
-        this.notifications = [];
-        this.currentOffset = 0;
-        this.liveOffset = 0;
-        this.currentlySelectedNotification = notification;
-        this.currentlySelectedNotificationId = NotificationUtils.makeNotificationIdFromNotification(notification);
-        this.notificationCountService.lockIncreaseUpdateForId(this.currentlySelectedNotificationId);
-        this.getNotifications(notification, this.currentOffset, 10, true);
+    this.newNotificationInfo[this.currentlySelectedNotificationId] = false;
+    this.notificationCountService.lockIncreaseUpdateForId(this.currentlySelectedNotificationId);
+    if (this.notificationContainer) {
+      this.scrollToBottom();
     }
-
-    onScroll(event: any) {
-        if (this.notificationContainer.nativeElement.scrollTop === 0) {
-            this.currentOffset += 10;
-            this.previousScrollHeight = this.notificationContainer.nativeElement.scrollHeight;
-            this.getNotifications(this.currentlySelectedNotification, this.notifications.length + this.liveOffset, 10, false);
-        }
+  }
+
+  onScroll() {
+    if (this.notificationContainer.nativeElement.scrollTop === 0) {
+      this.currentOffset += 10;
+      this.previousScrollHeight = this.notificationContainer.nativeElement.scrollHeight;
+      const currentNotifications = this.allNotifications.get(this.currentlySelectedNotificationId);
+      this.getNotifications(this.currentlySelectedNotification, currentNotifications.length + this.liveOffset, 10, false);
     }
+  }
 
-    ngOnDestroy(): void {
-        if (this.subscription) {
-            this.subscription.unsubscribe();
-        }
-        this.notificationCountService.unlockIncreaseUpdate();
+  ngOnDestroy(): void {
+    if (this.subscription) {
+      this.subscription.unsubscribe();
     }
+    this.notificationCountService.unlockIncreaseUpdate();
+  }
 }
diff --git a/ui/src/app/notifications/service/notifications.service.ts b/ui/src/app/notifications/service/notifications.service.ts
index 41f88bf..f359b47 100644
--- a/ui/src/app/notifications/service/notifications.service.ts
+++ b/ui/src/app/notifications/service/notifications.service.ts
@@ -19,7 +19,6 @@ import { HttpClient } from '@angular/common/http';
 import { Observable } from 'rxjs';
 import {
     ExistingNotification,
-    NotificationCount,
     NotificationItem
 } from '../model/notifications.model';
 import { Injectable } from '@angular/core';
@@ -34,15 +33,21 @@ export class NotificationsService {
                 private platformServicesCommons: PlatformServicesCommons) {
     }
 
-    getUnreadNotificationsCount(): Observable<NotificationCount> {
-        return this.http.get(this.notificationUrl + '/count').pipe(map(data => {
-            return data as NotificationCount;
-        }));
+    getNotificationsFromTime(startTime: number): Observable<NotificationItem[]> {
+        return this.http
+          .get(this.notificationUrl
+            + '/time'
+            + '?'
+            + 'startTime=' + startTime)
+          .pipe(map(data => {
+              return data as NotificationItem[];
+          }));
     }
 
     getNotifications(existingNotification: ExistingNotification, offset: number, limit: number): Observable<NotificationItem[]> {
         return this.http
             .get(this.notificationUrl
+                + '/offset'
                 + '?'
                 + 'notificationType=' + NotificationUtils.makeNotificationIdFromNotification(existingNotification)
                 + '&'
diff --git a/ui/src/app/services/notification-count-service.ts b/ui/src/app/services/notification-count-service.ts
index bfdacf4..bc8a8ec 100644
--- a/ui/src/app/services/notification-count-service.ts
+++ b/ui/src/app/services/notification-count-service.ts
@@ -17,12 +17,12 @@
 
 import { Injectable } from '@angular/core';
 import { RestApi } from './rest-api.service';
-import { NotificationItem } from '../notifications/model/notifications.model';
+import { BehaviorSubject } from 'rxjs';
 
 @Injectable()
 export class NotificationCountService {
 
-    unreadNotificationCount = 0;
+    public unreadNotificationCount$ = new BehaviorSubject(0);
     lockNotificationId: string;
     lockActive = false;
 
@@ -32,20 +32,13 @@ export class NotificationCountService {
     loadUnreadNotifications() {
         this.restApi.getUnreadNotificationsCount()
             .subscribe(response => {
-                this.unreadNotificationCount = response.count;
+                this.unreadNotificationCount$.next(response.count);
             });
     }
 
-    increaseNotificationCount(notification: NotificationItem) {
-        const id = this.makeId(notification.correspondingPipelineId, notification.title);
-        if (this.lockActive && (id === this.lockNotificationId)) {
-        } else {
-            this.unreadNotificationCount = this.unreadNotificationCount + 1;
-        }
-    }
-
     decreaseNotificationCount() {
-        this.unreadNotificationCount = this.unreadNotificationCount - 1;
+        const newValue = Math.max(this.unreadNotificationCount$.value - 1, 0);
+        this.unreadNotificationCount$.next(newValue);
     }
 
     lockIncreaseUpdateForId(notificationId: string) {
diff --git a/ui/src/app/services/rest-api.service.ts b/ui/src/app/services/rest-api.service.ts
index ed2084a..23eae54 100644
--- a/ui/src/app/services/rest-api.service.ts
+++ b/ui/src/app/services/rest-api.service.ts
@@ -49,7 +49,7 @@ export class RestApi {
     }
 
     getUnreadNotificationsCount(): Observable<any> {
-        return this.$http.get(this.urlApiBase() + '/notifications/count');
+        return this.$http.get(this.urlApiBase() + '/notifications/count', { headers: { ignoreLoadingBar: '' }});
     }
 
     getDomainKnowledgeItems(query) {