You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:20:08 UTC

[incubator-streampipes] branch STREAMPIPES-349 created (now 3fb5a97)

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a change to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


      at 3fb5a97  Add DataLakeResourceV4 to openapi.yaml

This branch includes the following new commits:

     new ded2489  [STREAMPIPES-349] Add skeleton of the revised REST API
     new 33c617c  [STREAMPIPES-349] Add implementation of endpoint for 'getting all stored measurement series'
     new 20f8838  [STREAMPIPES-349] Rename method as well as list containing all stored measurement series
     new 2540b3e  [STREAMPIPES-349] Add implementation of endpoint for 'removing all stored measurement series'
     new 1cbb5da  [STREAMPIPES-349] Add data lake configuration and retention policy defining classes
     new 20b5586  [STREAMPIPES-349] Add label definition for data lake
     new 7a4f1c3  [STREAMPIPES-349] Add query defining classes for changes at the retention policies
     new 414b619  [STREAMPIPES-349] Update definition of configuration endpoints and add implementation
     new 12c2b39  [STREAMPIPES-349] Add first implementation of methods for getting and setting data lake configuration
     new ec066a6  Add abstract query element class and query parameter class
     new 3d11553  Add query template class (includes database specific templates for various query statements)
     new bf7eb2e  Add query element "Select From Statement" and corresponding parameter class
     new c8ca2a6  Add query element "Time Boundary" and corresponding parameter class
     new 029334f  Add query element "Ordering By Time" and corresponding parameter class
     new 4dd3411  Add query element "Grouping By Tags" and corresponding parameter class
     new aec29d6  Add query element "Grouping By Time" and corresponding parameter class
     new 6dbf7c6  Add query element "Item Limitation" and corresponding parameter class
     new 2513b45  Add query element "Offset" and corresponding parameter class
     new ee2fe10  Add query element "Delete From Statement" and corresponding parameter class
     new f53de50  Add Query Builder class
     new 015641e  Add Data Explorer Query class
     new 9d14cf9  Add implementation of endpoint for removing data from a single measurement series
     new c51509e  Add implementation of endpoint for getting data from data lake
     new b416e91  Add endpoint definition for downloading data from data lake (as csv- or json-file)
     new bee08cc  Add implementation of endpoint for downloading data from data lake
     new f974f4d  Add endpoint definition for dropping entire measurement series from data lake and removing related event property
     new f14a05b  Add implementation of endpoint for removing entire measurement series and related event property
     new 5915b93  Add DataLakeResourceV4 to StreamPipesResourceConfig
     new 3fb5a97  Add DataLakeResourceV4 to openapi.yaml

The 29 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[incubator-streampipes] 21/29: Add Data Explorer Query class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 015641e4a2b2e5a8ebf43b819757ac3f3aa4f3c4
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:46:03 2021 +0200

    Add Data Explorer Query class
---
 .../dataexplorer/v4/query/DataExplorerQueryV4.java | 121 +++++++++++++++++++++
 1 file changed, 121 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
new file mode 100644
index 0000000..6b06f21
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.v4.params.*;
+import org.apache.streampipes.dataexplorer.v4.query.elements.*;
+import org.apache.streampipes.model.datalake.DataResult;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DataExplorerQueryV4 {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryV4.class);
+
+
+    protected Map<String, QueryParamsV4> params;
+
+    public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
+        this.params = params;
+    }
+
+    public DataResult executeQuery() throws RuntimeException {
+        InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
+        List<QueryElement> queryElements = getQueryElements();
+
+        QueryBuilder queryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+        Query query = queryBuilder.build(queryElements);
+        LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
+
+        QueryResult result = influxDB.query(query);
+        LOG.debug("Data Lake Query Result: " + result.toString());
+        DataResult dataResult = postQuery(result);
+
+        influxDB.close();
+        return dataResult;
+    }
+
+    protected DataResult convertResult(QueryResult result) {
+        if (result.getResults().get(0).getSeries() != null) {
+            return convertResult(result.getResults().get(0).getSeries().get(0));
+        } else {
+            return new DataResult();
+        }
+    }
+
+    protected DataResult convertResult(QueryResult.Series series) {
+        List<String> columns = series.getColumns();
+        for (int i = 0; i < columns.size(); i++) {
+            String replacedColumnName = columns.get(i).replaceAll("mean_", "");
+            columns.set(i, replacedColumnName);
+        }
+        List values = series.getValues();
+        return new DataResult(values.size(), columns, values);
+    }
+
+    protected DataResult postQuery(QueryResult result) throws RuntimeException {
+        return convertResult(result);
+    }
+
+    protected List<QueryElement> getQueryElements() {
+        List<QueryElement> queryElements = new ArrayList<>();
+
+        if (this.params.containsKey("SELECT")) {
+            queryElements.add(new SelectFromStatement((SelectFromStatementParams) this.params.get("SELECT")));
+        } else {
+            queryElements.add(new DeleteFromStatement((DeleteFromStatementParams) this.params.get("DELETE")));
+        }
+
+        if (this.params.containsKey("WHERE")) {
+            queryElements.add(new TimeBoundary((TimeBoundaryParams) this.params.get("WHERE")));
+        }
+
+        if (this.params.containsKey("GROUPBYTIME")) {
+            queryElements.add(new GroupingByTime((GroupingByTimeParams) this.params.get("GROUPBYTIME")));
+
+        } else if (this.params.containsKey("GROUPBY")) {
+            queryElements.add(new GroupingByTags((GroupingByTagsParams) this.params.get("GROUPBY")));
+        }
+
+        if (this.params.containsKey("DESCENDING")) {
+            queryElements.add(new OrderingByTime((OrderingByTimeParams) this.params.get("DESCENDING")));
+        } else if (this.params.containsKey("SELECT")) {
+            queryElements.add(new OrderingByTime(OrderingByTimeParams.from(this.params.get("SELECT").getIndex(), "ASC")));
+        }
+
+        if (this.params.containsKey("LIMIT")) {
+            queryElements.add(new ItemLimitation((ItemLimitationParams) this.params.get("LIMIT")));
+        }
+
+        if (this.params.containsKey("OFFSET")) {
+            queryElements.add(new Offset((OffsetParams) this.params.get("OFFSET")));
+        }
+
+        return queryElements;
+    }
+}

[incubator-streampipes] 04/29: [STREAMPIPES-349] Add implementation of endpoint for 'removing all stored measurement series'

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 2540b3e9025cf54084df1dbf86d505db54b69381
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Apr 30 10:31:21 2021 +0200

    [STREAMPIPES-349] Add implementation of endpoint for 'removing all stored measurement series'
---
 .../streampipes/dataexplorer/DataLakeManagementV4.java    | 15 +++++++++++++++
 .../org/apache/streampipes/ps/DataLakeResourceV4.java     |  6 ++----
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 06e055c..2f177f6 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,8 +18,10 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.influxdb.dto.QueryResult;
 
 import java.util.List;
 
@@ -29,4 +31,17 @@ public class DataLakeManagementV4 {
         List<DataLakeMeasure> allMeasurements = DataExplorerUtils.getInfos();
         return allMeasurements;
     }
