You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/08/04 13:06:04 UTC

[incubator-streampipes] 04/05: [STREAMPIPES-319] Add basic functionalities to Datalake REST Service

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-319
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 635d12ec919ea8cb82dd50939b9a83f4b00b7634
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed Aug 4 15:04:46 2021 +0200

    [STREAMPIPES-319] Add basic functionalities to Datalake REST Service
---
 .../apis/datalake-rest.service.ts                  | 159 ++++++++++++++++++++-
 1 file changed, 158 insertions(+), 1 deletion(-)

diff --git a/ui/src/app/platform-services/apis/datalake-rest.service.ts b/ui/src/app/platform-services/apis/datalake-rest.service.ts
index a90fdf4..d8cfd0d 100644
--- a/ui/src/app/platform-services/apis/datalake-rest.service.ts
+++ b/ui/src/app/platform-services/apis/datalake-rest.service.ts
@@ -17,8 +17,15 @@
  */
 
 import { Injectable } from '@angular/core';
-import { HttpClient } from '@angular/common/http';
+import { HttpClient, HttpRequest } from '@angular/common/http';
 import { AuthStatusService } from '../../services/auth-status.service';
+import { Observable } from 'rxjs';
+import { DataLakeMeasure } from '../../core-model/gen/streampipes-model';
+import { map } from 'rxjs/operators';
+import { DataResult } from '../../core-model/datalake/DataResult';
+import { DatalakeQueryParameters } from '../../core-services/datalake/DatalakeQueryParameters';
+import { PageResult } from '../../core-model/datalake/PageResult';
+import { GroupedDataResult } from '../../core-model/datalake/GroupedDataResult';
 
 @Injectable()
 export class DatalakeRestService {
@@ -33,4 +40,154 @@ export class DatalakeRestService {
     private get dataLakeUrl() {
         return this.baseUrl + '/api/v4/users/' + this.authStatusService.email + '/datalake';
     }
+
+    /** Requests all measurement series and maps each raw entry through DataLakeMeasure.fromData. */
+    getAllMeasurementSeries(): Observable<DataLakeMeasure[]> {
+        const toMeasures = (entries: any[]) => entries.map(entry => DataLakeMeasure.fromData(entry));
+        const rawResponse = this.http.get(this.dataLakeUrl + '/measurements/');
+        return rawResponse.pipe(map(body => toMeasures(body as any[])));
+    }
+
+    /** Fetches data of measurement `index`, passing date range, columns and aggregation settings as query parameters. */
+    getData(index, startDate, endDate, columns, aggregationFunction, aggregationTimeUnit, aggregationTimeValue): Observable<DataResult> {
+        // Compose the interval only when value AND unit are present; otherwise a string such as 'undefinedms' would be sent.
+        const timeInterval = (aggregationTimeValue != null && aggregationTimeUnit != null)
+            ? aggregationTimeValue + aggregationTimeUnit : undefined;
+        const queryParams: DatalakeQueryParameters = this.getQueryParameters(columns, startDate, endDate, undefined, undefined,
+            undefined, undefined, undefined, aggregationFunction, timeInterval);
+        // @ts-ignore
+        return this.http.get<DataResult>(this.dataLakeUrl + '/measurements/' + index, {params: queryParams});
+    }
+
+    /** Fetches measurement `index` page-wise; `itemsPerPage` is handed to the query as its limit. */
+    getPagedData(index: string, itemsPerPage: number, page: number, columns?: string, order?: string): Observable<PageResult> {
+        const measurementUrl = `${this.dataLakeUrl}/measurements/${index}`;
+        // Only columns, page, limit and order are set here; the remaining query options stay undefined.
+        const pagingParams: DatalakeQueryParameters = this.getQueryParameters(columns, undefined, undefined, page,
+            itemsPerPage, undefined, undefined, order);
+        // @ts-ignore
+        return this.http.get<PageResult>(measurementUrl, {params: pagingParams});
+    }
+
+    /**
+     * Queries measurement `index` with `groupingTags` passed as the group-by option.
+     *
+     * Defaults applied before the request is built:
+     * - aggregation function: 'mean' when `aggregationFunction` is empty or missing
+     * - time interval: '2000ms' unless both `aggregationTimeUnit` and `aggregationTimeValue`
+     *   are truthy, in which case value and unit are concatenated (value first)
+     */
+    getGroupedData(index: string, groupingTags: string, aggregationFunction?: string, columns?: string, startDate?: number,
+                   endDate?: number, aggregationTimeUnit?: string, aggregationTimeValue?: number, order?: string, limit?: number):
+        Observable<GroupedDataResult> {
+
+        const effectiveFunction = aggregationFunction ? aggregationFunction : 'mean';
+        const interval = (aggregationTimeUnit && aggregationTimeValue)
+            ? aggregationTimeValue + aggregationTimeUnit
+            : '2000ms';
+
+        const groupedParams: DatalakeQueryParameters = this.getQueryParameters(columns, startDate, endDate, undefined, limit,
+            undefined, groupingTags, order, effectiveFunction, interval);
+        // @ts-ignore
+        return this.http.get<GroupedDataResult>(`${this.dataLakeUrl}/measurements/${index}`, {params: groupedParams});
+    }
+
+    /** Requests the download of measurement `index` in `format` as a progress-reporting GET with a text response. */
+    downloadRawData(index, format) {
+        const measurementUrl = this.dataLakeUrl + '/measurements/' + index;
+        const downloadUrl = measurementUrl + '/download?format=' + format;
+
+        // Built as an explicit HttpRequest with reportProgress enabled and the body read as text.
+        const downloadRequest = new HttpRequest('GET', downloadUrl, {reportProgress: true, responseType: 'text'});
+
+        return this.http.request(downloadRequest);
+    }
+
+    /**
+     * Requests the download of a query result for measurement `index` in `format` (progress-reporting GET, text response).
+     * NOTE(review): queryParams is handed to HttpRequest as-is - confirm it serializes like an HttpParams instance.
+     */
+    downloadQueriedData(index, format, startDate?, endDate?, columns?, aggregationFunction?, aggregationTimeUnit?, aggregationTimeValue?, groupingsTags?, order?, limit?, offset?) {
+        const url = this.dataLakeUrl + '/measurements/' + index + '/download?format=' + format;
+        // Compose the interval only when value AND unit are present; otherwise a string such as 'undefinedms' would be sent.
+        const timeInterval = (aggregationTimeValue != null && aggregationTimeUnit != null)
+            ? aggregationTimeValue + aggregationTimeUnit : undefined;
+
+        const queryParams: DatalakeQueryParameters = this.getQueryParameters(columns, startDate, endDate, undefined,
+            limit, offset, groupingsTags, order, aggregationFunction, timeInterval);
+
+        return this.http.request(new HttpRequest('GET', url, {reportProgress: true, responseType: 'text', params: queryParams}));
+    }
+
+    /**
+     * Sends a progress-reporting DELETE with a text response for measurement `index`;
+     * `startDate` and `endDate` are the only query options handed to getQueryParameters.
+     */
+    removeData(index: string, startDate?: number, endDate?: number) {
+        const measurementUrl = `${this.dataLakeUrl}/measurements/${index}`;
+        const rangeParams: DatalakeQueryParameters = this.getQueryParameters(undefined, startDate, endDate);
+
+        const deleteRequest = new HttpRequest('DELETE', measurementUrl, {
+            reportProgress: true, responseType: 'text', params: rangeParams
+        });
+
+        return this.http.request(deleteRequest);
+    }
+
+    /** Sends a DELETE to the `/drop` endpoint of measurement `index`. */
+    dropSingleMeasurementSeries(index: string) {
+        return this.http.delete(`${this.dataLakeUrl}/measurements/${index}/drop`);
+    }
+
+    /** Sends a DELETE to the measurements collection endpoint (`/measurements/`). */
+    dropAllMeasurementSeries() {
+        return this.http.delete(`${this.dataLakeUrl}/measurements/`);
+    }
+
+    /**
+     * Collects the given query options in a new DatalakeQueryParameters object.
+     *
+     * An option is only written when its value is truthy, so undefined, null, empty
+     * strings and NaN are left out - and so is the number 0.
+     * NOTE(review): confirm that 0 never has to be sent for startDate, endDate, page,
+     * limit or offset.
+     *
+     * All parameters are positional; callers pass `undefined` for options they do not use.
+     * Options are written in parameter order.
+     *
+     * @returns the object holding every truthy option under the name of its parameter
+     */
+    private getQueryParameters(columns?: string, startDate?: number, endDate?: number, page?: number, limit?: number,
+                               offset?: number, groupBy?: string, order?: string, aggregationFunction?: string, timeInterval?: string):
+        DatalakeQueryParameters {
+        const queryParams: DatalakeQueryParameters = new DatalakeQueryParameters();
+
+        // Writes a single option, skipping falsy values.
+        const setIfTruthy = (key: keyof DatalakeQueryParameters, value?: string | number) => {
+            if (value) {
+                Object.assign(queryParams, {[key]: value});
+            }
+        };
+
+        // Column selection and time range
+        setIfTruthy('columns', columns);
+        setIfTruthy('startDate', startDate);
+        setIfTruthy('endDate', endDate);
+
+        // Paging
+        setIfTruthy('page', page);
+        setIfTruthy('limit', limit);
+        setIfTruthy('offset', offset);
+
+        // Grouping and ordering
+        setIfTruthy('groupBy', groupBy);
+        setIfTruthy('order', order);
+
+        // Aggregation
+        setIfTruthy('aggregationFunction', aggregationFunction);
+        setIfTruthy('timeInterval', timeInterval);
+
+        return queryParams;
+    }
 }