You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/08/04 13:06:04 UTC
[incubator-streampipes] 04/05: [STREAMPIPES-319] Add basic
functionalities to Datalake REST Service
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 635d12ec919ea8cb82dd50939b9a83f4b00b7634
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed Aug 4 15:04:46 2021 +0200
[STREAMPIPES-319] Add basic functionalities to Datalake REST Service
---
.../apis/datalake-rest.service.ts | 159 ++++++++++++++++++++-
1 file changed, 158 insertions(+), 1 deletion(-)
diff --git a/ui/src/app/platform-services/apis/datalake-rest.service.ts b/ui/src/app/platform-services/apis/datalake-rest.service.ts
index a90fdf4..d8cfd0d 100644
--- a/ui/src/app/platform-services/apis/datalake-rest.service.ts
+++ b/ui/src/app/platform-services/apis/datalake-rest.service.ts
@@ -17,8 +17,15 @@
*/
import { Injectable } from '@angular/core';
-import { HttpClient } from '@angular/common/http';
+import { HttpClient, HttpRequest } from '@angular/common/http';
import { AuthStatusService } from '../../services/auth-status.service';
+import { Observable } from 'rxjs';
+import { DataLakeMeasure } from '../../core-model/gen/streampipes-model';
+import { map } from 'rxjs/operators';
+import { DataResult } from '../../core-model/datalake/DataResult';
+import { DatalakeQueryParameters } from '../../core-services/datalake/DatalakeQueryParameters';
+import { PageResult } from '../../core-model/datalake/PageResult';
+import { GroupedDataResult } from '../../core-model/datalake/GroupedDataResult';
@Injectable()
export class DatalakeRestService {
@@ -33,4 +40,154 @@ export class DatalakeRestService {
/**
 * Root URL of the v4 data lake endpoints: `baseUrl`, followed by the
 * `/api/v4/users/` path, the email held by `authStatusService`, and `/datalake`.
 */
private get dataLakeUrl() {
  const usersPrefix = this.baseUrl + '/api/v4/users/';
  return usersPrefix + this.authStatusService.email + '/datalake';
}
+
+ /**
+  * Requests `<dataLakeUrl>/measurements/` and converts every entry of the
+  * response array with `DataLakeMeasure.fromData`.
+  *
+  * @returns an observable emitting the converted measures
+  */
+ getAllMeasurementSeries(): Observable<DataLakeMeasure[]> {
+   // The response body is treated as an array; each element is converted on its own.
+   const toMeasures = (response: unknown) =>
+     (response as any[]).map(entry => DataLakeMeasure.fromData(entry));
+
+   return this.http
+     .get(this.dataLakeUrl + '/measurements/')
+     .pipe(map(toMeasures));
+ }
+
+ /**
+  * Queries the measurement `index` via GET on `<dataLakeUrl>/measurements/<index>`.
+  *
+  * `columns`, `startDate`, `endDate` and `aggregationFunction` are handed to
+  * `getQueryParameters` as-is; the time interval is the concatenation of
+  * `aggregationTimeValue` and `aggregationTimeUnit` (value first, then unit).
+  *
+  * @returns an observable of the `DataResult` returned by the endpoint
+  */
+ getData(index, startDate, endDate, columns, aggregationFunction, aggregationTimeUnit, aggregationTimeValue): Observable<DataResult> {
+   const measurementUrl = this.dataLakeUrl + '/measurements/' + index;
+   const params: DatalakeQueryParameters = this.getQueryParameters(
+     columns, startDate, endDate,
+     undefined, undefined, undefined, undefined, undefined,
+     aggregationFunction,
+     aggregationTimeValue + aggregationTimeUnit);
+
+   // NOTE(review): presumably suppresses the type mismatch between
+   // DatalakeQueryParameters and HttpClient's `params` option - confirm.
+   // @ts-ignore
+   return this.http.get<DataResult>(measurementUrl, {params});
+ }
+
+ /**
+  * Fetches paged data of the measurement `index` via GET on
+  * `<dataLakeUrl>/measurements/<index>`.
+  *
+  * @param index measurement name appended to the `/measurements/` path
+  * @param itemsPerPage passed to `getQueryParameters` as `limit`
+  * @param page passed to `getQueryParameters` as `page`
+  * @param columns optional, passed through as `columns`
+  * @param order optional, passed through as `order`
+  * @returns an observable of the `PageResult` returned by the endpoint
+  */
+ getPagedData(index: string, itemsPerPage: number, page: number, columns?: string, order?: string): Observable<PageResult> {
+   const measurementUrl = this.dataLakeUrl + '/measurements/' + index;
+   const params: DatalakeQueryParameters = this.getQueryParameters(
+     columns, undefined, undefined, page, itemsPerPage,
+     undefined, undefined, order, undefined, undefined);
+
+   // NOTE(review): presumably suppresses the type mismatch between
+   // DatalakeQueryParameters and HttpClient's `params` option - confirm.
+   // @ts-ignore
+   return this.http.get<PageResult>(measurementUrl, {params});
+ }
+
+ /**
+  * Fetches data of the measurement `index`, passing `groupingTags` as the
+  * `groupBy` argument of `getQueryParameters`.
+  *
+  * Defaults applied here:
+  *  - aggregation function: `'mean'` when `aggregationFunction` is falsy
+  *  - time interval: `'2000ms'` unless both `aggregationTimeUnit` and
+  *    `aggregationTimeValue` are truthy, in which case value and unit are
+  *    concatenated (value first)
+  *
+  * @returns an observable of the `GroupedDataResult` returned by the endpoint
+  */
+ getGroupedData(index: string, groupingTags: string, aggregationFunction?: string, columns?: string,
+                startDate?: number, endDate?: number, aggregationTimeUnit?: string,
+                aggregationTimeValue?: number, order?: string, limit?: number): Observable<GroupedDataResult> {
+   const measurementUrl = this.dataLakeUrl + '/measurements/' + index;
+   const effectiveFunction = aggregationFunction ? aggregationFunction : 'mean';
+   const timeInterval = (aggregationTimeUnit && aggregationTimeValue)
+     ? aggregationTimeValue + aggregationTimeUnit
+     : '2000ms';
+
+   const params: DatalakeQueryParameters = this.getQueryParameters(
+     columns, startDate, endDate, undefined, limit,
+     undefined, groupingTags, order, effectiveFunction, timeInterval);
+
+   // NOTE(review): presumably suppresses the type mismatch between
+   // DatalakeQueryParameters and HttpClient's `params` option - confirm.
+   // @ts-ignore
+   return this.http.get<GroupedDataResult>(measurementUrl, {params});
+ }
+
+ /**
+  * Issues a GET request against
+  * `<dataLakeUrl>/measurements/<index>/download?format=<format>` with progress
+  * reporting enabled and the response read as text.
+  *
+  * @returns the observable produced by `HttpClient.request` for that request
+  */
+ downloadRawData(index, format) {
+   const downloadRequest = new HttpRequest(
+     'GET',
+     this.dataLakeUrl + '/measurements/' + index + '/download?format=' + format,
+     {reportProgress: true, responseType: 'text'});
+
+   return this.http.request(downloadRequest);
+ }
+
+ /**
+  * Issues a GET request against
+  * `<dataLakeUrl>/measurements/<index>/download?format=<format>` with the given
+  * filters attached as query parameters, progress reporting enabled and the
+  * response read as text.
+  *
+  * A time interval is only sent when BOTH `aggregationTimeValue` and
+  * `aggregationTimeUnit` are provided. Concatenating them unconditionally would
+  * produce strings such as `undefinedms` when only one of the two is set.
+  *
+  * @returns the observable produced by `HttpClient.request` for that request
+  */
+ downloadQueriedData(index, format, startDate?, endDate?, columns?, aggregationFunction?, aggregationTimeUnit?, aggregationTimeValue?, groupingsTags?, order?, limit?, offset?) {
+   const url = this.dataLakeUrl + '/measurements/' + index + '/download?format=' + format;
+
+   // Same guard as getGroupedData: value and unit only make sense together.
+   const timeInterval = (aggregationTimeUnit && aggregationTimeValue)
+     ? aggregationTimeValue + aggregationTimeUnit
+     : undefined;
+
+   const queryParams: DatalakeQueryParameters = this.getQueryParameters(columns, startDate, endDate, undefined,
+     limit, offset, groupingsTags, order, aggregationFunction, timeInterval);
+
+   const request = new HttpRequest('GET', url, {
+     reportProgress: true,
+     responseType: 'text',
+     params: queryParams
+   });
+
+   return this.http.request(request);
+ }
+
+ /**
+  * Issues a DELETE request against `<dataLakeUrl>/measurements/<index>` with
+  * progress reporting enabled and the response read as text.
+  *
+  * @param index measurement name appended to the `/measurements/` path
+  * @param startDate optional, passed to `getQueryParameters` as `startDate`
+  * @param endDate optional, passed to `getQueryParameters` as `endDate`
+  * @returns the observable produced by `HttpClient.request` for that request
+  */
+ removeData(index: string, startDate?: number, endDate?: number) {
+   const params: DatalakeQueryParameters = this.getQueryParameters(
+     undefined, startDate, endDate,
+     undefined, undefined, undefined, undefined, undefined, undefined, undefined);
+   const deleteRequest = new HttpRequest(
+     'DELETE',
+     this.dataLakeUrl + '/measurements/' + index,
+     {reportProgress: true, responseType: 'text', params});
+
+   return this.http.request(deleteRequest);
+ }
+
+ /**
+  * Sends a DELETE request to `<dataLakeUrl>/measurements/<index>/drop`.
+  *
+  * @param index measurement name placed between `/measurements/` and `/drop`
+  */
+ dropSingleMeasurementSeries(index: string) {
+   return this.http.delete(this.dataLakeUrl + '/measurements/' + index + '/drop');
+ }
+
+ /** Sends a DELETE request to `<dataLakeUrl>/measurements/`. */
+ dropAllMeasurementSeries() {
+   return this.http.delete(this.dataLakeUrl + '/measurements/');
+ }
+
+ /**
+  * Builds the `DatalakeQueryParameters` object for a data lake request.
+  *
+  * An argument is copied onto the result only when it carries a value:
+  * `undefined`, `null`, the empty string and `NaN` are skipped so that they
+  * never end up in the query string. Numeric zero IS a value and is forwarded
+  * (for example `page` 0, `offset` 0 or a `startDate` of 0); a plain truthiness
+  * check would silently drop it.
+  *
+  * @returns a new parameter object containing only the provided values
+  */
+ private getQueryParameters(columns?: string, startDate?: number, endDate?: number, page?: number, limit?: number,
+                            offset?: number, groupBy?: string, order?: string, aggregationFunction?: string,
+                            timeInterval?: string): DatalakeQueryParameters {
+   // Presence test shared by all fields. NaN is excluded because callers build
+   // `timeInterval` by concatenation, which yields NaN when both parts are undefined.
+   const hasValue = (value: unknown): boolean =>
+     value !== undefined && value !== null && value !== '' && !Number.isNaN(value);
+
+   const queryParams: DatalakeQueryParameters = new DatalakeQueryParameters();
+
+   if (hasValue(columns)) {
+     queryParams.columns = columns;
+   }
+
+   if (hasValue(startDate)) {
+     queryParams.startDate = startDate;
+   }
+
+   if (hasValue(endDate)) {
+     queryParams.endDate = endDate;
+   }
+
+   if (hasValue(page)) {
+     queryParams.page = page;
+   }
+
+   if (hasValue(limit)) {
+     queryParams.limit = limit;
+   }
+
+   if (hasValue(offset)) {
+     queryParams.offset = offset;
+   }
+
+   if (hasValue(groupBy)) {
+     queryParams.groupBy = groupBy;
+   }
+
+   if (hasValue(order)) {
+     queryParams.order = order;
+   }
+
+   if (hasValue(aggregationFunction)) {
+     queryParams.aggregationFunction = aggregationFunction;
+   }
+
+   if (hasValue(timeInterval)) {
+     queryParams.timeInterval = timeInterval;
+   }
+
+   return queryParams;
+ }
}