+
+    public boolean removeAllMeasurements() {
+        List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+
+        for (DataLakeMeasure measure : allMeasurements) {
+            QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+            if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 942d32a..f16dc3a 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -160,9 +160,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
             responses = {
                     @ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
     public Response removeAll(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+        boolean isSuccess = this.dataLakeManagement.removeAllMeasurements();
+        return Response.ok(isSuccess).build();
     }
 }

[incubator-streampipes] 24/29: Add endpoint definition for downloading data from data lake (as csv- or json-file)

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit b416e9175d47b2c45ae04126c74a10603e753df5
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:04:14 2021 +0200

    Add endpoint definition for downloading data from data lake (as csv- or json-file)
---
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index fd6ab92..caf76e8 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -121,6 +121,30 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     }
 
     @GET
+    @Path("/measurements/{measurementID}/download")
+    @Produces(MediaType.APPLICATION_OCTET_STREAM)
+    @Operation(summary = "Download data from a single measurement series by a given id", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "400", description = "Measurement series with given id and requested query specification not found"),
+                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = DataResult.class)))})
+    public Response downloadData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") Long startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") Long endDate
+            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") Integer page
+            , @Parameter(in = ParameterIn.QUERY, description = "maximum number of retrieved query results") @QueryParam("limit") Integer limit
+            , @Parameter(in = ParameterIn.QUERY, description = "offset") @QueryParam("offset") Integer offset
+            , @Parameter(in = ParameterIn.QUERY, description = "grouping tags (comma-separated) for grouping operation") @QueryParam("groupBy") String groupBy
+            , @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved query results (ASC or DESC - default is ASC)") @QueryParam("order") String order
+            , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
+            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
+            , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json - default is csv) for data download") @QueryParam("format") String format) {
+        /**
+         * TODO: implementation of method stump
+         */
+    }
+
+    @GET
     @Path("/configuration")
     @Produces(MediaType.APPLICATION_JSON)
     @Operation(summary = "Get the configuration parameters of the data lake", tags = {"Data Lake"},

[incubator-streampipes] 29/29: Add DataLakeResourceV4 to openapi.yaml

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3fb5a97d9e9957139cf9ad8167a4fc01e570a9bd
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:17:18 2021 +0200

    Add DataLakeResourceV4 to openapi.yaml
---
 streampipes-backend/src/main/resources/openapi.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/streampipes-backend/src/main/resources/openapi.yaml b/streampipes-backend/src/main/resources/openapi.yaml
index db3b2a6..77a224d 100644
--- a/streampipes-backend/src/main/resources/openapi.yaml
+++ b/streampipes-backend/src/main/resources/openapi.yaml
@@ -2,6 +2,7 @@ scannerClass: io.swagger.v3.oas.integration.GenericOpenApiScanner
 resourceClasses:
   - org.apache.streampipes.ps.PipelineElementTemplateResource
   - org.apache.streampipes.rest.impl.PipelineResource
+  - org.apache.streampipes.ps.DataLakeResourceV4
 prettyPrint: true
 cacheTTL: 0
 openAPI:

[incubator-streampipes] 05/29: [STREAMPIPES-349] Add data lake configuration and retention policy defining classes

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1cbb5da2a589aefa33cc9bacc1fe41383cdde882
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:18:32 2021 +0200

    [STREAMPIPES-349] Add data lake configuration and retention policy defining classes
---
 .../model/datalake/DataLakeConfiguration.java      | 60 +++++++++++++++++++++
 .../model/datalake/DataLakeRetentionPolicy.java    | 62 ++++++++++++++++++++++
 2 files changed, 122 insertions(+)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeConfiguration.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeConfiguration.java
new file mode 100644
index 0000000..48c8875
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeConfiguration.java
@@ -0,0 +1,60 @@
+package org.apache.streampipes.model.datalake;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataLakeConfiguration {
+
+    private Integer batchSize;
+    private Integer flushDuration;
+
+    private List<DataLakeRetentionPolicy> retentionPolicies;
+
+    public DataLakeConfiguration() {
+        this.batchSize = 2000;
+        this.flushDuration = 500;
+        this.retentionPolicies = new ArrayList<DataLakeRetentionPolicy>();
+    }
+
+    public DataLakeConfiguration(List<DataLakeRetentionPolicy> retentionPolicies) {
+        this.batchSize = 2000;
+        this.flushDuration = 500;
+        this.retentionPolicies = retentionPolicies;
+    }
+
+    public DataLakeConfiguration(Integer batchSize, Integer flushDuration) {
+        this.batchSize = batchSize;
+        this.flushDuration = flushDuration;
+        this.retentionPolicies = new ArrayList<DataLakeRetentionPolicy>();
+    }
+
+    public DataLakeConfiguration(Integer batchSize, Integer flushDuration, List<DataLakeRetentionPolicy> retentionPolicies) {
+        this.batchSize = batchSize;
+        this.flushDuration = flushDuration;
+        this.retentionPolicies = retentionPolicies;
+    }
+
+    public List<DataLakeRetentionPolicy> getRetentionPolicies() {
+        return retentionPolicies;
+    }
+
+    public void setRetentionPolicies(List<DataLakeRetentionPolicy> retentionPolicies) {
+        this.retentionPolicies = retentionPolicies;
+    }
+
+    public Integer getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(Integer batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public Integer getFlushDuration() {
+        return flushDuration;
+    }
+
+    public void setFlushDuration(Integer flushDuration) {
+        this.flushDuration = flushDuration;
+    }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeRetentionPolicy.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeRetentionPolicy.java
new file mode 100644
index 0000000..c069ce1
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeRetentionPolicy.java
@@ -0,0 +1,62 @@
+package org.apache.streampipes.model.datalake;
+
+public class DataLakeRetentionPolicy {
+
+    private String name;
+    private String durationLiteral;
+    private boolean isDefault;
+
+    public DataLakeRetentionPolicy(String name, int duration, String durationUnit, boolean isDefault) {
+        this.name = name;
+        this.durationLiteral = duration + durationUnit;
+        this.isDefault = isDefault;
+    }
+
+    public DataLakeRetentionPolicy(String name) {
+        this.name = name;
+        this.isDefault = false;
+        setDurationLiteralToInfinity();
+    }
+
+    public DataLakeRetentionPolicy(String name, String durationLiteral, boolean isDefault) {
+        this.name = name;
+        this.durationLiteral = durationLiteral;
+        this.isDefault = isDefault;
+    }
+
+    public String getDurationLiteral() {
+        return durationLiteral;
+    }
+
+    public void setDurationLiteral(int duration, String durationUnit) {
+        this.durationLiteral = duration + durationUnit;
+    }
+
+    public void setDurationLiteralToInfinity() {
+        this.durationLiteral = "INF";
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public boolean isDefault() {
+        return isDefault;
+    }
+
+    public void setDefault(boolean isDefault) {
+        this.isDefault = isDefault;
+    }
+
+    @Override
+    public String toString() {
+        return "DataLakeRetentionPolicy{" +
+                "name='" + name + '\'' +
+                ", durationLiteral='" + durationLiteral + '\'' +
+                '}';
+    }
+}

[incubator-streampipes] 25/29: Add implementation of endpoint for downloading data from data lake

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit bee08cc430f3be5af2c9f6f1934225ca67f6807f
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:07:26 2021 +0200

    Add implementation of endpoint for downloading data from data lake
---
 .../dataexplorer/DataLakeManagementV4.java         | 123 +++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java  |  15 ++-
 2 files changed, 135 insertions(+), 3 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 8e175b8..b2b7192 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import com.google.gson.Gson;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -31,6 +32,10 @@ import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.apache.streampipes.model.datalake.DataResult;
 import org.influxdb.dto.QueryResult;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +52,120 @@ public class DataLakeManagementV4 {
         return new DataExplorerQueryV4(queryParts).executeQuery();
     }
 
+    public void getDataAsStream(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval, String format, OutputStream outputStream) throws IOException {
+        if (limit == null) {
+            limit = 500000;
+        }
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+        DataResult dataResult;
+        //JSON
+        if (format.equals("json")) {
+
+            Gson gson = new Gson();
+            int i = 0;
+            if (page != null) {
+                i = page;
+            }
+
+            boolean isFirstDataObject = true;
+
+            outputStream.write(toBytes("["));
+            do {
+                dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+
+                if (dataResult.getTotal() > 0) {
+                    for (List<Object> row : dataResult.getRows()) {
+                        if (!isFirstDataObject) {
+                            outputStream.write(toBytes(","));
+                        }
+
+                        //produce one json object
+                        boolean isFirstElementInRow = true;
+                        outputStream.write(toBytes("{"));
+                        for (int i1 = 0; i1 < row.size(); i1++) {
+                            Object element = row.get(i1);
+                            if (!isFirstElementInRow) {
+                                outputStream.write(toBytes(","));
+                            }
+                            isFirstElementInRow = false;
+                            if (i1 == 0) {
+                                try {
+                                    element = formatter.parse(element.toString()).getTime();
+                                } catch (ParseException e) {
+                                    element = element.toString();
+                                }
+                            }
+                            //produce json e.g. "name": "Pipes" or "load": 42
+                            outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
+                                    + gson.toJson(element)));
+                        }
+                        outputStream.write(toBytes("}"));
+                        isFirstDataObject = false;
+                    }
+
+                    i++;
+                }
+            } while (dataResult.getTotal() > 0);
+            outputStream.write(toBytes("]"));
+
+            //CSV
+        } else if (format.equals("csv")) {
+            int i = 0;
+            if (page != null) {
+                i = page;
+            }
+
+            boolean isFirstDataObject = true;
+
+            do {
+                dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+                //Send first header
+                if (dataResult.getTotal() > 0) {
+                    if (isFirstDataObject) {
+                        boolean isFirst = true;
+                        for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
+                            if (!isFirst) {
+                                outputStream.write(toBytes(";"));
+                            }
+                            isFirst = false;
+                            outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
+                        }
+                    }
+                    outputStream.write(toBytes("\n"));
+                    isFirstDataObject = false;
+                }
+
+                if (dataResult.getTotal() > 0) {
+                    for (List<Object> row : dataResult.getRows()) {
+                        boolean isFirstInRow = true;
+                        for (int i1 = 0; i1 < row.size(); i1++) {
+                            Object element = row.get(i1);
+                            if (!isFirstInRow) {
+                                outputStream.write(toBytes(";"));
+                            }
+                            isFirstInRow = false;
+                            if (i1 == 0) {
+                                try {
+                                    element = formatter.parse(element.toString()).getTime();
+                                } catch (ParseException e) {
+                                    element = element.toString();
+                                }
+                            }
+                            if (element == null) {
+                                outputStream.write(toBytes(""));
+                            } else {
+                                outputStream.write(toBytes(element.toString()));
+                            }
+                        }
+                        outputStream.write(toBytes("\n"));
+                    }
+                }
+                i++;
+            } while (dataResult.getTotal() > 0);
+        }
+    }
+
     public boolean removeAllMeasurements() {
         List<DataLakeMeasure> allMeasurements = getAllMeasurements();
 
@@ -157,4 +276,8 @@ public class DataLakeManagementV4 {
         }
         return queryParts;
     }
+
+    private byte[] toBytes(String value) {
+        return value.getBytes();
+    }
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index caf76e8..f2bd49b 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 import java.util.List;
 
 class Placeholder {
@@ -139,9 +140,17 @@ public class DataLakeResourceV4 extends AbstractRestResource {
             , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
             , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
             , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json - default is csv) for data download") @QueryParam("format") String format) {
-        /**
-         * TODO: implementation of method stump
-         */
+
+        if (format == null) {
+            format = "csv";
+        }
+        String outputFormat = format;
+
+        StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval, outputFormat, output);
+
+        return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
+                header("Content-Disposition", "attachment; filename=\"datalake." + outputFormat + "\"")
+                .build();
     }
 
     @GET

