You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/06/30 09:48:55 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new a79bf2a33 [STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries
a79bf2a33 is described below
commit a79bf2a33ca036b3af36f8e4bdeed4e419f40c28
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jun 30 11:48:45 2022 +0200
[STREAMPIPES-552] Cancel subscriptions of overwritten data explorer queries
---
.../base/base-data-explorer-widget.directive.ts | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts b/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
index ca2e91b70..17e657284 100644
--- a/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
+++ b/ui/src/app/data-explorer/components/widgets/base/base-data-explorer-widget.directive.ts
@@ -31,10 +31,11 @@ import {
} from '@streampipes/platform-services';
import { ResizeService } from '../../../services/resize.service';
import { FieldProvider } from '../../../models/dataview-dashboard.model';
-import { Observable, Subscription, zip } from 'rxjs';
+import { Observable, Subject, Subscription, zip } from 'rxjs';
import { DataExplorerFieldProviderService } from '../../../services/data-explorer-field-provider-service';
import { BaseWidgetData } from './data-explorer-widget-data';
import { TimeSelectionService } from '../../../services/time-selection.service';
+import { switchMap } from 'rxjs/operators';
@Directive()
export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidgetModel> implements BaseWidgetData<T>, OnInit, OnDestroy {
@@ -78,6 +79,8 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
resizeSub: Subscription;
timeSelectionSub: Subscription;
+ requestQueue$: Subject<Observable<SpQueryResult>[]> = new Subject<Observable<SpQueryResult>[]>();
+
constructor(protected dataLakeRestService: DatalakeRestService,
protected widgetConfigurationService: WidgetConfigurationService,
protected resizeService: ResizeService,
@@ -92,6 +95,16 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
this.showData = true;
const sourceConfigs = this.dataExplorerWidget.dataConfig.sourceConfigs;
this.fieldProvider = this.fieldService.generateFieldLists(sourceConfigs);
+
+ this.requestQueue$.pipe(switchMap((observables) => {
+ return zip(...observables);
+ })).subscribe(results => {
+ results.forEach((result, index) => result.sourceIndex = index);
+ this.validateReceivedData(results);
+ this.refreshView();
+ this.timerCallback.emit(false);
+ });
+
this.widgetConfigurationSub = this.widgetConfigurationService.configurationChangedSubject.subscribe(refreshMessage => {
if (refreshMessage.widgetId === this.dataExplorerWidget._id) {
if (refreshMessage.refreshData) {
@@ -129,6 +142,7 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
this.resizeSub.unsubscribe();
}
this.timeSelectionSub.unsubscribe();
+ this.requestQueue$.unsubscribe();
}
public removeWidget() {
@@ -158,30 +172,24 @@ export abstract class BaseDataExplorerWidgetDirective<T extends DataExplorerWidg
if (includeTooMuchEventsParameter && !this.dataExplorerWidget.dataConfig.ignoreTooMuchDataWarning) {
observables = this
- .dataViewQueryGeneratorService
- .generateObservables(
- this.timeSettings.startTime,
- this.timeSettings.endTime,
- this.dataExplorerWidget.dataConfig as DataExplorerDataConfig,
- BaseDataExplorerWidgetDirective.TOO_MUCH_DATA_PARAMETER
- );
+ .dataViewQueryGeneratorService
+ .generateObservables(
+ this.timeSettings.startTime,
+ this.timeSettings.endTime,
+ this.dataExplorerWidget.dataConfig as DataExplorerDataConfig,
+ BaseDataExplorerWidgetDirective.TOO_MUCH_DATA_PARAMETER
+ );
} else {
observables = this
- .dataViewQueryGeneratorService
- .generateObservables(
- this.timeSettings.startTime,
- this.timeSettings.endTime,
- this.dataExplorerWidget.dataConfig as DataExplorerDataConfig);
+ .dataViewQueryGeneratorService
+ .generateObservables(
+ this.timeSettings.startTime,
+ this.timeSettings.endTime,
+ this.dataExplorerWidget.dataConfig as DataExplorerDataConfig);
}
this.timerCallback.emit(true);
- zip(...observables).subscribe(results => {
- results.forEach((result, index) => result.sourceIndex = index);
- this.validateReceivedData(results);
- // this.onDataReceived(results);
- this.refreshView();
- this.timerCallback.emit(false);
- });
+ this.requestQueue$.next(observables);
}
validateReceivedData(spQueryResults: SpQueryResult[]) {