You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2023/02/11 21:24:56 UTC

[streampipes] branch 1040-change-event-runtime-name-of-geometry-field updated: [#1040] misc changes in runtime names, add missing fields in schema

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch 1040-change-event-runtime-name-of-geometry-field
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1040-change-event-runtime-name-of-geometry-field by this push:
     new 8d0aea2a4 [#1040] misc changes in runtime names, add missing fields in schema
8d0aea2a4 is described below

commit 8d0aea2a4bb9c9736512e527a199796ae25071ec
Author: micklich <mi...@apache.org>
AuthorDate: Sat Feb 11 22:19:21 2023 +0100

    [#1040] misc changes in runtime names, add missing fields in schema
---
 .../latlngtojtspoint/LatLngToJtsPointProcessor.java | 10 +++++++---
 .../reprojection/ReprojectionProcessor.java         |  4 ++--
 .../trajectory/TrajectoryFromPointsProcessor.java   | 16 +++++++++++-----
 .../HaversineDistanceCalculatorProcessor.java       | 21 ++++++++++++---------
 .../speedcalculator/SpeedCalculatorProcessor.java   |  6 +++---
 5 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
index aae802664..7c96602f1 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
@@ -47,7 +47,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
   private static final String LAT_KEY = "latitude-key";
   private static final String LNG_KEY = "longitude-key";
   private static final String EPSG_KEY = "epsg-key";
-  private static final String WKT_RUNTIME = "geomWKT";
+  private static final String GEOMETRY_RUNTIME = "geometry";
   private String latitudeMapper;
   private String longitudeMapper;
   private String epsgMapper;
@@ -76,7 +76,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
         .outputStrategy(
             OutputStrategies.append(
                 PrimitivePropertyBuilder
-                    .create(Datatypes.String, WKT_RUNTIME)
+                    .create(Datatypes.String, GEOMETRY_RUNTIME)
                     .domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
                     .build()
             )
@@ -101,7 +101,11 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
     Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
 
     if (!geom.isEmpty()) {
-      event.addField(WKT_RUNTIME, geom.toString());
+      // NOTE: if the two removals below are enabled, the stream fails entirely
+      //event.removeFieldBySelector(latitudeMapper);
+      //event.removeFieldBySelector(longitudeMapper);
+      event.addField(GEOMETRY_RUNTIME, geom.toString());
+
       LOG.debug("Created Geometry: " + geom.toString());
       collector.collect(event);
     } else {
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
index 0ea67deba..82b11f589 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -49,7 +49,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
   public static final String GEOM_KEY = "geom-key";
   public static final String SOURCE_EPSG_KEY = "source-epsg-key";
   public static final String TARGET_EPSG_KEY = "target-epsg-key";
-  public static final String GEOM_RUNTIME = "geomWKT";
+  public static final String GEOMETRY_RUNTIME = "geometry";
   public static final String EPSG_RUNTIME = "epsg";
   private String geometryMapper;
   private String sourceEpsgMapper;
@@ -134,7 +134,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
 
     if (!reprojected.isEmpty()) {
       event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
-      event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
+      event.updateFieldBySelector("s0::" + GEOMETRY_RUNTIME, reprojected.toText());
 
       collector.collect(event);
     } else {
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
index 9b1e1e710..ad8554e47 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
@@ -51,8 +51,9 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
   private static final String SUBPOINTS_KEY = "subpoints-key";
   private static final String DESCRIPTION_KEY = "description-key";
   private static final String TRAJECTORY_KEY = "trajectory-key";
-  private static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
-  private static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
+  private static final String TRAJECTORY_GEOMETRY_RUNTIME = "trajectory-geometry";
+  private static final String TRAJECTORY_EPSG_RUNTIME = "trajectory-epsg";
+  private static final String DESCRIPTION_RUNTIME = "trajectory-description";
   private String pointMapper;
   private String epsgMapper;
   private String mValueMapper;
@@ -96,8 +97,12 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
                     SO.TEXT),
                 EpProperties.stringEp(
                     Labels.withId(TRAJECTORY_KEY),
-                    TRAJECTORY_RUNTIME,
-                    "http://www.opengis.net/ont/geosparql#Geometry")
+                    TRAJECTORY_GEOMETRY_RUNTIME,
+                    "http://www.opengis.net/ont/geosparql#Geometry"),
+                EpProperties.integerEp(
+                    Labels.withId(EPSG_KEY),
+                    TRAJECTORY_EPSG_RUNTIME,
+                "http://data.ign.fr/def/ignf#CartesianCS")
             )
         )
         .build();
@@ -131,7 +136,8 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
     LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
     // adds to stream
     event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
-    event.addField(TRAJECTORY_RUNTIME, geom.toString());
+    event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
+    event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
     collector.collect(event);
   }
 
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
index e806f7938..23ea01758 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java
@@ -24,14 +24,15 @@ import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
 import org.apache.streampipes.vocabulary.Geo;
 import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
@@ -39,12 +40,14 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
+import java.net.URI;
+
 public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcessor {
   private static final String LAT_1_KEY = "lat1";
   private static final String LONG_1_KEY = "long1";
   private static final String LAT_2_KEY = "lat2";
   private static final String LONG_2_KEY = "long2";
-  private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
+  private static final String DISTANCE_RUNTIME_NAME = "distance";
   String lat1FieldMapper;
   String long1FieldMapper;
   String lat2FieldMapper;
@@ -69,12 +72,12 @@ public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcess
                 Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
             .build()
         )
-        .outputStrategy(OutputStrategies
-            .append(EpProperties.numberEp(
-                Labels.withId(CALCULATED_DISTANCE_KEY),
-                "distance",
-                SO.NUMBER))
-        )
+        .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+                .create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
+                .domainProperty(SO.NUMBER)
+                .measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
+                .build())
+            )
         .build();
   }
 
@@ -99,7 +102,7 @@ public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcess
 
     double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);
 
-    event.addField("distance", resultDist);
+    event.addField(DISTANCE_RUNTIME_NAME, resultDist);
 
     collector.collect(event);
   }
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
index 9018ac78c..e102dcceb 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java
@@ -49,7 +49,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
   private static final String LATITUDE_KEY = "latitude-key";
   private static final String LONGITUDE_KEY = "longitude-key";
   private static final String COUNT_WINDOW_KEY = "count-window-key";
-  private static final String SPEED_KEY = "speed-key";
+  private static final String SPEED_RUNTIME_NAME = "speed";
   private String latitudeFieldMapper;
   private String longitudeFieldMapper;
   private String timestampFieldMapper;
@@ -75,7 +75,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
         .requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
         .outputStrategy(
             OutputStrategies.append(PrimitivePropertyBuilder
-                .create(Datatypes.Float, SPEED_KEY)
+                .create(Datatypes.Float, SPEED_RUNTIME_NAME)
                 .domainProperty(SO.NUMBER)
                 .measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
                 .build())
@@ -98,7 +98,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
     if (this.buffer.isFull()) {
       Event firstEvent = (Event) buffer.get();
       double speed = calculateSpeed(firstEvent, event);
-      event.addField(SPEED_KEY, speed);
+      event.addField(SPEED_RUNTIME_NAME, speed);
       collector.collect(event);
     }
     this.buffer.add(event);