You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/10/02 08:53:40 UTC

[incubator-streampipes-extensions] branch STREAMPIPES-426 created (now a19a8e0)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-426
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git.


      at a19a8e0  [STREAMPIPES-436] Inject StreamPipes client into pipeline element services, improve client configuration

This branch includes the following new commits:

     new a19a8e0  [STREAMPIPES-436] Inject StreamPipes client into pipeline element services, improve client configuration

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[incubator-streampipes-extensions] 01/01: [STREAMPIPES-436] Inject StreamPipes client into pipeline element services, improve client configuration

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-426
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit a19a8e02ba7b6a474615a1b74ee02e07846d5402
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sat Oct 2 10:53:22 2021 +0200

    [STREAMPIPES-436] Inject StreamPipes client into pipeline element services, improve client configuration
---
 .../sinks/internal/jvm/datalake/DataLake.java      | 32 ++++++----------------
 1 file changed, 8 insertions(+), 24 deletions(-)

diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index d7134f1..f3d7cec 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -19,16 +19,12 @@
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
+import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig;
 import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -91,7 +87,7 @@ public class DataLake implements EventSink<DataLakeParameters> {
     schema.getEventProperties().stream().forEach(eventProperty -> {
       eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
     });
-    registerAtDataLake(parameters.getMeasurementName(), schema);
+    registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
 
     imageProperties = schema.getEventProperties().stream()
             .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
@@ -150,24 +146,12 @@ public class DataLake implements EventSink<DataLakeParameters> {
    * @param eventSchema
    * @throws SpRuntimeException
    */
-  private void registerAtDataLake(String measure, EventSchema eventSchema) throws SpRuntimeException {
-    String url = SinksInternalJvmConfig.INSTANCE.getStreamPipesBackendUrl();
-
-    try {
-      String json = JacksonSerializer.getObjectMapper().writeValueAsString(eventSchema);
-      StringEntity stringEntity = new StringEntity(json, ContentType.APPLICATION_JSON);
-      HttpResponse response = Request.Post(url + "/streampipes-backend/api/v3/noauth/datalake/" + measure)
-              .addHeader("Content-type", "application/json")
-              .body(stringEntity)
-              .execute()
-              .returnResponse();
-      if (response.getStatusLine().getStatusCode() == 409) {
-        throw new SpRuntimeException("The measurement '" + measure +"' is already registered as Data lake with different Event schema");
-      }
-    } catch (IOException e) {
-      LOG.error(e.toString());
-    }
-
+  private void registerAtDataLake(String measure,
+                                  EventSchema eventSchema,
+                                  StreamPipesClient client) throws SpRuntimeException {
+      client
+        .customRequest()
+        .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
   }
 
   public static String prepareString(String s) {