You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/01 22:03:10 UTC

[incubator-streampipes] branch dev updated (3e0f7bc0b -> a1ceb2b1f)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


    from 3e0f7bc0b Merge pull request #129 from SvenO3/dev
     new 2f8e8a896 [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API
     new 59a071017 [hotfix] Improve logging of incomplete events when writing to event storage
     new a1ceb2b1f Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streampipes/client/StreamPipesClient.java      |  4 ++
 .../streampipes/client/api/AbstractClientApi.java  | 10 ++--
 ...entTemplateApi.java => DataLakeMeasureApi.java} | 34 ++++++-------
 .../http/{GetRequest.java => PutRequest.java}      | 30 +++++++-----
 .../dataexplorer/commons/DataExplorerUtils.java    | 16 +++++--
 .../dataexplorer/commons/influx/InfluxStore.java   | 43 ++++++++++-------
 .../dataexplorer/DataLakeManagementV4.java         | 18 ++++++-
 .../apache/streampipes/model/runtime/Event.java    | 55 ++++++++++++----------
 .../streampipes/ps/DataLakeMeasureResourceV4.java  | 30 ++++++++++++
 .../streampipes/storage/api/IDataLakeStorage.java  |  3 +-
 .../storage/couchdb/impl/DataLakeStorageImpl.java  |  5 ++
 11 files changed, 170 insertions(+), 78 deletions(-)
 copy streampipes-client/src/main/java/org/apache/streampipes/client/api/{PipelineElementTemplateApi.java => DataLakeMeasureApi.java} (58%)
 copy streampipes-client/src/main/java/org/apache/streampipes/client/http/{GetRequest.java => PutRequest.java} (64%)


[incubator-streampipes] 01/03: [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 2f8e8a8961ebdfc4abbd38f0ed2b728383d2c87e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 15:28:12 2022 +0100

    [STREAMPIPES-611] Support retrieval of DataLakeMeasure in client API
---
 .../streampipes/client/StreamPipesClient.java      |  4 ++
 .../streampipes/client/api/AbstractClientApi.java  | 10 ++--
 .../streampipes/client/api/DataLakeMeasureApi.java | 65 ++++++++++++++++++++++
 .../apache/streampipes/client/http/PutRequest.java | 57 +++++++++++++++++++
 .../dataexplorer/commons/DataExplorerUtils.java    | 16 +++++-
 .../dataexplorer/DataLakeManagementV4.java         | 18 +++++-
 .../streampipes/ps/DataLakeMeasureResourceV4.java  | 30 ++++++++++
 .../streampipes/storage/api/IDataLakeStorage.java  |  3 +-
 .../storage/couchdb/impl/DataLakeStorageImpl.java  |  5 ++
 9 files changed, 199 insertions(+), 9 deletions(-)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
index 5daa809d4..e8506a326 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/StreamPipesClient.java
@@ -189,6 +189,10 @@ public class StreamPipesClient implements SupportsPipelineApi,
     return new NotificationsApi(config);
   }
 
