You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/01 22:03:11 UTC
[incubator-streampipes] 01/03: [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 2f8e8a8961ebdfc4abbd38f0ed2b728383d2c87e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 15:28:12 2022 +0100
[STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API
---
.../streampipes/client/StreamPipesClient.java | 4 ++
.../streampipes/client/api/AbstractClientApi.java | 10 ++--
.../streampipes/client/api/DataLakeMeasureApi.java | 65 ++++++++++++++++++++++
.../apache/streampipes/client/http/PutRequest.java | 57 +++++++++++++++++++
.../dataexplorer/commons/DataExplorerUtils.java | 16 +++++-
.../dataexplorer/DataLakeManagementV4.java | 18 +++++-
.../streampipes/ps/DataLakeMeasureResourceV4.java | 30 ++++++++++
.../streampipes/storage/api/IDataLakeStorage.java | 3 +-
.../storage/couchdb/impl/DataLakeStorageImpl.java | 5 ++
9 files changed, 199 insertions(+), 9 deletions(-)
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 5daa809d4..e8506a326 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -189,6 +189,10 @@ public class StreamPipesClient implements SupportsPipelineApi,
return new NotificationsApi(config);
}
+ public DataLakeMeasureApi dataLakeMeasureApi() { // entry point to the data lake measure client API
+ return new DataLakeMeasureApi(config); // a new API object is created on every call, built from `config`
+ }
+
public void deliverEmail(SpEmail email) {
CustomRequestApi api = customRequest();
api.sendPost(ApiPath.EMAIL_RESOURCE, email);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 887399a83..5b8fa0158 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -17,10 +17,7 @@
*/
package org.apache.streampipes.client.api;
-import org.apache.streampipes.client.http.DeleteRequest;
-import org.apache.streampipes.client.http.GetRequest;
-import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
-import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
+import org.apache.streampipes.client.http.*;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.serializer.ObjectSerializer;
import org.apache.streampipes.client.serializer.Serializer;
@@ -45,6 +42,11 @@ public class AbstractClientApi {
new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
}
+ protected <T> void put(StreamPipesApiPath apiPath, T object) { // sends `object` to apiPath through a PutRequest; nothing is returned
+ ObjectSerializer<T, Void> serializer = new ObjectSerializer<>(); // Void: no response payload type is expected
+ new PutRequest<>(clientConfig, apiPath, serializer, object).executeRequest();
+ }
+
protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
Serializer<Void, O, O> serializer = new ObjectSerializer<>();
return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
new file mode 100644
index 000000000..7e5a1b9f0
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public class DataLakeMeasureApi extends AbstractTypedClientApi<DataLakeMeasure> implements CRUDApi<String, DataLakeMeasure> {
+ // Client-side access to DataLakeMeasure resources under <base path>/api/v4/datalake/measure.
+ public DataLakeMeasureApi(StreamPipesClientConfig clientConfig) {
+ super(clientConfig, DataLakeMeasure.class);
+ }
+
+ public DataLakeMeasure get(String id) { // fetches a single measure from <resource path>/<id>
+ return getSingle(getBaseResourcePath().addToPath(id));
+ }
+
+ @Override
+ public List<DataLakeMeasure> all() { // listing is not supported by this client yet
+ throw new UnsupportedOperationException("Not yet implemented"); // no argument is at fault, so not IllegalArgumentException
+ }
+
+ @Override
+ public void create(DataLakeMeasure element) { // POSTs the measure to the resource path
+ post(getBaseResourcePath(), element);
+ }
+
+ @Override
+ public void delete(String elementId) { // deletion is not supported by this client yet
+ throw new UnsupportedOperationException("Not yet implemented"); // no argument is at fault, so not IllegalArgumentException
+ }
+
+ @Override
+ public void update(DataLakeMeasure measure) { // PUTs the measure to <resource path>/<measure element ID>
+ put(getBaseResourcePath().addToPath(measure.getElementId()), measure);
+ }
+
+ @Override
+ protected StreamPipesApiPath getBaseResourcePath() { // StreamPipes base path + api/v4/datalake/measure
+ return StreamPipesApiPath.fromStreamPipesBasePath()
+ .addToPath("api")
+ .addToPath("v4")
+ .addToPath("datalake")
+ .addToPath("measure");
+ }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
new file mode 100644
index 000000000..cf36cf131
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+import java.io.IOException;
+
+public class PutRequest<SO> extends HttpRequest<SO, Void, Void> {
+ // HTTP PUT carrying a JSON-serialized object; no response payload is produced.
+ private final SO payload;
+
+ public PutRequest(StreamPipesClientConfig clientConfig,
+ StreamPipesApiPath apiPath,
+ Serializer<SO, Void, Void> serializer,
+ SO body) {
+ super(clientConfig, apiPath, serializer);
+ this.payload = body;
+ }
+
+ @Override
+ protected Request makeRequest(Serializer<SO, Void, Void> serializer) {
+ // One fluent chain: target URL, then the standard POST headers,
+ // then the serialized payload as an application/json body.
+ return Request
+ .Put(makeUrl())
+ .setHeaders(standardPostHeaders())
+ .bodyString(serializer.serialize(payload), ContentType.APPLICATION_JSON);
+ }
+
+ @Override
+ protected Void afterRequest(Serializer<SO, Void, Void> serializer, HttpEntity entity) throws IOException {
+ return null; // the response entity is ignored
+ }
+}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index 4fde78355..ba420d20d 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -41,11 +41,21 @@ public class DataExplorerUtils {
return measure;
}
+ public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ sanitizeDataLakeMeasure(measure); // result unused, so any sanitizing presumably mutates `measure` in place - confirm
+ updateAtDataLake(client, measure); // sends the measure through client.dataLakeMeasureApi().update(...)
+ return measure; // the same instance that was passed in
+ }
+
private static void registerAtDataLake(StreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
- client
- .customRequest()
- .sendPost("api/v4/datalake/measure/", measure);
+ client.dataLakeMeasureApi().create(measure); // typed API call replaces the hand-built POST to "api/v4/datalake/measure/"
}
+
+ public static void updateAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ client.dataLakeMeasureApi().update(measure); // update request for this measure; no sanitizing is done here
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index af2d672dd..1823d32ab 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -355,8 +355,24 @@ public class DataLakeManagementV4 {
return tags;
}
+ public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
+ var stored = getDataLakeStorage().findOne(measure.getElementId()); // look up by element ID
+ if (stored == null) {
+ getDataLakeStorage().storeDataLakeMeasure(measure); // unknown ID: store it as a new measure
+ } else {
+ measure.setRev(stored.getRev()); // carry over the stored revision before updating
+ getDataLakeStorage().updateDataLakeMeasure(measure);
+ }
+ }
+
+ public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
+ // Guard clause: refuse to delete a measure that storage cannot find.
+ if (getDataLakeStorage().findOne(elementId) == null) {
+ throw new IllegalArgumentException("Could not find measure with this ID");
+ }
+ getDataLakeStorage().deleteDataLakeMeasure(elementId);
+ }
- // TODO validate method
public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 9f71f168f..636cf0c0d 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -53,4 +53,34 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
public Response getDataLakeMeasure(@PathParam("id") String measureId) {
return ok(this.dataLakeManagement.getById(measureId));
}
+
+ @PUT
+ @JacksonSerialized
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Path("{id}")
+ public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+ DataLakeMeasure measure) {
+ if (measure != null && measureId.equals(measure.getElementId())) { // a missing body or a path/body ID mismatch falls through to badRequest()
+ try {
+ this.dataLakeManagement.updateDataLake(measure);
+ return ok();
+ } catch (IllegalArgumentException e) {
+ return badRequest(e.getMessage()); // passes the exception message to badRequest
+ }
+ }
+ return badRequest();
+ }
+
+ @DELETE
+ @JacksonSerialized
+ @Path("{id}")
+ public Response deleteDataLakeMeasure(@PathParam("id") String measureId) { // deletes the measure named by the {id} path segment
+ try {
+ this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+ return ok();
+ } catch (IllegalArgumentException e) { // e.g. thrown by DataLakeManagementV4.deleteDataLakeMeasure for an unknown ID
+ return badRequest(e.getMessage()); // passes the exception message to badRequest
+ }
+ }
}
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
index 25159f01a..df6a1c35f 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.storage.api;
-import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import java.util.List;
@@ -32,4 +31,6 @@ public interface IDataLakeStorage {
DataLakeMeasure findOne(String id);
void updateDataLakeMeasure(DataLakeMeasure measure);
+
+ void deleteDataLakeMeasure(String id); // deletes the stored measure identified by `id`
}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
index 943638af9..81fb323f2 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
@@ -51,4 +51,9 @@ public class DataLakeStorageImpl extends AbstractDao<DataLakeMeasure> implements
public void updateDataLakeMeasure(DataLakeMeasure measure) {
update(measure);
}
+
+ @Override
+ public void deleteDataLakeMeasure(String id) {
+ delete(id); // delegates to delete(String), presumably inherited from AbstractDao - confirm
+ }
}