You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/01 22:03:12 UTC
[incubator-streampipes] 02/03: [hotfix] Improve logging of incomplete events when writing to event storage
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 59a07101798ab777f5cb9204e03460a2e79214ee
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Nov 1 23:02:49 2022 +0100
[hotfix] Improve logging of incomplete events when writing to event storage
---
.../dataexplorer/commons/influx/InfluxStore.java | 43 ++++++++++-------
.../apache/streampipes/model/runtime/Event.java | 55 ++++++++++++----------
2 files changed, 57 insertions(+), 41 deletions(-)
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 3d33e1d3e..c8f1c7b7b 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -36,6 +36,7 @@ import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -129,6 +130,7 @@ public class InfluxStore {
* @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
*/
public void onEvent(Event event) throws SpRuntimeException {
+ var missingFields = new ArrayList<String>();
if (event == null) {
throw new SpRuntimeException("event is null");
}
@@ -146,31 +148,40 @@ public class InfluxStore {
String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName);
try {
- PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
- if (eventPropertyPrimitiveField.getRawValue() == null) {
- LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
- } else {
-
- // store property as tag when the field is a dimension property
- if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
- point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+ var field = event.getOptionalFieldByRuntimeName(runtimeName);
+ if (field.isPresent()) {
+ PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive();
+ if (eventPropertyPrimitiveField.getRawValue() == null) {
+ LOG.warn("Field value for {} is null, ignoring value.", sanitizedRuntimeName);
} else {
- handleMeasurementProperty(
- point,
- (EventPropertyPrimitive) ep,
- sanitizedRuntimeName,
- eventPropertyPrimitiveField);
+
+ // store property as tag when the field is a dimension property
+ if (PropertyScope.DIMENSION_PROPERTY.name().equals(ep.getPropertyScope())) {
+ point.tag(sanitizedRuntimeName, eventPropertyPrimitiveField.getAsString());
+ } else {
+ handleMeasurementProperty(
+ point,
+ (EventPropertyPrimitive) ep,
+ sanitizedRuntimeName,
+ eventPropertyPrimitiveField);
+ }
}
+ } else {
+ missingFields.add(runtimeName);
}
} catch (SpRuntimeException iae) {
- LOG.warn("Field {} was missing in event and will be ignored", runtimeName, iae);
+ LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", runtimeName, iae);
}
-
}
-
}
}
+ if (missingFields.size() > 0) {
+ LOG.warn("Ignored {} fields which were present in the schema, but not in the provided event: {}",
+ missingFields.size(),
+ String.join(", ", missingFields));
+ }
+
influxDb.write(point.build());
}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
index 84fd0101b..1e0557e91 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/runtime/Event.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.schema.EventSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class Event {
@@ -34,7 +35,7 @@ public class Event {
private Map<String, AbstractField> fieldMap;
public Event(Map<String, AbstractField> fieldMap, SourceInfo
- sourceInfo, SchemaInfo schemaInfo) {
+ sourceInfo, SchemaInfo schemaInfo) {
this.fieldMap = fieldMap;
this.sourceInfo = sourceInfo;
this.schemaInfo = schemaInfo;
@@ -64,15 +65,19 @@ public class Event {
return EventFactory.fromEvents(this, otherEvent, outputSchema);
}
+ public Optional<AbstractField> getOptionalFieldByRuntimeName(String runtimeName) {
+ return fieldMap
+ .entrySet()
+ .stream()
+ .map(Map.Entry::getValue)
+ .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
+ .findFirst();
+ }
+
public AbstractField getFieldByRuntimeName(String runtimeName) {
// TODO this currently only works for first-level properties
- return fieldMap
- .entrySet()
- .stream()
- .map(Map.Entry::getValue)
- .filter(entry -> entry.getFieldNameIn().equals(runtimeName))
- .findFirst()
- .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
+ return getOptionalFieldByRuntimeName(runtimeName)
+ .orElseThrow(() -> new SpRuntimeException("Field " + runtimeName + " not found"));
}
public void removeFieldBySelector(String fieldSelector) {
@@ -84,7 +89,7 @@ public class Event {
}
private AbstractField getFieldBySelector(String fieldSelector, Map<String, AbstractField>
- currentFieldMap) {
+ currentFieldMap) {
if (currentFieldMap.containsKey(fieldSelector)) {
return currentFieldMap.get(fieldSelector);
} else {
@@ -93,9 +98,9 @@ public class Event {
}
private Map<String, AbstractField> getNestedItem(String fieldSelector, Map<String,
- AbstractField> currentFieldMap) {
+ AbstractField> currentFieldMap) {
String key = currentFieldMap.keySet().stream().filter(fieldSelector::startsWith)
- .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
+ .findFirst().orElseThrow(() -> new IllegalArgumentException("Key not found"));
return currentFieldMap.get(key).getAsComposite().getRawValue();
}
@@ -104,20 +109,20 @@ public class Event {
fieldMap.put(selector, field);
} else {
updateFieldMap(fieldMap.get(makeSelector(selector, 2))
- .getAsComposite()
- .getRawValue(), selector, 2, field);
+ .getAsComposite()
+ .getRawValue(), selector, 2, field);
}
}
private void updateFieldMap(Map<String, AbstractField> currentFieldMap,
- String selector, Integer position,
- AbstractField field) {
+ String selector, Integer position,
+ AbstractField field) {
if (currentFieldMap.containsKey(selector)) {
currentFieldMap.put(selector, field);
} else {
- updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
- .getAsComposite()
- .getRawValue(), selector, 2, field);
+ updateFieldMap(currentFieldMap.get(makeSelector(selector, position + 1))
+ .getAsComposite()
+ .getRawValue(), selector, 2, field);
}
}
@@ -163,7 +168,7 @@ public class Event {
}
public void addField(String runtimeName, Integer value) {
- addPrimitive(runtimeName, value);
+ addPrimitive(runtimeName, value);
}
public void addField(String runtimeName, Long value) {
@@ -172,8 +177,8 @@ public class Event {
public void addField(String runtimeName, Object value) {
if (AbstractField.class.isInstance(value)) {
- ((AbstractField<?>) value).rename(runtimeName);
- addField((AbstractField) value);
+ ((AbstractField<?>) value).rename(runtimeName);
+ addField((AbstractField) value);
} else {
addPrimitive(runtimeName, value);
}
@@ -201,14 +206,14 @@ public class Event {
public void addFieldAtPosition(String baseSelector, AbstractField field) {
getFieldBySelector(baseSelector).getAsComposite().addField
- (makeSelector(baseSelector, field.getFieldNameIn()), field);
+ (makeSelector(baseSelector, field.getFieldNameIn()), field);
}
private String makeKey(AbstractField field) {
return sourceInfo != null && sourceInfo.getSelectorPrefix() != null ? sourceInfo
- .getSelectorPrefix()
- + PropertySelectorConstants.PROPERTY_DELIMITER
- + field.getFieldNameIn() : field.getFieldNameIn();
+ .getSelectorPrefix()
+ + PropertySelectorConstants.PROPERTY_DELIMITER
+ + field.getFieldNameIn() : field.getFieldNameIn();
}
public Event getSubset(List<String> fieldSelectors) {