+  public DataLakeMeasureApi dataLakeMeasureApi() {
+    return new DataLakeMeasureApi(config);
+  }
+
   public void deliverEmail(SpEmail email) {
     CustomRequestApi api = customRequest();
     api.sendPost(ApiPath.EMAIL_RESOURCE, email);
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
index 887399a83..5b8fa0158 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AbstractClientApi.java
@@ -17,10 +17,7 @@
  */
 package org.apache.streampipes.client.api;
 
-import org.apache.streampipes.client.http.DeleteRequest;
-import org.apache.streampipes.client.http.GetRequest;
-import org.apache.streampipes.client.http.PostRequestWithPayloadResponse;
-import org.apache.streampipes.client.http.PostRequestWithoutPayloadResponse;
+import org.apache.streampipes.client.http.*;
 import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.serializer.ObjectSerializer;
 import org.apache.streampipes.client.serializer.Serializer;
@@ -45,6 +42,11 @@ public class AbstractClientApi {
     new PostRequestWithoutPayloadResponse<>(clientConfig, apiPath, serializer, object).executeRequest();
   }
 
+  protected <T> void put(StreamPipesApiPath apiPath, T object) {
+    ObjectSerializer<T, Void> serializer = new ObjectSerializer<>();
+    new PutRequest<>(clientConfig, apiPath, serializer, object).executeRequest();
+  }
+
   protected <O> O delete(StreamPipesApiPath apiPath, Class<O> responseClass) {
     Serializer<Void, O, O> serializer = new ObjectSerializer<>();
     return new DeleteRequest<>(clientConfig, apiPath, responseClass, serializer).executeRequest();
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
new file mode 100644
index 000000000..7e5a1b9f0
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataLakeMeasureApi.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.api;
+
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public class DataLakeMeasureApi extends AbstractTypedClientApi<DataLakeMeasure> implements CRUDApi<String, DataLakeMeasure> {
+
+  public DataLakeMeasureApi(StreamPipesClientConfig clientConfig) {
+    super(clientConfig, DataLakeMeasure.class);
+  }
+
+  public DataLakeMeasure get(String id) {
+    return getSingle(getBaseResourcePath().addToPath(id));
+  }
+
+  @Override
+  public List<DataLakeMeasure> all() {
+    throw new IllegalArgumentException("Not yet implemented");
+  }
+
+  @Override
+  public void create(DataLakeMeasure element) {
+    post(getBaseResourcePath(), element);
+  }
+
+  @Override
+  public void delete(String elementId) {
+    throw new IllegalArgumentException("Not yet implemented");
+  }
+
+  @Override
+  public void update(DataLakeMeasure measure) {
+    put(getBaseResourcePath().addToPath(measure.getElementId()), measure);
+  }
+
+  @Override
+  protected StreamPipesApiPath getBaseResourcePath() {
+    return StreamPipesApiPath.fromStreamPipesBasePath()
+        .addToPath("api")
+        .addToPath("v4")
+        .addToPath("datalake")
+        .addToPath("measure");
+  }
+}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
new file mode 100644
index 000000000..cf36cf131
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/http/PutRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.entity.ContentType;
+import org.apache.streampipes.client.model.StreamPipesClientConfig;
+import org.apache.streampipes.client.serializer.Serializer;
+import org.apache.streampipes.client.util.StreamPipesApiPath;
+
+import java.io.IOException;
+
+public class PutRequest<SO> extends HttpRequest<SO, Void, Void> {
+
+  private final SO body;
+
+  public PutRequest(StreamPipesClientConfig clientConfig,
+                    StreamPipesApiPath apiPath,
+                    Serializer<SO, Void, Void> serializer,
+                    SO body) {
+    super(clientConfig, apiPath, serializer);
+    this.body = body;
+  }
+
+  @Override
+  protected Request makeRequest(Serializer<SO, Void, Void> serializer) {
+    Request request = Request
+        .Put(makeUrl())
+        .setHeaders(standardPostHeaders());
+
+    request.bodyString(serializer.serialize(body), ContentType.APPLICATION_JSON);
+
+    return request;
+  }
+
+  @Override
+  protected Void afterRequest(Serializer<SO, Void, Void> serializer, HttpEntity entity) throws IOException {
+    return null;
+  }
+}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index 4fde78355..ba420d20d 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -41,11 +41,21 @@ public class DataExplorerUtils {
         return measure;
     }
 
+    public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
+                                                              DataLakeMeasure measure) throws SpRuntimeException {
+        sanitizeDataLakeMeasure(measure);
+        updateAtDataLake(client, measure);
+        return measure;
+    }
+
     private static void registerAtDataLake(StreamPipesClient client,
                                                      DataLakeMeasure measure) throws SpRuntimeException {
-        client
-          .customRequest()
-          .sendPost("api/v4/datalake/measure/", measure);
+        client.dataLakeMeasureApi().create(measure);
+    }
+
+    public static void updateAtDataLake(StreamPipesClient client,
+                                        DataLakeMeasure measure) throws SpRuntimeException {
+        client.dataLakeMeasureApi().update(measure);
     }
 
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index af2d672dd..1823d32ab 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -355,8 +355,24 @@ public class DataLakeManagementV4 {
         return tags;
     }
 
+    public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
+        var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+        if (existingMeasure != null) {
+            measure.setRev(existingMeasure.getRev());
+            getDataLakeStorage().updateDataLakeMeasure(measure);
+        } else {
+            getDataLakeStorage().storeDataLakeMeasure(measure);
+        }
+    }
+
+    public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
+        if (getDataLakeStorage().findOne(elementId) != null) {
+            getDataLakeStorage().deleteDataLakeMeasure(elementId);
+        } else {
+            throw new IllegalArgumentException("Could not find measure with this ID");
+        }
+    }
 
-    // TODO validate method
     public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
         List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
         Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 9f71f168f..636cf0c0d 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -53,4 +53,34 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
     public Response getDataLakeMeasure(@PathParam("id") String measureId) {
         return ok(this.dataLakeManagement.getById(measureId));
     }
+
+    @PUT
+    @JacksonSerialized
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Path("{id}")
+    public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+                                          DataLakeMeasure measure) {
+        if (measureId.equals(measure.getElementId())) {
+            try {
+                this.dataLakeManagement.updateDataLake(measure);
+                return ok();
+            } catch (IllegalArgumentException e) {
+                return badRequest(e.getMessage());
+            }
+        }
+        return badRequest();
+    }
+
+    @DELETE
+    @JacksonSerialized
+    @Path("{id}")
+    public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+        try {
+            this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+            return ok();
+        } catch (IllegalArgumentException e) {
+            return badRequest(e.getMessage());
+        }
+    }
 }
diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
index 25159f01a..df6a1c35f 100644
--- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
+++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/IDataLakeStorage.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.storage.api;
 
-import org.apache.streampipes.model.datalake.DataExplorerWidgetModel;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
 import java.util.List;
@@ -32,4 +31,6 @@ public interface IDataLakeStorage {
     DataLakeMeasure findOne(String id);
 
     void updateDataLakeMeasure(DataLakeMeasure measure);
+
+    void deleteDataLakeMeasure(String id);
 }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
index 943638af9..81fb323f2 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/DataLakeStorageImpl.java
@@ -51,4 +51,9 @@ public class DataLakeStorageImpl extends AbstractDao<DataLakeMeasure> implements
     public void updateDataLakeMeasure(DataLakeMeasure measure) {
         update(measure);
     }
+
+    @Override
+    public void deleteDataLakeMeasure(String id) {
+        delete(id);
+    }
 }


[incubator-streampipes] 02/03: [hotfix] Improve logging of incomplete events when writing to event storage

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 59a07101798ab777f5cb9204e03460a2e79214ee
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 23:02:49 2022 +0100

    [hotfix] Improve logging of incomplete events when writing to event storage
---
 .../dataexplorer/commons/influx/InfluxStore.java   | 43 ++++++++++-------
 .../apache/streampipes/model/runtime/Event.java    | 55 ++++++++++++----------
 2 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 3d33e1d3e..c8f1c7b7b 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -36,6 +36,7 @@ import org.influxdb.dto.QueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -129,6 +130,7 @@ public class InfluxStore {
    * @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
    */
   public void onEvent(Event event) throws SpRuntimeException {
+    var missingFields = new ArrayList<String>();
     if (event == null) {
       throw new SpRuntimeException("event is null");
     }
@@ -146,31 +148,40 @@ public class InfluxStore {
           String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
 
           try {
-            PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
-            if (eventPropertyPrimitiveField.getRawValue() == null) {
-              LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
-            } else {
-
-              // store property as tag when the field is a dimension property
-              if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
-                point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+            var field = event.getOptionalFieldByRuntimeName(runtimeName);
+            if (field.isPresent()) {
+              PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive();
+              if (eventPropertyPrimitiveField.getRawValue() == null) {
+                LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
               } else {
-                handleMeasurementProperty(
-                    point,
-                    (EventPropertyPrimitive) ep,
-                    sanitizedRuntimeName,
-                    eventPropertyPrimitiveField);
+
+                // store property as tag when the field is a dimension property
+                if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
+                  point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+                } else {
+                  handleMeasurementProperty(
+                      point,
+                      (EventPropertyPrimitive) ep,
+                      sanitizedRuntimeName,
+                      eventPropertyPrimitiveField);
+                }
               }
+            } else {
+              missingFields.add(runtimeName);
             }
           } catch (SpRuntimeException iae) {
-            LOG.warn("Field {} was missing in event and will be ignored", runtimeName, iae);
+            LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", runtimeName, iae);
           }
-
         }
-
       }
     }
 
+    if (missingFields.size() > 0) {
+      LOG.warn("Ignored {} fields which were present in the schema, but not in the provided event: {}",
+          missingFields.size(),
+          String.join(", ", missingFields));
+    }
+
     influxDb.write(point.build());
   }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
