You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/23 22:53:13 UTC

[streampipes] 05/16: rewrite latlng to 1 class

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch STREAMPIPES-642
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit c42696c3d489fc68641dfc07f894d7b97d395cba
Author: micklich <mi...@apache.org>
AuthorDate: Wed Nov 23 19:58:02 2022 +0100

    rewrite latlng to 1 class
---
 .../LatLngToJtsPointProcessor.java                 | 118 +++++++++++++++++++++
 .../documentation.md                               |   0
 .../icon.png                                       | Bin
 .../strings.en                                     |   0
 4 files changed, 118 insertions(+)

diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
new file mode 100644
index 000000000..38bfdcd80
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.Geo;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import org.locationtech.jts.geom.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
+    public static final String LAT_KEY = "latitude-key";
+    public static final String LNG_KEY = "longitude-key";
+    public static final String EPSG_KEY = "epsg-key";
+    public static final String WKT_RUNTIME = "geomWKT";
+    private String latitudeMapper;
+    private String longitudeMapper;
+    private String epsgMapper;
+    private static final Logger LOG = LoggerFactory.getLogger(LatLngToJtsPointProcessor.class);
+    public static final String EPA_NAME = "Latitude Longitude To JTS Point";
+
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder
+                .create("org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint")
+                .category(DataProcessorType.GEO)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .withLocales(Locales.EN)
+                .requiredStream(
+                        StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat),
+                                        Labels.withId(LAT_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+                                .requiredPropertyWithUnaryMapping(
+                                        EpRequirements.domainPropertyReq(Geo.lng),
+                                        Labels.withId(LNG_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+                                .requiredPropertyWithUnaryMapping(
+                                        EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                                        Labels.withId(EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+                                .build()
+                )
+                .outputStrategy(
+                        OutputStrategies.append(
+                                PrimitivePropertyBuilder
+                                        .create(Datatypes.String, WKT_RUNTIME)
+                                        .domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
+                                        .build()
+                        )
+                )
+                .build();
+    }
+
+    @Override
+    public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+                             EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+        this.latitudeMapper = parameters.extractor().mappingPropertyValue(LAT_KEY);
+        this.longitudeMapper = parameters.extractor().mappingPropertyValue(LNG_KEY);
+        this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
+    }
+
+    @Override
+    public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+        Double lat = event.getFieldBySelector(latitudeMapper).getAsPrimitive().getAsDouble();
+        Double lng = event.getFieldBySelector(longitudeMapper).getAsPrimitive().getAsDouble();
+        Integer epsg = event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+
+        Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
+
+        if (!geom.isEmpty()) {
+            event.addField(WKT_RUNTIME, geom.toString());
+            LOG.debug("Created Geometry: " + geom.toString());
+            collector.collect(event);
+        } else {
+            LOG.warn("An empty point geometry in " + EPA_NAME + " is created due"
+                    + "invalid input field. Latitude: " + lat + "Longitude: " + lng);
+            LOG.error("An event is filtered out because of invalid geometry");
+        }
+    }
+
+    @Override
+    public void onDetach() throws SpRuntimeException {
+
+    }
+}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/documentation.md b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/documentation.md
similarity index 100%
rename from streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/documentation.md
rename to streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/documentation.md
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/icon.png b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/icon.png
similarity index 100%
rename from streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/icon.png
rename to streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/icon.png
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/strings.en b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/strings.en
similarity index 100%
rename from streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo/strings.en
rename to streampipes-extensions/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.jts.processor.latlngtojtspoint/strings.en