You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/15 23:36:58 UTC
[incubator-streampipes] 10/11: implement basic processor
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch STREAMPIPES-584
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit da8789de830632c5a3e5caff3cca9ab113e80a50
Author: micklich <mi...@apache.org>
AuthorDate: Tue Nov 15 23:49:58 2022 +0100
implement basic processor
---
.../streampipes/processors/geo/jvm/GeoJvmInit.java | 4 +-
.../processor/reprojection/ProjTransformation.java | 49 +++++++++++++++-----
.../reprojection/ProjTransformationController.java | 53 +++++++++-------------
.../reprojection/ProjTransformationParameter.java | 25 +++++-----
4 files changed, 75 insertions(+), 56 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 949887050..9ce8d03f4 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.ProjTransformationController;
import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
@@ -56,7 +57,8 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
new LatLngToGeoController(),
new CreateTrajectoryFromPointsController(),
new SpeedCalculatorController(),
- new StaticDistanceCalculatorController())
+ new StaticDistanceCalculatorController(),
+ new ProjTransformationController())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
index f7705372d..58a9d5623 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformation.java
@@ -16,36 +16,44 @@
*/
package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
-import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+import org.apache.streampipes.logging.api.Logger;
+import org.locationtech.jts.geom.Geometry;
+
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
import org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoParameter;
-import org.locationtech.jts.geom.Geometry;
-import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.model.runtime.Event;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.apache.sis.setup.Configuration;
+
+import javax.sql.DataSource;
public class ProjTransformation implements EventProcessor<ProjTransformationParameter> {
- private static Logger LOG;
+ private static Logger logger;
private ProjTransformationParameter params;
private Integer targetEPSG;
-
@Override
- public void onInvocation(ProjTransformationParameter params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
- LOG = params.getGraph().getLogger(LatLngToGeoParameter.class);
+ public void onInvocation(ProjTransformationParameter params, SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) {
+ logger = params.getGraph().getLogger(LatLngToGeoParameter.class);
this.params = params;
- targetEPSG = params.getTarget_epsg();
+ targetEPSG = params.getTargetEpsg();
+
+ //TODO: this has to move to a central place in the streampipes backend
+ Configuration.current().setDatabase(ProjTransformation::createDataSource);
}
@Override
public void onEvent(Event in, SpOutputCollector out) {
- String wkt = in.getFieldBySelector(params.getWkt_string()).getAsPrimitive().getAsString();
+ String wkt = in.getFieldBySelector(params.getWktString()).getAsPrimitive().getAsString();
Integer epsgCode = in.getFieldBySelector(params.getEpsgCode()).getAsPrimitive().getAsInt();
Geometry geometry = SpGeometryBuilder.createSPGeom(wkt, epsgCode);
@@ -57,18 +65,35 @@ public class ProjTransformation implements EventProcessor<ProjTransformationPara
}
if (!transformed.isEmpty()) {
- in.updateFieldBySelector("s0::" + ProjTransformationController.EPSG_RUNTIME, params.getTarget_epsg());
+ in.updateFieldBySelector("s0::" + ProjTransformationController.EPSG_RUNTIME, params.getTargetEpsg());
in.updateFieldBySelector("s0::" + ProjTransformationController.WKT_RUNTIME, transformed.toText());
out.collect(in);
} else {
- LOG.warn("An empty point geometry is created in " + ProjTransformationController.EPA_NAME + " " +
- "due invalid input values. Check used epsg Code:" + epsgCode);
+ logger.warn("An empty point geometry is created in " + ProjTransformationController.EPA_NAME + " "
+ + "due invalid input values. Check used epsg Code:" + epsgCode);
}
}
@Override
public void onDetach() {
+ }
+
+ // https://sis.apache.org/apidocs/org/apache/sis/setup/Configuration.html#setDatabase(java.util.function.Supplier)
+ // TODO: Best would be ConfigKeys VARIABLE. ATM hardcoded and adjustments for development required like IP of client
+ // TODO: has to move together with the Configuration.current() method
+ protected static DataSource createDataSource() {
+ PGSimpleDataSource ds = new PGSimpleDataSource();
+ // HAS TO BE ADJUSTED OR INCLUDED IN THE AUTO_DISCOVERY
+ String[] serverAddresses = {"192.168.1.100"};
+ ds.setServerNames(serverAddresses);
+ int[] serverPortNumbers = {54320};
+ ds.setPortNumbers(serverPortNumbers);
+ ds.setDatabaseName("EPSG");
+ ds.setUser("streampipes");
+ ds.setPassword("streampipes");
+ ds.setReadOnly(true);
+ return ds;
}
}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
index 54f5e89e6..b40ba5000 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationController.java
@@ -23,7 +23,13 @@ import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.*;
+//import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import org.apache.streampipes.sdk.utils.Assets;
@@ -42,42 +48,27 @@ public class ProjTransformationController extends StandaloneEventProcessingDecla
@Override
public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder
- .create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
- .category(DataProcessorType.GEO)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .withLocales(Locales.EN)
- .requiredStream(StreamRequirementsBuilder
- .create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
- Labels.withId(WKT_KEY), PropertyScope.MEASUREMENT_PROPERTY
- )
- .requiredPropertyWithUnaryMapping(
- EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
- Labels.withId(SOURCE_EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY
- )
- .build()
- )
- .requiredIntegerParameter(
- Labels.withId(TARGET_EPSG_KEY),
- 32632
- )
- .outputStrategy(OutputStrategies.keep())
- .supportedFormats(SupportedFormats.jsonFormat())
- .supportedProtocols(SupportedProtocols.kafka())
- .build();
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
+ .category(DataProcessorType.GEO).withAssets(Assets.DOCUMENTATION, Assets.ICON).withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(WKT_KEY), PropertyScope.MEASUREMENT_PROPERTY).requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(SOURCE_EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY).build())
+ .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632).outputStrategy(OutputStrategies.keep())
+ .supportedFormats(SupportedFormats.jsonFormat()).supportedProtocols(SupportedProtocols.kafka()).build();
}
@Override
- public ConfiguredEventProcessor<ProjTransformationParameter> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public ConfiguredEventProcessor<ProjTransformationParameter> onInvocation(DataProcessorInvocation graph,
+ ProcessingElementParameterExtractor extractor) {
- String wkt_String = extractor.mappingPropertyValue(WKT_KEY);
- String source_epsg = extractor.mappingPropertyValue(SOURCE_EPSG_KEY);
- Integer target_epsg = extractor.singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+ String wktString = extractor.mappingPropertyValue(WKT_KEY);
+ String sourceEpsg = extractor.mappingPropertyValue(SOURCE_EPSG_KEY);
+ Integer targetEpsg = extractor.singleValueParameter(TARGET_EPSG_KEY, Integer.class);
- ProjTransformationParameter params = new ProjTransformationParameter(graph, wkt_String, source_epsg, target_epsg);
+ ProjTransformationParameter params = new ProjTransformationParameter(graph, wktString, sourceEpsg, targetEpsg);
return new ConfiguredEventProcessor<>(params, ProjTransformation::new);
}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
index a8f0c3f73..b26f4f40e 100755
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ProjTransformationParameter.java
@@ -21,27 +21,28 @@ import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams
public class ProjTransformationParameter extends EventProcessorBindingParams {
- private final String wkt_string;
- private final String source_epsg;
- private final Integer target_epsg;
+ private final String wktString;
+ private final String sourceEpsg;
+ private final Integer targetEpsg;
- public ProjTransformationParameter(DataProcessorInvocation graph, String wkt_String, String source_epsg, Integer target_epsg) {
+ public ProjTransformationParameter(DataProcessorInvocation graph, String wktString, String sourceEpsg,
+ Integer targetEpsg) {
super(graph);
- this.wkt_string = wkt_String;
- this.source_epsg = source_epsg;
- this.target_epsg = target_epsg;
+ this.wktString = wktString;
+ this.sourceEpsg = sourceEpsg;
+ this.targetEpsg = targetEpsg;
}
- public String getWkt_string() {
- return wkt_string;
+ public String getWktString() {
+ return wktString;
}
public String getEpsgCode() {
- return source_epsg;
+ return sourceEpsg;
}
- public Integer getTarget_epsg() {
- return target_epsg;
+ public Integer getTargetEpsg() {
+ return targetEpsg;
}
}