index 84fd0101b..1e0557e91 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.schema.EventSchema;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class Event {
 
@@ -34,7 +35,7 @@ public class Event {
   private Map<String, AbstractField> fieldMap;
 
   public Event(Map<String, AbstractField> fieldMap, SourceInfo
-          sourceInfo, SchemaInfo schemaInfo) {
+      sourceInfo, SchemaInfo schemaInfo) {
     this.fieldMap = fieldMap;
     this.sourceInfo = sourceInfo;
     this.schemaInfo = schemaInfo;
@@ -64,15 +65,19 @@ public class Event {
     return EventFactory.fromEvents(this, otherEvent, outputSchema);
   }
 
+  public Optional<AbstractField> getOptionalFieldByRuntimeName(String runtimeName) {
+    return fieldMap
+        .entrySet()
+        .stream()
+        .map(Map.Entry::getValue)
+        .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
+        .findFirst();
+  }
+
   public AbstractField getFieldByRuntimeName(String runtimeName) {
     // TODO this currently only works for first-level properties
-    return fieldMap
-            .entrySet()
-            .stream()
-            .map(Map.Entry::getValue)
-            .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
-            .findFirst()
-            .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
+    return getOptionalFieldByRuntimeName(runtimeName)
+        .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
   }
 
   public void removeFieldBySelector(String fieldSelector) {
@@ -84,7 +89,7 @@ public class Event {
   }
 
   private AbstractField getFieldBySelector(String fieldSelector, Map<String, AbstractField>
-          currentFieldMap) {
+      currentFieldMap) {
     if (currentFieldMap.containsKey(fieldSelector)) {
       return currentFieldMap.get(fieldSelector);
     } else {
@@ -93,9 +98,9 @@ public class Event {
   }
 
   private Map<String, AbstractField> getNestedItem(String fieldSelector, Map<String,
-          AbstractField> currentFieldMap) {
+      AbstractField> currentFieldMap) {
     String key = currentFieldMap.keySet().stream().filter(fieldSelector::startsWith)
-            .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
+        .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
     return currentFieldMap.get(key).getAsComposite().getRawValue();
   }
 
@@ -104,20 +109,20 @@ public class Event {
       fieldMap.put(selector, field);
     } else {
       updateFieldMap(fieldMap.get(makeSelector(selector, 2))
-              .getAsComposite()
-              .getRawValue(), selector, 2, field);
+          .getAsComposite()
+          .getRawValue(), selector, 2, field);
     }
   }
 
   private void updateFieldMap(Map<String, AbstractField> currentFieldMap,
-                                                  String selector, Integer position,
-                                                  AbstractField field) {
+                              String selector, Integer position,
+                              AbstractField field) {
     if (currentFieldMap.containsKey(selector)) {
       currentFieldMap.put(selector, field);
     } else {
-        updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
-                .getAsComposite()
-                .getRawValue(), selector, 2, field);
+      updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
+          .getAsComposite()
+          .getRawValue(), selector, 2, field);
     }
   }
 
@@ -163,7 +168,7 @@ public class Event {
   }
 
   public void addField(String runtimeName, Integer value) {
-   addPrimitive(runtimeName, value);
+    addPrimitive(runtimeName, value);
   }
 
   public void addField(String runtimeName, Long value) {
@@ -172,8 +177,8 @@ public class Event {
 
   public void addField(String runtimeName, Object value) {
     if (AbstractField.class.isInstance(value)) {
-     ((AbstractField<?>) value).rename(runtimeName);
-     addField((AbstractField) value);
+      ((AbstractField<?>) value).rename(runtimeName);
+      addField((AbstractField) value);
     } else {
       addPrimitive(runtimeName, value);
     }
@@ -201,14 +206,14 @@ public class Event {
 
   public void addFieldAtPosition(String baseSelector, AbstractField field) {
     getFieldBySelector(baseSelector).getAsComposite().addField
-            (makeSelector(baseSelector, field.getFieldNameIn()), field);
+        (makeSelector(baseSelector, field.getFieldNameIn()), field);
   }
 
   private String makeKey(AbstractField field) {
     return sourceInfo != null && sourceInfo.getSelectorPrefix() != null ? sourceInfo
-            .getSelectorPrefix()
-            + PropertySelectorConstants.PROPERTY_DELIMITER
-            + field.getFieldNameIn() : field.getFieldNameIn();
+        .getSelectorPrefix()
+        + PropertySelectorConstants.PROPERTY_DELIMITER
+        + field.getFieldNameIn() : field.getFieldNameIn();
   }
 
   public Event getSubset(List<String> fieldSelectors) {


[incubator-streampipes] 03/03: Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a1ceb2b1f511edb32268dbc22828010277f22879
Merge: 59a071017 3e0f7bc0b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 23:02:59 2022 +0100

    Merge branch 'dev' of github.com:apache/incubator-streampipes into dev

 .../streampipes/connect/adapter/Adapter.java       |  6 ++
 .../connect/adapter/AdapterPipelineGenerator.java  | 10 +++
 .../preprocessing/elements/DebugAdapterSink.java   | 46 ++++++++++
 .../protocol/stream/pulsar/PulsarConsumer.java     |  7 +-
 .../protocol/stream/pulsar/PulsarProtocol.java     |  6 +-
 .../integration/adapters/AdapterTesterBase.java    | 30 ++++++-
 .../integration/adapters/AdaptersTest.java         |  4 +-
 .../integration/adapters/PulsarAdapterTester.java  | 98 +++++++++++++++-------
 .../integration/containers/PulsarContainer.java    |  8 +-
 .../PulsarDevContainer.java}                       | 23 +++--
 .../connect/grounding/ProtocolDescription.java     |  1 +
 .../connect/rules/DebugSinkRuleDescription.java    | 18 ++--
 .../rules/TransformationRuleDescription.java       |  1 +
 .../staticproperty/FreeTextStaticProperty.java     |  6 ++
 ui/package.json                                    |  2 +-
 15 files changed, 201 insertions(+), 65 deletions(-)