You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/15 23:36:58 UTC

[incubator-streampipes] 10/11: implement basic processor

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch STREAMPIPES-584
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit da8789de830632c5a3e5caff3cca9ab113e80a50
Author: micklich <mi...@apache.org>
AuthorDate: Tue Nov 15 23:49:58 2022 +0100

    implement basic processor
---
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |  4 +-
 .../processor/reprojection/ProjTransformation.java | 49 +++++++++++++++-----
 .../reprojection/ProjTransformationController.java | 53 +++++++++-------------
 .../reprojection/ProjTransformationParameter.java  | 25 +++++-----
 4 files changed, 75 insertions(+), 56 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 949887050..9ce8d03f4 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.ProjTransformationController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
 import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
@@ -56,7 +57,8 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
                     new LatLngToGeoController(),
                     new CreateTrajectoryFromPointsController(),
                     new SpeedCalculatorController(),
-                    new StaticDistanceCalculatorController())
+                    new StaticDistanceCalculatorController(),
+                    new ProjTransformationController())
             .registerMessagingFormats(
                     new JsonDataFormatFactory(),
                     new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
index f7705372d..58a9d5623 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
@@ -16,36 +16,44 @@
  */
 package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
 
-import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+import org.apache.streampipes.logging.api.Logger;
+import org.locationtech.jts.geom.Geometry;
+
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
 import org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoParameter;
-import org.locationtech.jts.geom.Geometry;
-import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.model.runtime.Event;
 
+import org.postgresql.ds.PGSimpleDataSource;
+import org.apache.sis.setup.Configuration;
+
+import javax.sql.DataSource;
 
 public class ProjTransformation implements EventProcessor<ProjTransformationParameter> {
 
-    private static Logger LOG;
+    private static Logger logger;
     private ProjTransformationParameter params;
     private Integer targetEPSG;
 
-
     @Override
-    public void onInvocation(ProjTransformationParameter params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
-        LOG = params.getGraph().getLogger(LatLngToGeoParameter.class);
+    public void onInvocation(ProjTransformationParameter params, SpOutputCollector spOutputCollector,
+                             EventProcessorRuntimeContext runtimeContext) {
+        logger = params.getGraph().getLogger(LatLngToGeoParameter.class);
         this.params = params;
-        targetEPSG = params.getTarget_epsg();
+        targetEPSG = params.getTargetEpsg();
+
+        //TODO: this has to move to a central place in the streampipes backend
+        Configuration.current().setDatabase(ProjTransformation::createDataSource);
     }
 
     @Override
     public void onEvent(Event in, SpOutputCollector out) {
 
-        String wkt = in.getFieldBySelector(params.getWkt_string()).getAsPrimitive().getAsString();
+        String wkt = in.getFieldBySelector(params.getWktString()).getAsPrimitive().getAsString();
         Integer epsgCode = in.getFieldBySelector(params.getEpsgCode()).getAsPrimitive().getAsInt();
         Geometry geometry = SpGeometryBuilder.createSPGeom(wkt, epsgCode);
 
@@ -57,18 +65,35 @@ public class ProjTransformation implements EventProcessor<ProjTransformationPara
         }
 
         if (!transformed.isEmpty()) {
-            in.updateFieldBySelector("s0::" + ProjTransformationController.EPSG_RUNTIME, params.getTarget_epsg());
+            in.updateFieldBySelector("s0::" + ProjTransformationController.EPSG_RUNTIME, params.getTargetEpsg());
             in.updateFieldBySelector("s0::" + ProjTransformationController.WKT_RUNTIME, transformed.toText());
 
             out.collect(in);
         } else {
-            LOG.warn("An empty point geometry is created in " + ProjTransformationController.EPA_NAME + " " +
-                    "due invalid input values. Check used epsg Code:" + epsgCode);
+            logger.warn("An empty point geometry is created in " + ProjTransformationController.EPA_NAME + " "
+                    + "due invalid input values. Check used epsg Code:" + epsgCode);
         }
     }
 
     @Override
     public void onDetach() {
+    }
+
+    // https://sis.apache.org/apidocs/org/apache/sis/setup/Configuration.html#setDatabase(java.util.function.Supplier)
+    // TODO: Best would be ConfigKeys VARIABLE. ATM hardcoded and adjustments for development required like IP of client
+    // TODO: has to move together with the Configuration.current() method
+    protected static DataSource createDataSource() {
+        PGSimpleDataSource ds = new PGSimpleDataSource();
+        // HAS TO BE ADJUSTED OR INCLUDED IN THE AUTO_DISCOVERY
+        String[] serverAddresses = {"192.168.1.100"};
+        ds.setServerNames(serverAddresses);
+        int[] serverPortNumbers = {54320};
+        ds.setPortNumbers(serverPortNumbers);
+        ds.setDatabaseName("EPSG");
+        ds.setUser("streampipes");
+        ds.setPassword("streampipes");
+        ds.setReadOnly(true);
 
+        return ds;
     }
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
index 54f5e89e6..b40ba5000 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
@@ -23,7 +23,13 @@ import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
+//import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
 import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
 import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
 import org.apache.streampipes.sdk.utils.Assets;
@@ -42,42 +48,27 @@ public class ProjTransformationController extends StandaloneEventProcessingDecla
 
     @Override
     public DataProcessorDescription declareModel() {
-        return ProcessingElementBuilder
-                .create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
-                .category(DataProcessorType.GEO)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .withLocales(Locales.EN)
-                .requiredStream(StreamRequirementsBuilder
-                        .create()
-                        .requiredPropertyWithUnaryMapping(
-                                EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
-                                Labels.withId(WKT_KEY), PropertyScope.MEASUREMENT_PROPERTY
-                        )
-                        .requiredPropertyWithUnaryMapping(
-                                EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
-                                Labels.withId(SOURCE_EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY
-                        )
-                        .build()
-                )
-                .requiredIntegerParameter(
-                        Labels.withId(TARGET_EPSG_KEY),
-                        32632
-                )
-                .outputStrategy(OutputStrategies.keep())
-                .supportedFormats(SupportedFormats.jsonFormat())
-                .supportedProtocols(SupportedProtocols.kafka())
-                .build();
+        return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
+                .category(DataProcessorType.GEO).withAssets(Assets.DOCUMENTATION, Assets.ICON).withLocales(Locales.EN)
+                .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+                        EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                        Labels.withId(WKT_KEY), PropertyScope.MEASUREMENT_PROPERTY).requiredPropertyWithUnaryMapping(
+                        EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                        Labels.withId(SOURCE_EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY).build())
+                .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632).outputStrategy(OutputStrategies.keep())
+                .supportedFormats(SupportedFormats.jsonFormat()).supportedProtocols(SupportedProtocols.kafka()).build();
     }
 
 
     @Override
-    public ConfiguredEventProcessor<ProjTransformationParameter> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    public ConfiguredEventProcessor<ProjTransformationParameter> onInvocation(DataProcessorInvocation graph,
+                                                                              ProcessingElementParameterExtractor extractor) {
 
-        String wkt_String = extractor.mappingPropertyValue(WKT_KEY);
-        String source_epsg = extractor.mappingPropertyValue(SOURCE_EPSG_KEY);
-        Integer target_epsg = extractor.singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+        String wktString = extractor.mappingPropertyValue(WKT_KEY);
+        String sourceEpsg = extractor.mappingPropertyValue(SOURCE_EPSG_KEY);
+        Integer targetEpsg = extractor.singleValueParameter(TARGET_EPSG_KEY, Integer.class);
 
-        ProjTransformationParameter params = new ProjTransformationParameter(graph, wkt_String, source_epsg, target_epsg);
+        ProjTransformationParameter params = new ProjTransformationParameter(graph, wktString, sourceEpsg, targetEpsg);
 
         return new ConfiguredEventProcessor<>(params, ProjTransformation::new);
     }
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
index a8f0c3f73..b26f4f40e 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
@@ -21,27 +21,28 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
 
 public class ProjTransformationParameter extends EventProcessorBindingParams {
 
-    private final String wkt_string;
-    private final String source_epsg;
-    private final Integer target_epsg;
+    private final String wktString;
+    private final String sourceEpsg;
+    private final Integer targetEpsg;
 
 
-    public ProjTransformationParameter(DataProcessorInvocation graph, String wkt_String, String source_epsg, Integer target_epsg) {
+    public ProjTransformationParameter(DataProcessorInvocation graph, String wktString, String sourceEpsg,
+                                       Integer targetEpsg) {
         super(graph);
-        this.wkt_string = wkt_String;
-        this.source_epsg = source_epsg;
-        this.target_epsg = target_epsg;
+        this.wktString = wktString;
+        this.sourceEpsg = sourceEpsg;
+        this.targetEpsg = targetEpsg;
     }
 
-    public String getWkt_string() {
-        return wkt_string;
+    public String getWktString() {
+        return wktString;
     }
 
     public String getEpsgCode() {
-        return source_epsg;
+        return sourceEpsg;
     }
 
-    public Integer getTarget_epsg() {
-        return target_epsg;
+    public Integer getTargetEpsg() {
+        return targetEpsg;
     }
 }