[incubator-streampipes] 13/29: Add query element "Time Boundary" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c8ca2a67249a3491f8214babd85bda01410f8daa
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:03:56 2021 +0200

    Add query element "Time Boundary" and corresponding parameter class
---
 .../dataexplorer/v4/params/TimeBoundaryParams.java | 43 ++++++++++++++++++++++
 .../v4/query/elements/TimeBoundary.java            | 40 ++++++++++++++++++++
 2 files changed, 83 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
new file mode 100644
index 0000000..c893829
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class TimeBoundaryParams extends QueryParamsV4 {
+
+    private final Long startDate;
+    private final Long endDate;
+
+    public static TimeBoundaryParams from(String measurementID, Long startDate, Long endDate) {
+        return new TimeBoundaryParams(measurementID, startDate, endDate);
+    }
+
+    protected TimeBoundaryParams(String measurementID, Long startDate, Long endDate) {
+        super(measurementID);
+        this.startDate = startDate;
+        this.endDate = endDate;
+    }
+
+    public Long getStartDate() {
+        return startDate;
+    }
+
+    public Long getEndDate() {
+        return endDate;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
new file mode 100644
index 0000000..bae5f27
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class TimeBoundary extends QueryElement<TimeBoundaryParams> {
+
+    public TimeBoundary(TimeBoundaryParams timeBoundaryParams) {
+        super(timeBoundaryParams);
+    }
+
+    @Override
+    protected String buildStatement(TimeBoundaryParams timeBoundaryParams) {
+        if (timeBoundaryParams.getStartDate() == null) {
+            return QueryTemplatesV4.whereTimeRightBound(timeBoundaryParams.getEndDate());
+        } else if (timeBoundaryParams.getEndDate() == null) {
+            return QueryTemplatesV4.whereTimeLeftBound(timeBoundaryParams.getStartDate());
+        } else {
+            return QueryTemplatesV4.whereTimeWithin(timeBoundaryParams.getStartDate(), timeBoundaryParams.getEndDate());
+        }
+    }
+}

[incubator-streampipes] 18/29: Add query element "Offset" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 2513b45bfc0713e3b4bea44404934ef337219aa7
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:14:55 2021 +0200

    Add query element "Offset" and corresponding parameter class
---
 .../dataexplorer/v4/params/OffsetParams.java       | 37 ++++++++++++++++++++++
 .../dataexplorer/v4/query/elements/Offset.java     | 34 ++++++++++++++++++++
 2 files changed, 71 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
new file mode 100644
index 0000000..ac6a1b3
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class OffsetParams extends QueryParamsV4 {
+
+    private final Integer offset;
+
+    public static OffsetParams from(String measurementID, Integer offset) {
+        return new OffsetParams(measurementID, offset);
+    }
+
+    public OffsetParams(String measurementID, Integer offset) {
+        super(measurementID);
+        this.offset = offset;
+    }
+
+    public Integer getOffset() {
+        return offset;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
new file mode 100644
index 0000000..c8c8c0a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class Offset extends QueryElement<OffsetParams> {
+
+    public Offset(OffsetParams offsetParams) {
+        super(offsetParams);
+    }
+
+    @Override
+    protected String buildStatement(OffsetParams offsetParams) {
+        return QueryTemplatesV4.offset(offsetParams.getOffset());
+    }
+}

[incubator-streampipes] 22/29: Add implementation of endpoint for removing data from a single measurement series

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 9d14cf92a9722bf2b8cc2e39374420df3a579c36
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:57:54 2021 +0200

    Add implementation of endpoint for removing data from a single measurement series
---
 .../dataexplorer/DataLakeManagementV4.java          | 21 +++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java   | 12 ++++++------
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index f1bc058..76a7577 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -23,12 +23,19 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
+import org.apache.streampipes.model.datalake.DataResult;
 import org.influxdb.dto.QueryResult;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class DataLakeManagementV4 {
 
@@ -49,6 +56,11 @@ public class DataLakeManagementV4 {
         return true;
     }
 
+    public DataResult deleteData(String measurementID, Long startDate, Long endDate) {
+        Map<String, QueryParamsV4> queryParts = getDeleteQueryParams(measurementID, startDate, endDate);
+        return new DataExplorerQueryV4(queryParts).executeQuery();
+    }
+
     public DataLakeConfiguration getDataLakeConfiguration() {
         List<DataLakeRetentionPolicy> retentionPolicies = getAllExistingRetentionPolicies();
         return new DataLakeConfiguration(retentionPolicies);
@@ -91,4 +103,13 @@ public class DataLakeManagementV4 {
          */
         return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
     }
+
+    public Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID, Long startDate, Long endDate) {
+        Map<String, QueryParamsV4> queryParts = new HashMap<>();
+        queryParts.put("DELETE", DeleteFromStatementParams.from(measurementID));
+        if (startDate != null || endDate != null) {
+            queryParts.put("WHERE", TimeBoundaryParams.from(measurementID, startDate, endDate));
+        }
+        return queryParts;
+    }
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 91807c1..cbef9de 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -28,6 +28,7 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataResult;
 import org.apache.streampipes.rest.impl.AbstractRestResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,12 +77,11 @@ public class DataLakeResourceV4 extends AbstractRestResource {
                     @ApiResponse(responseCode = "400", description = "Measurement series with given id not found")})
     public Response deleteData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
             , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
-            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
-            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") Long startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") Long endDate) {
+
+        DataResult result = this.dataLakeManagement.deleteData(measurementID, startDate, endDate);
+        return ok();
     }
 
     @GET

[incubator-streampipes] 17/29: Add query element "Item Limitation" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6dbf7c6ad328e6e4692830710de42541651f12f6
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:13:26 2021 +0200

    Add query element "Item Limitation" and corresponding parameter class
---
 .../v4/params/ItemLimitationParams.java            | 37 ++++++++++++++++++++++
 .../v4/query/elements/ItemLimitation.java          | 34 ++++++++++++++++++++
 2 files changed, 71 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
new file mode 100644
index 0000000..6656ad5
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class ItemLimitationParams extends QueryParamsV4 {
+
+    private final Integer limit;
+
+    public static ItemLimitationParams from(String measurementID, Integer limit) {
+        return new ItemLimitationParams(measurementID, limit);
+    }
+
+    public ItemLimitationParams(String measurementID, Integer limit) {
+        super(measurementID);
+        this.limit = limit;
+    }
+
+    public Integer getLimit() {
+        return limit;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
new file mode 100644
index 0000000..b4f42e0
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class ItemLimitation extends QueryElement<ItemLimitationParams> {
+
+    public ItemLimitation(ItemLimitationParams itemLimitationParams) {
+        super(itemLimitationParams);
+    }
+
+    @Override
+    protected String buildStatement(ItemLimitationParams itemLimitationParams) {
+        return QueryTemplatesV4.limitItems(itemLimitationParams.getLimit());
+    }
+}

[incubator-streampipes] 28/29: Add DataLakeResourceV4 to StreamPipesResourceConfig

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 5915b93b7094c55cfe4ab4c564ae2753c2e2f634
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:16:46 2021 +0200

    Add DataLakeResourceV4 to StreamPipesResourceConfig
---
 .../backend/StreamPipesResourceConfig.java         | 156 +++++++++++----------
 1 file changed, 79 insertions(+), 77 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 2261149..1e66b44 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -20,6 +20,8 @@ package org.apache.streampipes.backend;
 
 import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
 import org.apache.streampipes.connect.container.master.rest.*;
+import org.apache.streampipes.ps.DataLakeResourceV3;
+import org.apache.streampipes.ps.DataLakeResourceV4;
 import org.apache.streampipes.ps.PipelineElementTemplateResource;
 import org.apache.streampipes.rest.impl.*;
 import org.apache.streampipes.rest.impl.dashboard.Dashboard;
@@ -27,7 +29,6 @@ import org.apache.streampipes.rest.impl.dashboard.DashboardWidget;
 import org.apache.streampipes.rest.impl.dashboard.VisualizablePipeline;
 import org.apache.streampipes.rest.impl.datalake.DataLakeDashboardResource;
 import org.apache.streampipes.rest.impl.datalake.DataLakeNoUserResourceV3;
-import org.apache.streampipes.ps.DataLakeResourceV3;
 import org.apache.streampipes.rest.impl.datalake.DataLakeWidgetResource;
 import org.apache.streampipes.rest.impl.nouser.FileServingResource;
 import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
@@ -47,86 +48,87 @@ import javax.ws.rs.ApplicationPath;
 @ApplicationPath("/api")
 public class StreamPipesResourceConfig extends ResourceConfig {
 
-  public StreamPipesResourceConfig() {
-    register(Authentication.class);
-    register(AssetDashboard.class);
-    register(AutoComplete.class);
-    register(CategoryResource.class);
-    register(ConsulConfig.class);
-    register(ContainerProvidedOptions.class);
-    register(Couchdb.class);
-    register(DashboardWidget.class);
-    register(Dashboard.class);
-    register(DataLakeResourceV3.class);
-    register(DataLakeNoUserResourceV3.class);
-    register(DataStream.class);
-    register(Deployment.class);
-    register(FileServingResource.class);
-    register(InternalPipelineTemplates.class);
-    register(LabelResource.class);
-    register(MeasurementUnitResource.class);
-    register(Notification.class);
-    register(OntologyContext.class);
-    register(OntologyKnowledge.class);
-    register(OntologyMeasurementUnit.class);
-    register(OntologyPipelineElement.class);
-    register(PipelineCache.class);
-    register(PipelineCategory.class);
-    register(PipelineElementAsset.class);
-    register(PipelineElementCategory.class);
-    register(PipelineElementFile.class);
-    register(PipelineElementImportNoUser.class);
-    register(PipelineElementImport.class);
-    register(PipelineElementRuntimeInfo.class);
-    register(PipelineMonitoring.class);
-    register(PipelineNoUserResource.class);
-    register(PipelineTemplate.class);
-    register(PipelineResource.class);
-    register(RdfEndpoint.class);
-    register(SemanticEventConsumer.class);
-    register(SemanticEventProcessingAgent.class);
-    register(SemanticEventProducer.class);
-    register(Setup.class);
-    register(User.class);
-    register(Version.class);
-    register(PipelineElementAsset.class);
-    register(DataLakeDashboardResource.class);
-    register(DataLakeWidgetResource.class);
-    register(DataLakeResourceV3.class);
-    register(DataLakeNoUserResourceV3.class);
-    register(PipelineElementFile.class);
-    register(FileServingResource.class);
-    register(DashboardWidget.class);
-    register(Dashboard.class);
-    register(VirtualSensor.class);
-    register(Visualization.class);
-    register(VisualizablePipeline.class);
+    public StreamPipesResourceConfig() {
+        register(Authentication.class);
+        register(AssetDashboard.class);
+        register(AutoComplete.class);
+        register(CategoryResource.class);
+        register(ConsulConfig.class);
+        register(ContainerProvidedOptions.class);
+        register(Couchdb.class);
+        register(DashboardWidget.class);
+        register(Dashboard.class);
+        register(DataLakeResourceV3.class);
+        register(DataLakeNoUserResourceV3.class);
+        register(DataStream.class);
+        register(Deployment.class);
+        register(FileServingResource.class);
+        register(InternalPipelineTemplates.class);
+        register(LabelResource.class);
+        register(MeasurementUnitResource.class);
+        register(Notification.class);
+        register(OntologyContext.class);
+        register(OntologyKnowledge.class);
+        register(OntologyMeasurementUnit.class);
+        register(OntologyPipelineElement.class);
+        register(PipelineCache.class);
+        register(PipelineCategory.class);
+        register(PipelineElementAsset.class);
+        register(PipelineElementCategory.class);
+        register(PipelineElementFile.class);
+        register(PipelineElementImportNoUser.class);
+        register(PipelineElementImport.class);
+        register(PipelineElementRuntimeInfo.class);
+        register(PipelineMonitoring.class);
+        register(PipelineNoUserResource.class);
+        register(PipelineTemplate.class);
+        register(PipelineResource.class);
+        register(RdfEndpoint.class);
+        register(SemanticEventConsumer.class);
+        register(SemanticEventProcessingAgent.class);
+        register(SemanticEventProducer.class);
+        register(Setup.class);
+        register(User.class);
+        register(Version.class);
+        register(PipelineElementAsset.class);
+        register(DataLakeDashboardResource.class);
+        register(DataLakeWidgetResource.class);
+        register(DataLakeResourceV3.class);
+        register(DataLakeNoUserResourceV3.class);
+        register(PipelineElementFile.class);
+        register(FileServingResource.class);
+        register(DashboardWidget.class);
+        register(Dashboard.class);
+        register(VirtualSensor.class);
+        register(Visualization.class);
+        register(VisualizablePipeline.class);
+
+        // Serializers
+        register(GsonWithIdProvider.class);
+        register(GsonWithoutIdProvider.class);
+        register(GsonClientModelProvider.class);
+        register(JsonLdProvider.class);
+        register(JacksonSerializationProvider.class);
+        register(MultiPartFeature.class);
 
-    // Serializers
-    register(GsonWithIdProvider.class);
-    register(GsonWithoutIdProvider.class);
-    register(GsonClientModelProvider.class);
-    register(JsonLdProvider.class);
-    register(JacksonSerializationProvider.class);
-    register(MultiPartFeature.class);
-    
-    // Platform Services
-    register(PipelineElementTemplateResource.class);
+        // Platform Services
+        register(PipelineElementTemplateResource.class);
+        register(DataLakeResourceV4.class);
 
-    register(OpenApiResource.class);
+        register(OpenApiResource.class);
 
 
-    // Connect Master
-    register(WelcomePageMaster.class);
-    register(AdapterResource.class);
-    register(AdapterTemplateResource.class);
-    register(DescriptionResource.class);
-    register(SourcesResource.class);
-    register(GuessResource.class);
+        // Connect Master
+        register(WelcomePageMaster.class);
+        register(AdapterResource.class);
+        register(AdapterTemplateResource.class);
+        register(DescriptionResource.class);
+        register(SourcesResource.class);
+        register(GuessResource.class);
 //    register(MultiPartFeature.class);
-    register(UnitResource.class);
-    register(WorkerAdministrationResource.class);
-    register(RuntimeResolvableResource.class);
-  }
+        register(UnitResource.class);
+        register(WorkerAdministrationResource.class);
+        register(RuntimeResolvableResource.class);
+    }
 
 }

[incubator-streampipes] 27/29: Add implementation of endpoint for removing entire measurement series and related event property

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f14a05b43e9648d65f45ed56e718aa63f55e5374
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:13:54 2021 +0200

    Add implementation of endpoint for removing entire measurement series and related event property
---
 .../dataexplorer/DataLakeManagementV4.java         | 39 ++++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 15 +++++++--
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index b2b7192..1eed837 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.dataexplorer;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -30,7 +31,9 @@ import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.apache.streampipes.model.datalake.DataResult;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
 import org.influxdb.dto.QueryResult;
+import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -178,6 +181,21 @@ public class DataLakeManagementV4 {
         return true;
     }
 
+    public boolean removeMeasurement(String measurementID) {
+        List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+        for (DataLakeMeasure measure : allMeasurements) {
+            if (measure.getMeasureName().equals(measurementID)) {
+                QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+
+                if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+                    return false;
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
     public DataResult deleteData(String measurementID, Long startDate, Long endDate) {
         Map<String, QueryParamsV4> queryParts = getDeleteQueryParams(measurementID, startDate, endDate);
         return new DataExplorerQueryV4(queryParts).executeQuery();
@@ -226,6 +244,27 @@ public class DataLakeManagementV4 {
         return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
     }
 
+    public boolean removeEventProperty(String measurementID) {
+        boolean isSuccess = false;
+        CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+        List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+        for (JsonObject document : docs) {
+            if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
+                couchDbClient.remove(document.get("_id").toString().replace("\"", ""), document.get("_rev").toString().replace("\"", ""));
+                isSuccess = true;
+                break;
+            }
+        }
+
+        try {
+            couchDbClient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return isSuccess;
+    }
+
     private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
         Map<String, QueryParamsV4> queryParts = new HashMap<>();
 
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 53b3414..8692af0 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -94,9 +94,18 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     public Response dropMeasurementSeries(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
             , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID) {
 
-        /**
-         * TODO: implementation of method stump
-         */
+        boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+
+        if (isSuccessDataLake) {
+            boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+            if (isSuccessEventProperty) {
+                return ok();
+            } else {
+                return Response.status(Response.Status.NOT_FOUND).entity("Event property related to measurement series with given id not found.").build();
+            }
+        } else {
+            return Response.status(Response.Status.NOT_FOUND).entity("Measurement series with given id not found.").build();
+        }
     }
 
     @GET

[incubator-streampipes] 15/29: Add query element "Grouping By Tags" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 4dd341156be60d772b87961fa7da23dd0e0be1a0
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:08:43 2021 +0200

    Add query element "Grouping By Tags" and corresponding parameter class
---
 .../v4/params/GroupingByTagsParams.java            | 42 +++++++++++++++++++++
 .../v4/query/elements/GroupingByTags.java          | 43 ++++++++++++++++++++++
 2 files changed, 85 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
new file mode 100644
index 0000000..bcbb02b
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GroupingByTagsParams extends QueryParamsV4 {
+    private final List<String> groupingTags;
+
+    public static GroupingByTagsParams from(String measurementID, String groupingTagsSeparatedByComma) {
+        return new GroupingByTagsParams(measurementID, groupingTagsSeparatedByComma);
+    }
+
+    public GroupingByTagsParams(String measurementID, String groupingTagsSeparatedByComma) {
+        super(measurementID);
+        this.groupingTags = new ArrayList<>();
+        for (String tag : groupingTagsSeparatedByComma.split(",")) {
+            this.groupingTags.add(tag);
+        }
+    }
+
+    public List<String> getGroupingTags() {
+        return groupingTags;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
new file mode 100644
index 0000000..61b9b9e
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class GroupingByTags extends QueryElement<GroupingByTagsParams> {
+
+    public GroupingByTags(GroupingByTagsParams groupingByTagsParams) {
+        super(groupingByTagsParams);
+    }
+
+    @Override
+    protected String buildStatement(GroupingByTagsParams groupingByTagsParams) {
+        String tags = "";
+        for (String tag : groupingByTagsParams.getGroupingTags()) {
+            if (tags.equals("")) {
+                tags = tag;
+            } else {
+                tags = tags + ", " + tag;
+            }
+        }
+
+        return QueryTemplatesV4.groupByTags(tags);
+    }
+}

[incubator-streampipes] 16/29: Add query element "Grouping By Time" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit aec29d60ea90859b07af2957dd997a6889aa7f85
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:09:57 2021 +0200

    Add query element "Grouping By Time" and corresponding parameter class
---
 .../v4/params/GroupingByTimeParams.java            | 36 ++++++++++++++++++++++
 .../v4/query/elements/GroupingByTime.java          | 34 ++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
new file mode 100644
index 0000000..1751bcc
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class GroupingByTimeParams extends QueryParamsV4 {
+    private final String timeInterval;
+
+    public static GroupingByTimeParams from(String measurementID, String timeInterval) {
+        return new GroupingByTimeParams(measurementID, timeInterval);
+    }
+
+    public GroupingByTimeParams(String measurementID, String timeInterval) {
+        super(measurementID);
+        this.timeInterval = timeInterval;
+    }
+
+    public String getTimeInterval() {
+        return this.timeInterval;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
new file mode 100644
index 0000000..dc16b65
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class GroupingByTime extends QueryElement<GroupingByTimeParams> {
+
+    public GroupingByTime(GroupingByTimeParams groupingByTimeParams) {
+        super(groupingByTimeParams);
+    }
+
+    @Override
+    protected String buildStatement(GroupingByTimeParams groupingByTimeParams) {
+        return QueryTemplatesV4.groupByTime(groupingByTimeParams.getTimeInterval());
+    }
+}

[incubator-streampipes] 10/29: Add abstract query element class and query parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit ec066a6a8032e6349b834f956a15dffaf37cc273
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 18:53:02 2021 +0200

    Add abstract query element class and query parameter class
---
 .../dataexplorer/v4/params/QueryParamsV4.java      | 36 ++++++++++++++++++++++
 .../v4/query/elements/QueryElement.java            | 33 ++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
new file mode 100644
index 0000000..c5d2699
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class QueryParamsV4 {
+
+    private final String index;
+
+    public static QueryParamsV4 from(String index) {
+        return new QueryParamsV4(index);
+    }
+
+    protected QueryParamsV4(String index) {
+        this.index = index;
+    }
+
+    public String getIndex() {
+        return index;
+    }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
new file mode 100644
index 0000000..8fd3f50
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+public abstract class QueryElement<QueryElementParams> {
+    protected String queryStatement;
+
+    public QueryElement(QueryElementParams params) {
+        this.queryStatement = buildStatement(params);
+    }
+
+    protected abstract String buildStatement(QueryElementParams params);
+
+    public String getStatement() {
+        return queryStatement;
+    }
+}

[incubator-streampipes] 02/29: [STREAMPIPES-349] Add implementation of endpoint for 'getting all stored measurement series'

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 33c617cd98b4d60434b06e7221b0539a10df9d83
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Apr 30 10:06:29 2021 +0200

    [STREAMPIPES-349] Add implementation of endpoint for 'getting all stored measurement series'
---
 .../dataexplorer/DataLakeManagementV4.java         | 32 ++++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 24 ++++++++++++----
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
new file mode 100644
index 0000000..07f2b90
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public class DataLakeManagementV4 {
+
+    public List<DataLakeMeasure> getAllMeasurementSeries() {
+        List<DataLakeMeasure> dataLakeMeasuresList = DataExplorerUtils.getInfos();
+        return dataLakeMeasuresList;
+    }
+}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 35ac486..d6f51f6 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -25,20 +25,36 @@ import io.swagger.v3.oas.annotations.media.ArraySchema;
 import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.rest.impl.AbstractRestResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.util.List;
+
+class Placeholder {
+}
 
 
 @Path("v4/users/{username}/datalake")
-public class DataLakeResourceV4 {
+public class DataLakeResourceV4 extends AbstractRestResource {
 
     private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
 
+    private DataLakeManagementV4 dataLakeManagement;
+
+    public DataLakeResourceV4() {
+        this.dataLakeManagement = new DataLakeManagementV4();
+    }
+
+    public DataLakeResourceV4(DataLakeManagementV4 dataLakeManagement) {
+        this.dataLakeManagement = dataLakeManagement;
+    }
+
 
     @POST
     @Path("/measurements/{measurementID}/configuration")
@@ -77,10 +93,8 @@ public class DataLakeResourceV4 {
             responses = {
                     @ApiResponse(responseCode = "200", description = "array of stored measurement series", content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
     public Response getAll(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+        List<DataLakeMeasure> allMeasurementSeries = this.dataLakeManagement.getAllMeasurementSeries();
+        return ok(allMeasurementSeries);
     }
 
     @GET

[incubator-streampipes] 23/29: Add implementation of endpoint for getting data from data lake

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c51509e9d0a9c203de8d9f3bd40c7d63d9a02a7c
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:01:22 2021 +0200

    Add implementation of endpoint for getting data from data lake
---
 .../dataexplorer/DataLakeManagementV4.java         | 51 ++++++++++++++++++++--
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 25 ++++++-----
 2 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 76a7577..8e175b8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -23,9 +23,7 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.params.*;
 import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -44,6 +42,11 @@ public class DataLakeManagementV4 {
         return allMeasurements;
     }
 
+    public DataResult getData(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+        Map<String, QueryParamsV4> queryParts = getQueryParams(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+        return new DataExplorerQueryV4(queryParts).executeQuery();
+    }
+
     public boolean removeAllMeasurements() {
         List<DataLakeMeasure> allMeasurements = getAllMeasurements();
 
@@ -104,6 +107,48 @@ public class DataLakeManagementV4 {
         return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
     }
 
+    private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+        Map<String, QueryParamsV4> queryParts = new HashMap<>();
+
+        queryParts.put("SELECT", SelectFromStatementParams.from(measurementID, aggregationFunction));
+
+        if (startDate != null || endDate != null) {
+            queryParts.put("WHERE", TimeBoundaryParams.from(measurementID, startDate, endDate));
+        }
+
+
+        if (timeInterval != null && aggregationFunction != null) {
+            if (groupBy == null) {
+                queryParts.put("GROUPBYTIME", GroupingByTimeParams.from(measurementID, timeInterval));
+            } else {
+                groupBy = groupBy + ",time(" + timeInterval + ")";
+            }
+        }
+
+        if (groupBy != null) {
+            queryParts.put("GROUPBY", GroupingByTagsParams.from(measurementID, groupBy));
+        }
+
+        if (order != null) {
+            if (order.equals("DESC")) {
+                queryParts.put("DESCENDING", OrderingByTimeParams.from(measurementID, order));
+            }
+        }
+
+        if (limit != null) {
+            queryParts.put("LIMIT", ItemLimitationParams.from(measurementID, limit));
+        }
+
+        if (offset != null) {
+            queryParts.put("OFFSET", OffsetParams.from(measurementID, offset));
+        } else if (limit != null && page != null) {
+            queryParts.put("OFFSET", OffsetParams.from(measurementID, page * limit));
+        }
+
+        return queryParts;
+
+    }
+
     public Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID, Long startDate, Long endDate) {
         Map<String, QueryParamsV4> queryParts = new HashMap<>();
         queryParts.put("DELETE", DeleteFromStatementParams.from(measurementID));
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index cbef9de..fd6ab92 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -95,28 +95,29 @@ public class DataLakeResourceV4 extends AbstractRestResource {
         return ok(allMeasurements);
     }
 
+
     @GET
     @Path("/measurements/{measurementID}")
     @Produces(MediaType.APPLICATION_JSON)
     @Operation(summary = "Get data from a single measurement series by a given id", tags = {"Data Lake"},
             responses = {
                     @ApiResponse(responseCode = "400", description = "Measurement series with given id and requested query specification not found"),
-                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
+                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = DataResult.class)))})
     public Response getData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
             , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
-            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
-            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate
-            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") String page
-            , @Parameter(in = ParameterIn.QUERY, description = "items per page limitation for paging operation") @QueryParam("limit") Integer limit
-            , @Parameter(in = ParameterIn.QUERY, description = "offset for paging operation") @QueryParam("offset") Integer offset
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") Long startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") Long endDate
+            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") Integer page
+            , @Parameter(in = ParameterIn.QUERY, description = "maximum number of retrieved query results") @QueryParam("limit") Integer limit
+            , @Parameter(in = ParameterIn.QUERY, description = "offset") @QueryParam("offset") Integer offset
             , @Parameter(in = ParameterIn.QUERY, description = "grouping tags (comma-separated) for grouping operation") @QueryParam("groupBy") String groupBy
+            , @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved query results (ASC or DESC - default is ASC)") @QueryParam("order") String order
             , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
-            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
-            , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json) for data download") @QueryParam("format") String format) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval) {
+
+        DataResult result = this.dataLakeManagement.getData(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+
+        return Response.ok(result).build();
     }
 
     @GET

[incubator-streampipes] 08/29: [STREAMPIPES-349] Update definition of configuration endpoints and add implementation

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 414b619b5d66711b679fa3fe51787ec194697b68
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:30:09 2021 +0200

    [STREAMPIPES-349] Update definition of configuration endpoints and add implementation
---
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 32 +++++++++-------------
 1 file changed, 13 insertions(+), 19 deletions(-)

diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index f16dc3a..91807c1 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -26,6 +26,7 @@ import io.swagger.v3.oas.annotations.media.Content;
 import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.rest.impl.AbstractRestResource;
 import org.slf4j.Logger;
@@ -57,17 +58,14 @@ public class DataLakeResourceV4 extends AbstractRestResource {
 
 
     @POST
-    @Path("/measurements/{measurementID}/configuration")
+    @Path("/configuration")
     @Consumes(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Configure parameters of measurement series with a given id", tags = {"Data Lake"},
+    @Operation(summary = "Configure the parameters of the data lake", tags = {"Data Lake"},
             responses = {@ApiResponse(responseCode = "200", description = "Configuration was successful")})
-    public Response configureMeasurement(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
-            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
-            , @Parameter(in = ParameterIn.DEFAULT, description = "the configuration parameters") Placeholder body) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+    public Response configureMeasurement(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username,
+                                         @Parameter(in = ParameterIn.QUERY, description = "should any parameter be reset to its default value?") @DefaultValue("false") @QueryParam("resetToDefault") boolean resetToDefault,
+                                         @Parameter(in = ParameterIn.DEFAULT, description = "the configuration parameters") DataLakeConfiguration config) {
+        return ok(this.dataLakeManagement.editMeasurementConfiguration(config, resetToDefault));
     }
 
     @DELETE
@@ -122,18 +120,14 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     }
 
     @GET
-    @Path("/measurements/{measurementID}/configuration")
+    @Path("/configuration")
     @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Get configuration parameters of a measurement series by a given id", tags = {"Data Lake"},
+    @Operation(summary = "Get the configuration parameters of the data lake", tags = {"Data Lake"},
             responses = {
-                    @ApiResponse(responseCode = "200", description = "configuration parameters", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
-    public Response getMeasurementConfiguration(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
-            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
-            , @Parameter(in = ParameterIn.QUERY, description = "the id of a specific configuration parameter") @QueryParam("parameterID") String parameterID) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+                    @ApiResponse(responseCode = "200", description = "configuration parameters", content = @Content(schema = @Schema(implementation = DataLakeConfiguration.class)))})
+    public Response getMeasurementConfiguration(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username,
+                                                @Parameter(in = ParameterIn.QUERY, description = "the id of a specific configuration parameter") @QueryParam("parameterID") String parameterID) {
+        return ok(this.dataLakeManagement.getDataLakeConfiguration());
     }
 
     @POST

[incubator-streampipes] 09/29: [STREAMPIPES-349] Add first implementation of methods for getting and setting data lake configuration

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 12c2b396dd198ac2ab6b56ec2aad31e9136d361f
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:33:09 2021 +0200

    [STREAMPIPES-349] Add first implementation of methods for getting and setting data lake configuration
---
 .../dataexplorer/DataLakeManagementV4.java         | 49 +++++++++++++++++++++-
 1 file changed, 48 insertions(+), 1 deletion(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 2f177f6..f1bc058 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,9 +18,14 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
+import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
+import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.influxdb.dto.QueryResult;
 
 import java.util.List;
@@ -43,5 +48,47 @@ public class DataLakeManagementV4 {
         }
         return true;
     }
-}
+
+    public DataLakeConfiguration getDataLakeConfiguration() {
+        List<DataLakeRetentionPolicy> retentionPolicies = getAllExistingRetentionPolicies();
+        return new DataLakeConfiguration(retentionPolicies);
+    }
+
+    public String editMeasurementConfiguration(DataLakeConfiguration config, boolean resetToDefault) {
+
+        List<DataLakeRetentionPolicy> existingRetentionPolicies = getAllExistingRetentionPolicies();
+
+        if (resetToDefault) {
+            if (existingRetentionPolicies.size() > 1) {
+                String drop = new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "0s"), "DROP").executeQuery();
+            }
+            String reset = new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("autogen", "0s"), "DEFAULT").executeQuery();
+            return reset;
+        } else {
+
+            Integer batchSize = config.getBatchSize();
+            Integer flushDuration = config.getFlushDuration();
+
+            /**
+             * TODO:
+             * - Implementation of parameter update for batchSize and flushDuration
+             * - Updating multiple retention policies
+             */
+
+            String operation = "CREATE";
+            if (existingRetentionPolicies.size() > 1) {
+                operation = "ALTER";
+            }
+            String result = new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "1d"), operation).executeQuery();
+            return result;
+        }
+    }
+
+    public List<DataLakeRetentionPolicy> getAllExistingRetentionPolicies() {
+        /**
+         * TODO:
+         * - Implementation of parameter return for batchSize and flushDuration
+         */
+        return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
+    }
 }

[incubator-streampipes] 11/29: Add query template class (includes database specific templates for various query statements)

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 3d115538d1455c63741933c21288c6bbd6cdba32
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:00:47 2021 +0200

    Add query template class (includes database specific templates for various query statements)
---
 .../dataexplorer/v4/template/QueryTemplatesV4.java | 72 ++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
new file mode 100644
index 0000000..23c2789
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.v4.template;
+
+public class QueryTemplatesV4 {
+
+    public static String selectWildcardFrom(String index) {
+        return "SELECT * FROM " + index;
+    }
+
+    public static String selectAggregationFrom(String index, String aggregationFunction) {
+        return "SELECT " + aggregationFunction + "(*) FROM " + index;
+    }
+
+    public static String deleteFrom(String index) {
+        return "DELETE FROM " + index;
+    }
+
+
+    public static String whereTimeWithin(long startDate, long endDate) {
+        return "WHERE time > "
+                + startDate * 1000000
+                + " AND time < "
+                + endDate * 1000000;
+    }
+
+    public static String whereTimeLeftBound(long startDate) {
+        return "WHERE time > "
+                + startDate * 1000000;
+    }
+
+    public static String whereTimeRightBound(long endDate) {
+        return "WHERE time < "
+                + endDate * 1000000;
+    }
+
+    public static String groupByTags(String tags) {
+        return "GROUP BY " + tags;
+    }
+
+    public static String groupByTime(String timeInterval) {
+        return "GROUP BY time(" + timeInterval + ")";
+    }
+
+    public static String orderByTime(String ordering) {
+        return "ORDER BY time " + ordering.toUpperCase();
+    }
+
+    public static String limitItems(int limit) {
+        return "LIMIT " + limit;
+    }
+
+    public static String offset(int offset) {
+        return "OFFSET " + offset;
+    }
+
+}

[incubator-streampipes] 26/29: Add endpoint definition for dropping entire measurement series from data lake and removing related event property

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f974f4d69d9b1121d2847cdd17392c2dbb0748be
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:11:40 2021 +0200

    Add endpoint definition for dropping entire measurement series from data lake and removing related event property
---
 .../java/org/apache/streampipes/ps/DataLakeResourceV4.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index f2bd49b..53b3414 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -85,6 +85,20 @@ public class DataLakeResourceV4 extends AbstractRestResource {
         return ok();
     }
 
+    @DELETE
+    @Path("/measurements/{measurementID}/drop")
+    @Operation(summary = "Drop a single measurement series with given id from Data Lake and remove related event property", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "Measurement series successfully dropped from Data Lake"),
+                    @ApiResponse(responseCode = "400", description = "Measurement series with given id or related event property not found")})
+    public Response dropMeasurementSeries(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID) {
+
+        /**
+         * TODO: implementation of method stump
+         */
+    }
+
     @GET
     @Path("/measurements")
     @Produces(MediaType.APPLICATION_JSON)

[incubator-streampipes] 07/29: [STREAMPIPES-349] Add query defining classes for changes at the retention policies

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 7a4f1c376de352f1ab3d9eb546cf8e706f2a767e
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:24:49 2021 +0200

    [STREAMPIPES-349] Add query defining classes for changes at the retention policies
---
 .../param/RetentionPolicyQueryParams.java          | 20 ++++++++
 .../query/EditRetentionPolicyQuery.java            | 56 ++++++++++++++++++++++
 .../query/ShowRetentionPolicyQuery.java            | 38 +++++++++++++++
 3 files changed, 114 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
new file mode 100644
index 0000000..e224570
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
@@ -0,0 +1,20 @@
+package org.apache.streampipes.dataexplorer.param;
+
+public class RetentionPolicyQueryParams extends QueryParams {
+    private final String durationLiteral;
+
+    public static RetentionPolicyQueryParams from(String index, String durationLiteral) {
+        return new RetentionPolicyQueryParams(index, durationLiteral);
+    }
+
+    protected RetentionPolicyQueryParams(String index, String durationLiteral) {
+        super(index);
+        this.durationLiteral = durationLiteral;
+    }
+
+    public String getDurationLiteral() {
+        return durationLiteral;
+    }
+
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
new file mode 100644
index 0000000..ddaed73
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
@@ -0,0 +1,56 @@
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
+import org.influxdb.dto.QueryResult;
+
+public class EditRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, String> {
+
+    private static final String CREATE_OPERATOR = "CREATE";
+    private static final String ALTER_OPERATOR = "ALTER";
+    private static final String DROP_OPERATOR = "DROP";
+    private static final String RESET_OPERATOR = "DEFAULT";
+
+    private String operationToPerform;
+
+    public EditRetentionPolicyQuery(RetentionPolicyQueryParams queryParams, String operation) {
+        super(queryParams);
+        this.operationToPerform = operation;
+    }
+
+
+    @Override
+    protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+        if (this.operationToPerform.equals(CREATE_OPERATOR)) {
+            queryBuilder.add(createRetentionPolicyStatement(params.getIndex()));
+        } else if (this.operationToPerform.equals(ALTER_OPERATOR)) {
+            queryBuilder.add(alterRetentionPolicyStatement(params.getIndex()));
+        } else if (this.operationToPerform.equals(DROP_OPERATOR)) {
+            queryBuilder.add(dropRetentionPolicyStatement(params.getIndex()));
+        } else if (this.operationToPerform.equals(RESET_OPERATOR)) {
+            queryBuilder.add(resetRetentionPolicyStatement());
+        }
+
+    }
+
+    @Override
+    protected String postQuery(QueryResult result) throws RuntimeException {
+        return result.toString();
+    }
+
+    private String createRetentionPolicyStatement(String index) {
+        return "CREATE RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
+    }
+
+    private String alterRetentionPolicyStatement(String index) {
+        return "ALTER RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
+    }
+
+    private String dropRetentionPolicyStatement(String index) {
+        return "DROP RETENTION POLICY " + index + " ON " + "sp";
+    }
+
+    private String resetRetentionPolicyStatement() {
+        return "ALTER RETENTION POLICY " + "autogen" + " ON " + "sp DURATION " + "0s" + " REPLICATION 1 DEFAULT";
+    }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
new file mode 100644
index 0000000..5c4c3d8
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
@@ -0,0 +1,38 @@
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
+import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShowRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, List<DataLakeRetentionPolicy>> {
+
+    public ShowRetentionPolicyQuery(RetentionPolicyQueryParams queryParams) {
+        super(queryParams);
+    }
+
+
+    @Override
+    protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+        queryBuilder.add(showRetentionPolicyStatement());
+    }
+
+    @Override
+    protected List<DataLakeRetentionPolicy> postQuery(QueryResult result) throws RuntimeException {
+        List<DataLakeRetentionPolicy> policies = new ArrayList<>();
+        for (List<Object> a : result.getResults().get(0).getSeries().get(0).getValues()) {
+            boolean isDefault = false;
+            if (a.get(4).toString().equals("true")) {
+                isDefault = true;
+            }
+            policies.add(new DataLakeRetentionPolicy(a.get(0).toString(), a.get(1).toString(), isDefault));
+        }
+        return policies;
+    }
+
+    private String showRetentionPolicyStatement() {
+        return "SHOW RETENTION POLICIES  ON " + "sp";
+    }
+}

[incubator-streampipes] 14/29: Add query element "Ordering By Time" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 029334f5392d2a71bbe9dd70185aa4229a809d74
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:06:01 2021 +0200

    Add query element "Ordering By Time" and corresponding parameter class
---
 .../v4/params/OrderingByTimeParams.java            | 36 ++++++++++++++++++++++
 .../v4/query/elements/OrderingByTime.java          | 34 ++++++++++++++++++++
 2 files changed, 70 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
new file mode 100644
index 0000000..b7b71e9
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class OrderingByTimeParams extends QueryParamsV4 {
+    private final String ordering;
+
+    public static OrderingByTimeParams from(String measurementID, String ordering) {
+        return new OrderingByTimeParams(measurementID, ordering);
+    }
+
+    public OrderingByTimeParams(String measurementID, String ordering) {
+        super(measurementID);
+        this.ordering = ordering;
+    }
+
+    public String getOrdering() {
+        return this.ordering;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
new file mode 100644
index 0000000..877d7e0
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class OrderingByTime extends QueryElement<OrderingByTimeParams> {
+
+    public OrderingByTime(OrderingByTimeParams orderingByTimeParams) {
+        super(orderingByTimeParams);
+    }
+
+    @Override
+    protected String buildStatement(OrderingByTimeParams orderingByTimeParams) {
+        return QueryTemplatesV4.orderByTime(orderingByTimeParams.getOrdering());
+    }
+}

[incubator-streampipes] 03/29: [STREAMPIPES-349] Rename method as well as list containing all stored measurement series

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 20f8838fe882a8950f3f82cb34dc665c38a08e97
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Apr 30 10:17:52 2021 +0200

    [STREAMPIPES-349] Rename method as well as list containing all stored measurement series
---
 .../org/apache/streampipes/dataexplorer/DataLakeManagementV4.java   | 6 +++---
 .../src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 07f2b90..06e055c 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -25,8 +25,8 @@ import java.util.List;
 
 public class DataLakeManagementV4 {
 
-    public List<DataLakeMeasure> getAllMeasurementSeries() {
-        List<DataLakeMeasure> dataLakeMeasuresList = DataExplorerUtils.getInfos();
-        return dataLakeMeasuresList;
+    public List<DataLakeMeasure> getAllMeasurements() {
+        List<DataLakeMeasure> allMeasurements = DataExplorerUtils.getInfos();
+        return allMeasurements;
     }
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index d6f51f6..942d32a 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -93,8 +93,8 @@ public class DataLakeResourceV4 extends AbstractRestResource {
             responses = {
                     @ApiResponse(responseCode = "200", description = "array of stored measurement series", content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
     public Response getAll(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username) {
-        List<DataLakeMeasure> allMeasurementSeries = this.dataLakeManagement.getAllMeasurementSeries();
-        return ok(allMeasurementSeries);
+        List<DataLakeMeasure> allMeasurements = this.dataLakeManagement.getAllMeasurements();
+        return ok(allMeasurements);
     }
 
     @GET

[incubator-streampipes] 06/29: [STREAMPIPES-349] Add label definition for data lake

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 20b5586fccdb89213634793ebc73b4ffc2b9920e
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:21:43 2021 +0200

    [STREAMPIPES-349] Add label definition for data lake
---
 .../model/datalake/LabelDefinition.java            | 46 ++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/LabelDefinition.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/LabelDefinition.java
new file mode 100644
index 0000000..60eb7db
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/LabelDefinition.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.datalake;
+
+public class LabelDefinition {
+
+    private String classLabel;
+    private String labelColumn;
+
+    public LabelDefinition(String classLabel, String labelColumn) {
+        this.classLabel = classLabel;
+        this.labelColumn = labelColumn;
+    }
+
+    public String getClassLabel() {
+        return classLabel;
+    }
+
+    public void setClassLabel(String classLabel) {
+        this.classLabel = classLabel;
+    }
+
+    public String getLabelColumn() {
+        return labelColumn;
+    }
+
+    public void setLabelColumn(String labelColumn) {
+        this.labelColumn = labelColumn;
+    }
+}

[incubator-streampipes] 20/29: Add Query Builder class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f53de502b6932fc3639b53ff4353b180650d4d19
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:19:43 2021 +0200

    Add Query Builder class
---
 .../dataexplorer/v4/query/QueryBuilder.java        | 54 ++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
new file mode 100644
index 0000000..8606797
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
+import org.influxdb.dto.Query;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public class QueryBuilder {
+
+    private StringJoiner queryParts;
+    private String databaseName;
+
+    public static QueryBuilder create(String databaseName) {
+        return new QueryBuilder(databaseName);
+    }
+
+    private QueryBuilder(String databaseName) {
+        this.queryParts = new StringJoiner(" ");
+        this.databaseName = databaseName;
+    }
+
+    public Query build(List<QueryElement> queryElements) {
+        for (QueryElement queryPart : queryElements) {
+            this.queryParts.add(queryPart.getStatement());
+        }
+        return toQuery();
+    }
+
+    public Query toQuery() {
+        return new Query(this.queryParts.toString(), this.databaseName);
+    }
+}
+
+
+

[incubator-streampipes] 19/29: Add query element "Delete From Statement" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit ee2fe10ee1fe0e71acad3162367c4b121aaea25c
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:16:10 2021 +0200

    Add query element "Delete From Statement" and corresponding parameter class
---
 .../v4/params/DeleteFromStatementParams.java       | 31 ++++++++++++++++++++
 .../v4/query/elements/DeleteFromStatement.java     | 33 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
new file mode 100644
index 0000000..df93347
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+public class DeleteFromStatementParams extends QueryParamsV4 {
+
+    public static DeleteFromStatementParams from(String measurementID) {
+        return new DeleteFromStatementParams(measurementID);
+    }
+
+    public DeleteFromStatementParams(String measurementID) {
+        super(measurementID);
+    }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
new file mode 100644
index 0000000..cabd8b6
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class DeleteFromStatement extends QueryElement<DeleteFromStatementParams> {
+    public DeleteFromStatement(DeleteFromStatementParams deleteFromStatementParams) {
+        super(deleteFromStatementParams);
+    }
+
+    @Override
+    protected String buildStatement(DeleteFromStatementParams deleteFromStatementParams) {
+        return QueryTemplatesV4.deleteFrom(deleteFromStatementParams.getIndex());
+    }
+}

[incubator-streampipes] 01/29: [STREAMPIPES-349] Add skeleton of the revised REST API

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit ded2489b1f0cb36712602d3ae8497a7246766603
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Thu Apr 29 09:55:53 2021 +0200

    [STREAMPIPES-349] Add skeleton of the revised REST API
---
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 154 +++++++++++++++++++++
 1 file changed, 154 insertions(+)

diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
new file mode 100644
index 0000000..35ac486
--- /dev/null
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.ps;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.enums.ParameterIn;
+import io.swagger.v3.oas.annotations.media.ArraySchema;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+
+@Path("v4/users/{username}/datalake")
+public class DataLakeResourceV4 {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
+
+
+    @POST
+    @Path("/measurements/{measurementID}/configuration")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Configure parameters of measurement series with a given id", tags = {"Data Lake"},
+            responses = {@ApiResponse(responseCode = "200", description = "Configuration was successful")})
+    public Response configureMeasurement(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.DEFAULT, description = "the configuration parameters") Placeholder body) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @DELETE
+    @Path("/measurements/{measurementID}")
+    @Operation(summary = "Remove data from a single measurement series with given id", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "Data from measurement series successfully removed"),
+                    @ApiResponse(responseCode = "400", description = "Measurement series with given id not found")})
+    public Response deleteData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @GET
+    @Path("/measurements")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Get a list of all measurement series", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "array of stored measurement series", content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
+    public Response getAll(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @GET
+    @Path("/measurements/{measurementID}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Get data from a single measurement series by a given id", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "400", description = "Measurement series with given id and requested query specification not found"),
+                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
+    public Response getData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate
+            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") String page
+            , @Parameter(in = ParameterIn.QUERY, description = "items per page limitation for paging operation") @QueryParam("limit") Integer limit
+            , @Parameter(in = ParameterIn.QUERY, description = "offset for paging operation") @QueryParam("offset") Integer offset
+            , @Parameter(in = ParameterIn.QUERY, description = "grouping tags (comma-separated) for grouping operation") @QueryParam("groupBy") String groupBy
+            , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
+            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
+            , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json) for data download") @QueryParam("format") String format) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @GET
+    @Path("/measurements/{measurementID}/configuration")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Get configuration parameters of a measurement series by a given id", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "configuration parameters", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
+    public Response getMeasurementConfiguration(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.QUERY, description = "the id of a specific configuration parameter") @QueryParam("parameterID") String parameterID) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @POST
+    @Path("/measurements/{measurementID}/labeling")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Label data points of the measurement series with given id", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "Labeling was successful")})
+    public Response labelData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
+            , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
+            , @Parameter(in = ParameterIn.DEFAULT, description = "the label details that should be written into database") Placeholder body
+
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+
+    @DELETE
+    @Path("/measurements")
+    @Operation(summary = "Remove all stored measurement series from Data Lake", tags = {"Data Lake"},
+            responses = {
+                    @ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
+    public Response removeAll(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username) {
+        /**
+         * TODO: implementation of method stump
+         */
+        return null;
+    }
+}

[incubator-streampipes] 12/29: Add query element "Select From Statement" and corresponding parameter class

Posted by eb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit bf7eb2eb62045933895a22169368fad9bed7c904
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 19:02:30 2021 +0200

    Add query element "Select From Statement" and corresponding parameter class
---
 .../v4/params/SelectFromStatementParams.java       | 45 ++++++++++++++++++++++
 .../v4/query/elements/SelectFromStatement.java     | 37 ++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
new file mode 100644
index 0000000..0385963
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.params;
+
+import javax.annotation.Nullable;
+
+public class SelectFromStatementParams extends QueryParamsV4 {
+
+    private final String aggregationFunction;
+
+    public static SelectFromStatementParams from(String measurementID, @Nullable String aggregationFunction) {
+        return new SelectFromStatementParams(measurementID, aggregationFunction);
+    }
+
+    public SelectFromStatementParams(String measurementID) {
+        super(measurementID);
+        this.aggregationFunction = null;
+    }
+
+    public SelectFromStatementParams(String measurementID, String aggregationFunction) {
+        super(measurementID);
+        this.aggregationFunction = aggregationFunction;
+    }
+
+
+    public String getAggregationFunction() {
+        return aggregationFunction;
+    }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
new file mode 100644
index 0000000..efd7654
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.elements;
+
+import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
+
+public class SelectFromStatement extends QueryElement<SelectFromStatementParams> {
+    public SelectFromStatement(SelectFromStatementParams selectFromStatementParams) {
+        super(selectFromStatementParams);
+    }
+
+    @Override
+    protected String buildStatement(SelectFromStatementParams selectFromStatementParams) {
+        if (selectFromStatementParams.getAggregationFunction() == null) {
+            return QueryTemplatesV4.selectWildcardFrom(selectFromStatementParams.getIndex());
+        } else {
+            return QueryTemplatesV4.selectAggregationFrom(selectFromStatementParams.getIndex(), selectFromStatementParams.getAggregationFunction());
+        }
+    }
+}