You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/06/30 09:48:55 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new a79bf2a33 [STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries
a79bf2a33 is described below

commit a79bf2a33ca036b3af36f8e4bdeed4e419f40c28
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jun 30 11:48:45 2022 +0200

    [STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries
---
 .../base/base-data-explorer-widget.directive.ts    | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts b/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
index ca2e91b70..17e657284 100644
--- a/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
+++ b/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
@@ -31,10 +31,11 @@ import {
 } from '@streampipes/platform-services';
 import { ResizeService } from '../../../services/resize.service';
 import { FieldProvider } from '../../../models/dataview-dashboard.model';
-import { Observable, Subscription, zip } from 'rxjs';
+import { Observable, Subject, Subscription, zip } from 'rxjs';
 import { DataExplorerFieldProviderService } from '../../../services/data-explorer-field-provider-service';
 import { BaseWidgetData } from './data-explorer-widget-data';
 import { TimeSelectionService } from '../../../services/time-selection.service';
+import { switchMap } from 'rxjs/operators';
 
 @Directive()
 export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidgetModel> implements BaseWidgetData<T>, OnInit, OnDestroy {
@@ -78,6 +79,8 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
   resizeSub: Subscription;
   timeSelectionSub: Subscription;
 
+  requestQueue$: Subject<Observable<SpQueryResult>[]> = new Subject<Observable<SpQueryResult>[]>();
+
   constructor(protected dataLakeRestService: DatalakeRestService,
               protected widgetConfigurationService: WidgetConfigurationService,
               protected resizeService: ResizeService,
@@ -92,6 +95,16 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
     this.showData = true;
     const sourceConfigs = this.dataExplorerWidget.dataConfig.sourceConfigs;
     this.fieldProvider = this.fieldService.generateFieldLists(sourceConfigs);
+
+    this.requestQueue$.pipe(switchMap((observables) => {
+      return zip(...observables);
+    })).subscribe(results => {
+      results.forEach((result, index) => result.sourceIndex = index);
+      this.validateReceivedData(results);
+      this.refreshView();
+      this.timerCallback.emit(false);
+    });
+
     this.widgetConfigurationSub = this.widgetConfigurationService.configurationChangedSubject.subscribe(refreshMessage => {
       if (refreshMessage.widgetId === this.dataExplorerWidget._id) {
         if (refreshMessage.refreshData) {
@@ -129,6 +142,7 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
       this.resizeSub.unsubscribe();
     }
     this.timeSelectionSub.unsubscribe();
+    this.requestQueue$.unsubscribe();
   }
 
   public removeWidget() {
@@ -158,30 +172,24 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
 
     if (includeTooMuchEventsParameter && !this.dataExplorerWidget.dataConfig.ignoreTooMuchDataWarning) {
       observables = this
-          .dataViewQueryGeneratorService
-          .generateObservables(
-              this.timeSettings.startTime,
-              this.timeSettings.endTime,
-              this.dataExplorerWidget.dataConfig as DataExplorerDataConfig,
-              BaseDataExplorerWidgetDirective.TOO_MUCH_DATA_PARAMETER
-          );
+        .dataViewQueryGeneratorService
+        .generateObservables(
+          this.timeSettings.startTime,
+          this.timeSettings.endTime,
+          this.dataExplorerWidget.dataConfig as DataExplorerDataConfig,
+          BaseDataExplorerWidgetDirective.TOO_MUCH_DATA_PARAMETER
+        );
     } else {
       observables = this
-          .dataViewQueryGeneratorService
-          .generateObservables(
-              this.timeSettings.startTime,
-              this.timeSettings.endTime,
-              this.dataExplorerWidget.dataConfig as DataExplorerDataConfig);
+        .dataViewQueryGeneratorService
+        .generateObservables(
+          this.timeSettings.startTime,
+          this.timeSettings.endTime,
+          this.dataExplorerWidget.dataConfig as DataExplorerDataConfig);
     }
 
     this.timerCallback.emit(true);
-    zip(...observables).subscribe(results => {
-      results.forEach((result, index) => result.sourceIndex = index);
-      this.validateReceivedData(results);
-      // this.onDataReceived(results);
-      this.refreshView();
-      this.timerCallback.emit(false);
-    });
+    this.requestQueue$.next(observables);
   }
 
   validateReceivedData(spQueryResults: SpQueryResult[]) {