You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2023/01/05 21:14:24 UTC
[streampipes] 02/07: [STREAMPIPES-584] add processor class and additional work
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch SP-584
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d08f4b82e421f49c56aaa662d76c104bfa598045
Author: micklich <mi...@apache.org>
AuthorDate: Sun Jan 1 17:23:43 2023 +0100
[STREAMPIPES-584] add processor class and additional work
---
.../SpNotSupportedGeometryException.java | 41 +++
.../geo/jvm/jts/helper/SpGeometryBuilder.java | 60 +++-
.../geo/jvm/jts/helper/SpReprojectionBuilder.java | 304 +++++++++++++++++++++
.../reprojection/ReprojectionProcessor.java | 150 ++++++++++
4 files changed, 554 insertions(+), 1 deletion(-)
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java
new file mode 100755
index 000000000..238367893
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.geo.jvm.jts.exceptions;
+
+public class SpNotSupportedGeometryException extends Exception {
+
+ public SpNotSupportedGeometryException() {
+ }
+
+ public SpNotSupportedGeometryException(String message) {
+ super(message);
+ }
+
+ public SpNotSupportedGeometryException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SpNotSupportedGeometryException(Throwable cause) {
+ super(cause);
+ }
+
+ public SpNotSupportedGeometryException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
index 0fa5ccc17..3d529de54 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
@@ -21,7 +21,12 @@ package org.apache.streampipes.processors.geo.jvm.jts.helper;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.MultiLineString;
+import org.locationtech.jts.geom.MultiPoint;
+import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.geom.PrecisionModel;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
@@ -97,6 +102,20 @@ public class SpGeometryBuilder {
}
+ public static Geometry createSPGeom(Geometry geom, Integer epsgCode) {
+
+ Geometry returnedGeom = null;
+ //gets precision model from getPrecisionModel method
+ PrecisionModel prec = getPrecisionModel(epsgCode);
+ //creates the factory object
+ GeometryFactory geomFactory = new GeometryFactory(prec, epsgCode);
+ // creates the new geom from the input geom.
+ returnedGeom = geomFactory.createGeometry(geom);
+
+ return returnedGeom;
+ }
+
+
/**
* Is in wgs coordinate range boolean.
*
@@ -119,7 +138,7 @@ public class SpGeometryBuilder {
* @param epsg EPSG Code representing SRID
* @return {@link org.locationtech.jts.geom.PrecisionModel}
*/
- private static PrecisionModel getPrecisionModel(Integer epsg) {
+ protected static PrecisionModel getPrecisionModel(Integer epsg) {
PrecisionModel precisionModel;
if (epsg == 4326) {
// use scale precision with 7 decimal positions like default OSM
@@ -130,4 +149,43 @@ public class SpGeometryBuilder {
}
return precisionModel;
}
+ public static Geometry createEmptyGeometry(Geometry geom) {
+ Geometry outputGeom = null;
+
+ if (geom instanceof Point) {
+ outputGeom = geom.getFactory().createPoint();
+ } else if (geom instanceof LineString) {
+ outputGeom = geom.getFactory().createLineString();
+ } else if (geom instanceof Polygon) {
+ outputGeom = geom.getFactory().createPolygon();
+ } else if (geom instanceof MultiPoint) {
+ outputGeom = geom.getFactory().createMultiPoint();
+ } else if (geom instanceof MultiLineString) {
+ outputGeom = geom.getFactory().createMultiLineString();
+ } else if (geom instanceof MultiPolygon) {
+ outputGeom = geom.getFactory().createMultiPolygon();
+ } else {
+ outputGeom = geom.getFactory().createGeometryCollection();
+ }
+ return outputGeom;
+ }
+
+ public static Point extractPoint(Geometry geom) {
+ Point returnedPoint = null;
+
+ Integer epsgCode = geom.getSRID();
+
+ if (geom instanceof Point) {
+ // get y lng and x lat coords from point. has to be casted because input is basic geometry
+ returnedPoint = (Point) geom;
+
+ } else if (geom instanceof LineString) {
+ // cast geometry to line and calculates the centroid to get point
+ returnedPoint = (Point) createSPGeom((geom).getInteriorPoint(), epsgCode);
+ } else {
+ returnedPoint = (Point) createSPGeom(geom.getCentroid(), epsgCode);
+ }
+ return returnedPoint;
+ }
+
}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java
new file mode 100755
index 000000000..9781d65eb
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.helper;
+
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+
+import org.apache.sis.geometry.DirectPosition2D;
+import org.apache.sis.referencing.CRS;
+import org.apache.sis.referencing.crs.AbstractCRS;
+import org.apache.sis.referencing.cs.AxesConvention;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.CoordinateList;
+import org.locationtech.jts.geom.CoordinateSequence;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.PrecisionModel;
+import org.locationtech.jts.geom.impl.CoordinateArraySequence;
+import org.opengis.metadata.citation.Citation;
+import org.opengis.referencing.crs.CRSAuthorityFactory;
+import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.referencing.cs.CoordinateSystemAxis;
+import org.opengis.referencing.operation.CoordinateOperation;
+import org.opengis.referencing.operation.TransformException;
+import org.opengis.util.FactoryException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+public class SpReprojectionBuilder {
+
+ public static Geometry reprojectSpGeometry(Geometry geom, Integer targetEPSG)
+ throws SpNotSupportedGeometryException {
+
+ Geometry output = null;
+
+ CoordinateReferenceSystem sourcerCRS = getCRS(geom.getSRID());
+ CoordinateReferenceSystem targetrCRS = getCRS(targetEPSG);
+ CoordinateOperation operator = getOperator(sourcerCRS, targetrCRS);
+
+ CoordinateList geomCoordList = new CoordinateList(geom.getCoordinates());
+ List<Coordinate> projectedList =
+ geomCoordList.stream().map(coordinate -> transformCoordinate(coordinate, operator))
+ .collect(Collectors.toList());
+
+ CoordinateSequence cs = new CoordinateArraySequence(projectedList.toArray(new Coordinate[]{}));
+
+ output = createSimpleSPGeom(cs, geom.getGeometryType(), targetEPSG);
+
+ return output;
+ }
+
+ public static Geometry createSimpleSPGeom(CoordinateSequence cs, String geometryType, Integer targetEPSG)
+ throws SpNotSupportedGeometryException {
+ Geometry output = null;
+ PrecisionModel prec = SpGeometryBuilder.getPrecisionModel(targetEPSG);
+ GeometryFactory geomFactory = new GeometryFactory(prec, targetEPSG);
+
+ switch (geometryType) {
+ case "Point":
+ output = geomFactory.createPoint(cs);
+ break;
+ case "LineString":
+ output = geomFactory.createLineString(cs);
+ break;
+ case "Polygon":
+ output = geomFactory.createPolygon(cs);
+ break;
+ case "MulitPoint":
+ output = geomFactory.createMultiPoint(cs);
+ break;
+ case "MultiLineString":
+ // output = geomFactory.createMultiLineString();
+ // break;
+ throw new SpNotSupportedGeometryException();
+ case "MultiPolygon":
+ // output = geomFactory.createMultiPolygon(cs);
+ // break;
+ throw new SpNotSupportedGeometryException();
+
+ case "GeometryCpllection":
+ // output = geomFactory.createGeometryCollection(cs);
+ // break;
+ throw new SpNotSupportedGeometryException();
+ }
+
+ return output;
+ }
+
+ protected static CoordinateReferenceSystem getCRS(int epsg) {
+ CoordinateReferenceSystem output = null;
+
+ try {
+ output = CRS.forCode(("EPSG:" + epsg));
+ if (epsg == 4326) {
+ output = AbstractCRS.castOrCopy(output).forConvention(AxesConvention.RIGHT_HANDED);
+ }
+ } catch (FactoryException e) {
+ //todo
+ e.printStackTrace();
+ }
+ return output;
+ }
+
+ protected static Coordinate transformCoordinate(Coordinate coord, CoordinateOperation op) {
+
+ DirectPosition2D sisPoint = new DirectPosition2D(coord.getX(), coord.getY());
+ DirectPosition2D projSisPoint = null;
+ Coordinate output;
+
+ try {
+ projSisPoint = (DirectPosition2D) op.getMathTransform().transform(sisPoint, null);
+ } catch (TransformException e) {
+ e.printStackTrace();
+ }
+
+ output = new Coordinate(projSisPoint.getX(), projSisPoint.getY(), coord.getZ());
+
+ return output;
+ }
+
+ protected static CoordinateOperation getOperator(CoordinateReferenceSystem source,
+ CoordinateReferenceSystem target) {
+
+ CoordinateOperation op = null;
+
+ try {
+ op = CRS.findOperation(source, target, null);
+ } catch (FactoryException e) {
+ e.printStackTrace();
+ }
+
+ return op;
+ }
+
+ protected static String getCrsUnit(int epsg) {
+ String unit = null;
+
+ CoordinateReferenceSystem crs = null;
+ try {
+ crs = CRS.forCode(("EPSG:" + epsg));
+ } catch (FactoryException e) {
+ e.printStackTrace();
+ }
+ unit = crs.getCoordinateSystem().getAxis(0).getUnit().getName();
+
+ return unit;
+ }
+
+ public static Geometry unifyEPSG(Geometry geomA, Geometry geomB, boolean useFirstGeomAsBase)
+ throws SpNotSupportedGeometryException {
+
+ Geometry tempGeomA = geomA;
+ Geometry tempGeomB = geomB;
+
+ if (geomA.getSRID() != geomB.getSRID()) {
+ if (useFirstGeomAsBase) {
+ tempGeomB = reprojectSpGeometry(geomB, geomA.getSRID());
+ } else {
+ tempGeomA = reprojectSpGeometry(geomA, geomB.getSRID());
+ }
+ }
+
+ if (!useFirstGeomAsBase) {
+ return tempGeomA;
+ } else {
+ return tempGeomB;
+ }
+ }
+
+ public static boolean isLongitudeFirst(int epsg) throws FactoryException {
+ CoordinateReferenceSystem crs = null;
+ crs = CRS.forCode(("EPSG:" + epsg));
+ CoordinateSystemAxis axis = crs.getCoordinateSystem().getAxis(0);
+ return axis.getDirection().name().equals("NORTH");
+ }
+
+ public static boolean isMeterCRS(int epsg) {
+ return getCrsUnit(epsg).equals(SpCRSUnits.METRE.getSpCRSUnit());
+ }
+
+ public static boolean isWGS84(Geometry geom) {
+ return geom.getSRID() == 4326;
+ }
+
+ public static int findWgsUtm_EPSG(Point point) {
+ double lon = point.getX();
+ double lat = point.getY();
+
+ Integer zone;
+ Integer epsg;
+ Integer hemisphere;
+
+ zone = (int) Math.floor(lon / 6 + 31);
+
+ if ((lat > 55) && (zone == 31) && (lat < 64) && (lon > 2)) {
+ zone = 32;
+ } else if ((lat > 71) && (zone == 32) && (lon < 9)) {
+ zone = 31;
+ } else if ((lat > 71) && (zone == 32) && (lon > 8)) {
+ zone = 33;
+ } else if ((lat > 71) && (zone == 34) && (lon < 21)) {
+ zone = 33;
+ } else if ((lat > 71) && (zone == 34) && (lon > 20)) {
+ zone = 35;
+ } else if ((lat > 71) && (zone == 36) && (lon < 33)) {
+ zone = 35;
+ } else if ((lat > 71) && (zone == 36) && (lon > 32)) {
+ zone = 37;
+ }
+
+ // Set northern or southern hemisphere
+ if (lat < 0) {
+ hemisphere = 7;
+ } else {
+ hemisphere = 6;
+ }
+
+ //concatenate integer values
+ epsg = Integer.valueOf(String.valueOf(32) + hemisphere + zone);
+ return epsg;
+ }
+
+
+ public enum SpCRSUnits {
+ METRE("metre"), DEGREE("degree");
+
+ private final String unit;
+
+ SpCRSUnits(String unit) {
+ this.unit = unit;
+ }
+
+ public String getSpCRSUnit() {
+ return unit;
+ }
+ }
+
+ public static boolean isSisEpsgValid(Integer targetEPSG) {
+ boolean check = true;
+
+ try {
+ CRS.forCode("EPSG::" + targetEPSG);
+ } catch (FactoryException ex) {
+ check = false;
+ }
+ return check;
+ }
+
+
+ private static CRSAuthorityFactory getFactory() throws FactoryException {
+ CRSAuthorityFactory factory;
+ try {
+ factory = CRS.getAuthorityFactory("EPSG");
+ } catch (FactoryException e) {
+ throw new FactoryException(e);
+ }
+ return factory;
+ }
+
+ public static boolean isSisConfigurationValid() throws FactoryException {
+ boolean check = true;
+ CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+ Citation authority = factory.getAuthority();
+ if (authority == null) {
+ check = false;
+ }
+ return check;
+ }
+
+ public static boolean isSisDbCorrectVersion() throws FactoryException {
+ boolean check = true;
+ CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+ Citation authority = factory.getAuthority();
+ if (!authority.getEdition().toString().equals("9.9.1")) {
+ check = false;
+ }
+ return check;
+ }
+
+ public static String getSisDbVersion() throws FactoryException {
+ CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+ Citation authority = factory.getAuthority();
+ String version = authority.getEdition().toString();
+ return version;
+ }
+}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
new file mode 100644
index 000000000..0ea67deba
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.util.FactoryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReprojectionProcessor extends StreamPipesDataProcessor {
+ public static final String GEOM_KEY = "geom-key";
+ public static final String SOURCE_EPSG_KEY = "source-epsg-key";
+ public static final String TARGET_EPSG_KEY = "target-epsg-key";
+ public static final String GEOM_RUNTIME = "geomWKT";
+ public static final String EPSG_RUNTIME = "epsg";
+ private String geometryMapper;
+ private String sourceEpsgMapper;
+ private Integer targetEpsg;
+ private static final Logger LOG = LoggerFactory.getLogger(ReprojectionProcessor.class);
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
+ .category(DataProcessorType.GEO)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(GEOM_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(SOURCE_EPSG_KEY),
+ PropertyScope.MEASUREMENT_PROPERTY)
+ .build())
+ .outputStrategy(OutputStrategies.keep())
+ .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
+ .supportedFormats(SupportedFormats.jsonFormat())
+ .supportedProtocols(SupportedProtocols.kafka())
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+ this.geometryMapper = parameters.extractor().mappingPropertyValue(GEOM_KEY);
+ this.sourceEpsgMapper = parameters.extractor().mappingPropertyValue(SOURCE_EPSG_KEY);
+ this.targetEpsg = parameters.extractor().singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+
+ // check if SIS DB is set up with imported data or is null
+ try {
+ if (SpReprojectionBuilder.isSisConfigurationValid()){
+ LOG.info("SIS DB Settings successful checked ");
+ } else {
+ LOG.warn("The required EPSG database is not imported");
+ throw new SpRuntimeException("The required EPSG database is not imported");
+ }
+ } catch (FactoryException e) {
+ throw new SpRuntimeException("Something unexpected happened " + e);
+ }
+
+ // check if SIS DB has the supported 9.9.1 Version.
+ try {
+ if (!SpReprojectionBuilder.isSisDbCorrectVersion()) {
+ LOG.warn("Not supported EPSG DB is used.");
+ throw new SpRuntimeException("Your current EPSG DB version " + SpReprojectionBuilder.getSisDbVersion()
+ + " is not the supported 9.9.1 version. ");
+ }
+ } catch (FactoryException e) {
+ throw new SpRuntimeException("Something unexpected happened " + e);
+ }
+
+ // checks if Input EPSG in valid and exists in EPSG DB
+ if (!SpReprojectionBuilder.isSisEpsgValid(this.targetEpsg)) {
+ throw new SpRuntimeException("Your chosen EPSG Code " + this.targetEpsg + " is not valid. "
+ + "Check EPSG on https://spatialreference.org");
+ }
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ String geom = event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
+ Integer sourceEpsg = event.getFieldBySelector(sourceEpsgMapper).getAsPrimitive().getAsInt();
+
+ Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
+
+ Geometry reprojected;
+ try {
+ reprojected = SpReprojectionBuilder.reprojectSpGeometry(geometry, targetEpsg);
+ } catch (SpNotSupportedGeometryException e) {
+ reprojected = SpGeometryBuilder.createEmptyGeometry(geometry);
+ }
+
+ if (!reprojected.isEmpty()) {
+ event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
+ event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
+
+ collector.collect(event);
+ } else {
+ LOG.warn("An empty point geometry is created"
+ + " due invalid input values. Check used epsg Code:" + targetEpsg);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}