You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/01 22:03:11 UTC

[incubator-streampipes] 01/03: [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 2f8e8a8961ebdfc4abbd38f0ed2b728383d2c87e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 15:28:12 2022 +0100

    [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API
---
 .../streampipes/client/StreamPipesClient.java      |  4 ++
 .../streampipes/client/api/AbstractClientApi.java  | 10 ++--
 .../streampipes/client/api/DataLakeMeasureApi.java | 65 ++++++++++++++++++++++
 .../apache/streampipes/client/http/PutRequest.java | 57 +++++++++++++++++++
 .../dataexplorer/commons/DataExplorerUtils.java    | 16 +++++-
 .../dataexplorer/DataLakeManagementV4.java         | 18 +++++-
 .../streampipes/ps/DataLakeMeasureResourceV4.java  | 30 ++++++++++
 .../streampipes/storage/api/IDataLakeStorage.java  |  3 +-
 .../storage/couchdb/impl/DataLakeStorageImpl.java  |  5 ++
 9 files changed, 199 insertions(+), 9 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 5daa809d4..e8506a326 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -189,6 +189,10 @@ public class StreamPipesClient implements SupportsPipelineApi,
     return new NotificationsApi(config);
   }
 
+  public DataLakeMeasureApi dataLakeMeasureApi() {
+    return new DataLakeMeasureApi(config);
+  }
+
   public void deliverEmail(SpEmail email) {
     CustomRequestApi api = customRequest();
     api.sendPost(ApiPath.EMAIL_RESOURCE, email);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 887399a83..5b8fa0158 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -17,10 +17,7 @@
  */
 package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.client.http.DeleteRequest;
-import org.apache.streampipes.client.http.GetRequest;
-import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
-import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
+import org.apache.streampipes.client.http.*;
 import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.serializer.ObjectSerializer;
 import org.apache.streampipes.client.serializer.Serializer;
@@ -45,6 +42,11 @@ public class AbstractClientApi {
     new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
   }
 
+  protected <T> void put(StreamPipesApiPath apiPath, T object) {
+    ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
+    new PutRequest<>(clientConfig, apiPath, serializer, object).executeRequest();
+  }
+
   protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
     Serializer<Void, O, O> serializer = new ObjectSerializer<>();
     return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
new file mode 100644
index 000000000..7e5a1b9f0
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public class DataLakeMeasureApi extends AbstractTypedClientApi<DataLakeMeasure> implements CRUDApi<String, DataLakeMeasure> {
+
+  public DataLakeMeasureApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, DataLakeMeasure.class);
+  }
+
+  public DataLakeMeasure get(String id) {
+    return getSingle(getBaseResourcePath().addToPath(id));
+  }
+
+  @Override
+  public List<DataLakeMeasure> all() {
+    throw new IllegalArgumentException("Not yet implemented");
+  }
+
+  @Override
+  public void create(DataLakeMeasure element) {
+    post(getBaseResourcePath(), element);
+  }
+
+  @Override
+  public void delete(String elementId) {
+    throw new IllegalArgumentException("Not yet implemented");
+  }
+
+  @Override
+  public void update(DataLakeMeasure measure) {
+    put(getBaseResourcePath().addToPath(measure.getElementId()), measure);
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromStreamPipesBasePath()
+        .addToPath("api")
+        .addToPath("v4")
+        .addToPath("datalake")
+        .addToPath("measure");
+  }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
new file mode 100644
index 000000000..cf36cf131
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+import java.io.IOException;
+
+public class PutRequest<SO> extends HttpRequest<SO, Void, Void> {
+
+  private final SO body;
+
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, Void, Void> serializer,
+                    SO body) {
+    super(clientConfig, apiPath, serializer);
+    this.body = body;
+  }
+
+  @Override
+  protected Request makeRequest(Serializer<SO, Void, Void> serializer) {
+    Request request = Request
+        .Put(makeUrl())
+        .setHeaders(standardPostHeaders());
+
+    request.bodyString(serializer.serialize(body), ContentType.APPLICATION_JSON);
+
+    return request;
+  }
+
+  @Override
+  protected Void afterRequest(Serializer<SO, Void, Void> serializer, HttpEntity entity) throws IOException {
+    return null;
+  }
+}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index 4fde78355..ba420d20d 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -41,11 +41,21 @@ public class DataExplorerUtils {
         return measure;
     }
 
+    public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
+                                                              DataLakeMeasure measure) throws SpRuntimeException {
+        sanitizeDataLakeMeasure(measure);
+        updateAtDataLake(client, measure);
+        return measure;
+    }
+
     private static void registerAtDataLake(StreamPipesClient client,
                                                      DataLakeMeasure measure) throws SpRuntimeException {
-        client
-          .customRequest()
-          .sendPost("api/v4/datalake/measure/", measure);
+        client.dataLakeMeasureApi().create(measure);
+    }
+
+    public static void updateAtDataLake(StreamPipesClient client,
+                                        DataLakeMeasure measure) throws SpRuntimeException {
+        client.dataLakeMeasureApi().update(measure);
     }
 
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index af2d672dd..1823d32ab 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -355,8 +355,24 @@ public class DataLakeManagementV4 {
         return tags;
     }
 
+    public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
+        var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+        if (existingMeasure != null) {
+            measure.setRev(existingMeasure.getRev());
+            getDataLakeStorage().updateDataLakeMeasure(measure);
+        } else {
+            getDataLakeStorage().storeDataLakeMeasure(measure);
+        }
+    }
+
+    public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
+        if (getDataLakeStorage().findOne(elementId) != null) {
+            getDataLakeStorage().deleteDataLakeMeasure(elementId);
+        } else {
+            throw new IllegalArgumentException("Could not find measure with this ID");
+        }
+    }
 
-    // TODO validate method
     public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
         List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
         Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 9f71f168f..636cf0c0d 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -53,4 +53,34 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
     public Response getDataLakeMeasure(@PathParam("id") String measureId) {
         return ok(this.dataLakeManagement.getById(measureId));
     }
+
+    @PUT
+    @JacksonSerialized
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+                                          DataLakeMeasure measure) {
+        if (measureId.equals(measure.getElementId())) {
+            try {
+                this.dataLakeManagement.updateDataLake(measure);
+                return ok();
+            } catch (IllegalArgumentException e) {
+                return badRequest(e.getMessage());
+            }
+        }
+        return badRequest();
+    }
+
+    @DELETE
+    @JacksonSerialized
+    @Path("{id}")
+    public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+        try {
+            this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+            return ok();
+        } catch (IllegalArgumentException e) {
+            return badRequest(e.getMessage());
+        }
+    }
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
index 25159f01a..df6a1c35f 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
 import java.util.List;
@@ -32,4 +31,6 @@ public interface IDataLakeStorage {
     DataLakeMeasure findOne(String id);
 
     void updateDataLakeMeasure(DataLakeMeasure measure);
+
+    void deleteDataLakeMeasure(String id);
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
index 943638af9..81fb323f2 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
@@ -51,4 +51,9 @@ public class DataLakeStorageImpl extends AbstractDao<DataLakeMeasure> implements
     public void updateDataLakeMeasure(DataLakeMeasure measure) {
         update(measure);
     }
+
+    @Override
+    public void deleteDataLakeMeasure(String id) {
+        delete(id);
+    }
 }