You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2023/02/11 21:24:56 UTC
[streampipes] branch 1040-change-event-runtime-name-of-geometry-field updated: [#1040] misc changes in runtime names, add missing fields in schema
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch 1040-change-event-runtime-name-of-geometry-field
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1040-change-event-runtime-name-of-geometry-field by this push:
new 8d0aea2a4 [#1040] misc changes in runtime names, add missing fields in schema
8d0aea2a4 is described below
commit 8d0aea2a4bb9c9736512e527a199796ae25071ec
Author: micklich <mi...@apache.org>
AuthorDate: Sat Feb 11 22:19:21 2023 +0100
[#1040] misc changes in runtime names, add missing fields in schema
---
.../latlngtojtspoint/LatLngToJtsPointProcessor.java | 10 +++++++---
.../reprojection/ReprojectionProcessor.java | 4 ++--
.../trajectory/TrajectoryFromPointsProcessor.java | 16 +++++++++++-----
.../HaversineDistanceCalculatorProcessor.java | 21 ++++++++++++---------
.../speedcalculator/SpeedCalculatorProcessor.java | 6 +++---
5 files changed, 35 insertions(+), 22 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
index aae802664..7c96602f1 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
@@ -47,7 +47,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
private static final String LAT_KEY = "latitude-key";
private static final String LNG_KEY = "longitude-key";
private static final String EPSG_KEY = "epsg-key";
- private static final String WKT_RUNTIME = "geomWKT";
+ private static final String GEOMETRY_RUNTIME = "geometry";
private String latitudeMapper;
private String longitudeMapper;
private String epsgMapper;
@@ -76,7 +76,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
.outputStrategy(
OutputStrategies.append(
PrimitivePropertyBuilder
- .create(Datatypes.String, WKT_RUNTIME)
+ .create(Datatypes.String, GEOMETRY_RUNTIME)
.domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
.build()
)
@@ -101,7 +101,11 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
if (!geom.isEmpty()) {
- event.addField(WKT_RUNTIME, geom.toString());
+ // if activated, the stream fails entirely
+ //event.removeFieldBySelector(latitudeMapper);
+ //event.removeFieldBySelector(longitudeMapper);
+ event.addField(GEOMETRY_RUNTIME, geom.toString());
+
LOG.debug("Created Geometry: " + geom.toString());
collector.collect(event);
} else {
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
index 0ea67deba..82b11f589 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -49,7 +49,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String SOURCE_EPSG_KEY = "source-epsg-key";
public static final String TARGET_EPSG_KEY = "target-epsg-key";
- public static final String GEOM_RUNTIME = "geomWKT";
+ public static final String GEOMETRY_RUNTIME = "geometry";
public static final String EPSG_RUNTIME = "epsg";
private String geometryMapper;
private String sourceEpsgMapper;
@@ -134,7 +134,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
if (!reprojected.isEmpty()) {
event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
- event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
+ event.updateFieldBySelector("s0::" + GEOMETRY_RUNTIME, reprojected.toText());
collector.collect(event);
} else {
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
index 9b1e1e710..ad8554e47 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
@@ -51,8 +51,9 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
private static final String SUBPOINTS_KEY = "subpoints-key";
private static final String DESCRIPTION_KEY = "description-key";
private static final String TRAJECTORY_KEY = "trajectory-key";
- private static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
- private static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
+ private static final String TRAJECTORY_GEOMETRY_RUNTIME = "trajectory-geometry";
+ private static final String TRAJECTORY_EPSG_RUNTIME = "trajectory-epsg";
+ private static final String DESCRIPTION_RUNTIME = "trajectory-description";
private String pointMapper;
private String epsgMapper;
private String mValueMapper;
@@ -96,8 +97,12 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
SO.TEXT),
EpProperties.stringEp(
Labels.withId(TRAJECTORY_KEY),
- TRAJECTORY_RUNTIME,
- "http://www.opengis.net/ont/geosparql#Geometry")
+ TRAJECTORY_GEOMETRY_RUNTIME,
+ "http://www.opengis.net/ont/geosparql#Geometry"),
+ EpProperties.integerEp(
+ Labels.withId(EPSG_KEY),
+ TRAJECTORY_EPSG_RUNTIME,
+ "http://data.ign.fr/def/ignf#CartesianCS")
)
)
.build();
@@ -131,7 +136,8 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
// adds to stream
event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
- event.addField(TRAJECTORY_RUNTIME, geom.toString());
+ event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
+ event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
collector.collect(event);
}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
index e806f7938..23ea01758 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
@@ -24,14 +24,15 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
@@ -39,12 +40,14 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+import java.net.URI;
+
public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LAT_1_KEY = "lat1";
private static final String LONG_1_KEY = "long1";
private static final String LAT_2_KEY = "lat2";
private static final String LONG_2_KEY = "long2";
- private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
+ private static final String DISTANCE_RUNTIME_NAME = "distance";
String lat1FieldMapper;
String long1FieldMapper;
String lat2FieldMapper;
@@ -69,12 +72,12 @@ public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcess
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
.build()
)
- .outputStrategy(OutputStrategies
- .append(EpProperties.numberEp(
- Labels.withId(CALCULATED_DISTANCE_KEY),
- "distance",
- SO.NUMBER))
- )
+ .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+ .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
+ .domainProperty(SO.NUMBER)
+ .measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
+ .build())
+ )
.build();
}
@@ -99,7 +102,7 @@ public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcess
double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);
- event.addField("distance", resultDist);
+ event.addField(DISTANCE_RUNTIME_NAME, resultDist);
collector.collect(event);
}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
index 9018ac78c..e102dcceb 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
@@ -49,7 +49,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LATITUDE_KEY = "latitude-key";
private static final String LONGITUDE_KEY = "longitude-key";
private static final String COUNT_WINDOW_KEY = "count-window-key";
- private static final String SPEED_KEY = "speed-key";
+ private static final String SPEED_RUNTIME_NAME = "speed";
private String latitudeFieldMapper;
private String longitudeFieldMapper;
private String timestampFieldMapper;
@@ -75,7 +75,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
.requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
.outputStrategy(
OutputStrategies.append(PrimitivePropertyBuilder
- .create(Datatypes.Float, SPEED_KEY)
+ .create(Datatypes.Float, SPEED_RUNTIME_NAME)
.domainProperty(SO.NUMBER)
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
.build())
@@ -98,7 +98,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
if (this.buffer.isFull()) {
Event firstEvent = (Event) buffer.get();
double speed = calculateSpeed(firstEvent, event);
- event.addField(SPEED_KEY, speed);
+ event.addField(SPEED_RUNTIME_NAME, speed);
collector.collect(event);
}
this.buffer.add(event);