You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/01 22:03:12 UTC

[incubator-streampipes] 02/03: [hotfix] Improve logging of incomplete events when writing to event storage

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 59a07101798ab777f5cb9204e03460a2e79214ee
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 23:02:49 2022 +0100

    [hotfix] Improve logging of incomplete events when writing to event storage
---
 .../dataexplorer/commons/influx/InfluxStore.java   | 43 ++++++++++-------
 .../apache/streampipes/model/runtime/Event.java    | 55 ++++++++++++----------
 2 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 3d33e1d3e..c8f1c7b7b 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -36,6 +36,7 @@ import org.influxdb.dto.QueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -129,6 +130,7 @@ public class InfluxStore {
    * @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
    */
   public void onEvent(Event event) throws SpRuntimeException {
+    var missingFields = new ArrayList<String>();
     if (event == null) {
       throw new SpRuntimeException("event is null");
     }
@@ -146,31 +148,40 @@ public class InfluxStore {
           String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
 
           try {
-            PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
-            if (eventPropertyPrimitiveField.getRawValue() == null) {
-              LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
-            } else {
-
-              // store property as tag when the field is a dimension property
-              if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
-                point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+            var field = event.getOptionalFieldByRuntimeName(runtimeName);
+            if (field.isPresent()) {
+              PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive();
+              if (eventPropertyPrimitiveField.getRawValue() == null) {
+                LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
               } else {
-                handleMeasurementProperty(
-                    point,
-                    (EventPropertyPrimitive) ep,
-                    sanitizedRuntimeName,
-                    eventPropertyPrimitiveField);
+
+                // store property as tag when the field is a dimension property
+                if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
+                  point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+                } else {
+                  handleMeasurementProperty(
+                      point,
+                      (EventPropertyPrimitive) ep,
+                      sanitizedRuntimeName,
+                      eventPropertyPrimitiveField);
+                }
               }
+            } else {
+              missingFields.add(runtimeName);
             }
           } catch (SpRuntimeException iae) {
-            LOG.warn("Field {} was missing in event and will be ignored", runtimeName, iae);
+            LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", runtimeName, iae);
           }
-
         }
-
       }
     }
 
+    if (missingFields.size() > 0) {
+      LOG.warn("Ignored {} fields which were present in the schema, but not in the provided event: {}",
+          missingFields.size(),
+          String.join(", ", missingFields));
+    }
+
     influxDb.write(point.build());
   }
 
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
index 84fd0101b..1e0557e91 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.schema.EventSchema;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class Event {
 
@@ -34,7 +35,7 @@ public class Event {
   private Map<String, AbstractField> fieldMap;
 
   public Event(Map<String, AbstractField> fieldMap, SourceInfo
-          sourceInfo, SchemaInfo schemaInfo) {
+      sourceInfo, SchemaInfo schemaInfo) {
     this.fieldMap = fieldMap;
     this.sourceInfo = sourceInfo;
     this.schemaInfo = schemaInfo;
@@ -64,15 +65,19 @@ public class Event {
     return EventFactory.fromEvents(this, otherEvent, outputSchema);
   }
 
+  public Optional<AbstractField> getOptionalFieldByRuntimeName(String runtimeName) {
+    return fieldMap
+        .entrySet()
+        .stream()
+        .map(Map.Entry::getValue)
+        .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
+        .findFirst();
+  }
+
   public AbstractField getFieldByRuntimeName(String runtimeName) {
     // TODO this currently only works for first-level properties
-    return fieldMap
-            .entrySet()
-            .stream()
-            .map(Map.Entry::getValue)
-            .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
-            .findFirst()
-            .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
+    return getOptionalFieldByRuntimeName(runtimeName)
+        .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
   }
 
   public void removeFieldBySelector(String fieldSelector) {
@@ -84,7 +89,7 @@ public class Event {
   }
 
   private AbstractField getFieldBySelector(String fieldSelector, Map<String, AbstractField>
-          currentFieldMap) {
+      currentFieldMap) {
     if (currentFieldMap.containsKey(fieldSelector)) {
       return currentFieldMap.get(fieldSelector);
     } else {
@@ -93,9 +98,9 @@ public class Event {
   }
 
   private Map<String, AbstractField> getNestedItem(String fieldSelector, Map<String,
-          AbstractField> currentFieldMap) {
+      AbstractField> currentFieldMap) {
     String key = currentFieldMap.keySet().stream().filter(fieldSelector::startsWith)
-            .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
+        .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
     return currentFieldMap.get(key).getAsComposite().getRawValue();
   }
 
@@ -104,20 +109,20 @@ public class Event {
       fieldMap.put(selector, field);
     } else {
       updateFieldMap(fieldMap.get(makeSelector(selector, 2))
-              .getAsComposite()
-              .getRawValue(), selector, 2, field);
+          .getAsComposite()
+          .getRawValue(), selector, 2, field);
     }
   }
 
   private void updateFieldMap(Map<String, AbstractField> currentFieldMap,
-                                                  String selector, Integer position,
-                                                  AbstractField field) {
+                              String selector, Integer position,
+                              AbstractField field) {
     if (currentFieldMap.containsKey(selector)) {
       currentFieldMap.put(selector, field);
     } else {
-        updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
-                .getAsComposite()
-                .getRawValue(), selector, 2, field);
+      updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
+          .getAsComposite()
+          .getRawValue(), selector, 2, field);
     }
   }
 
@@ -163,7 +168,7 @@ public class Event {
   }
 
   public void addField(String runtimeName, Integer value) {
-   addPrimitive(runtimeName, value);
+    addPrimitive(runtimeName, value);
   }
 
   public void addField(String runtimeName, Long value) {
@@ -172,8 +177,8 @@ public class Event {
 
   public void addField(String runtimeName, Object value) {
     if (AbstractField.class.isInstance(value)) {
-     ((AbstractField<?>) value).rename(runtimeName);
-     addField((AbstractField) value);
+      ((AbstractField<?>) value).rename(runtimeName);
+      addField((AbstractField) value);
     } else {
       addPrimitive(runtimeName, value);
     }
@@ -201,14 +206,14 @@ public class Event {
 
   public void addFieldAtPosition(String baseSelector, AbstractField field) {
     getFieldBySelector(baseSelector).getAsComposite().addField
-            (makeSelector(baseSelector, field.getFieldNameIn()), field);
+        (makeSelector(baseSelector, field.getFieldNameIn()), field);
   }
 
   private String makeKey(AbstractField field) {
     return sourceInfo != null && sourceInfo.getSelectorPrefix() != null ? sourceInfo
-            .getSelectorPrefix()
-            + PropertySelectorConstants.PROPERTY_DELIMITER
-            + field.getFieldNameIn() : field.getFieldNameIn();
+        .getSelectorPrefix()
+        + PropertySelectorConstants.PROPERTY_DELIMITER
+        + field.getFieldNameIn() : field.getFieldNameIn();
   }
 
   public Event getSubset(List<String> fieldSelectors) {