You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2023/01/05 21:14:24 UTC

[streampipes] 02/07: [STREAMPIPES-584] add processor class and additional work

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch SP-584
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit d08f4b82e421f49c56aaa662d76c104bfa598045
Author: micklich <mi...@apache.org>
AuthorDate: Sun Jan 1 17:23:43 2023 +0100

    [STREAMPIPES-584] add processor class and additional work
---
 .../SpNotSupportedGeometryException.java           |  41 +++
 .../geo/jvm/jts/helper/SpGeometryBuilder.java      |  60 +++-
 .../geo/jvm/jts/helper/SpReprojectionBuilder.java  | 304 +++++++++++++++++++++
 .../reprojection/ReprojectionProcessor.java        | 150 ++++++++++
 4 files changed, 554 insertions(+), 1 deletion(-)

diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java
new file mode 100755
index 000000000..238367893
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/exceptions/SpNotSupportedGeometryException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.processors.geo.jvm.jts.exceptions;
+
+public class SpNotSupportedGeometryException extends Exception {
+
+  public SpNotSupportedGeometryException() {
+  }
+
+  public SpNotSupportedGeometryException(String message) {
+    super(message);
+  }
+
+  public SpNotSupportedGeometryException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public SpNotSupportedGeometryException(Throwable cause) {
+    super(cause);
+  }
+
+  public SpNotSupportedGeometryException(String message, Throwable cause, boolean enableSuppression,
+                                         boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
index 0fa5ccc17..3d529de54 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpGeometryBuilder.java
@@ -21,7 +21,12 @@ package org.apache.streampipes.processors.geo.jvm.jts.helper;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.MultiLineString;
+import org.locationtech.jts.geom.MultiPoint;
+import org.locationtech.jts.geom.MultiPolygon;
 import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.Polygon;
 import org.locationtech.jts.geom.PrecisionModel;
 import org.locationtech.jts.io.ParseException;
 import org.locationtech.jts.io.WKTReader;
@@ -97,6 +102,20 @@ public class SpGeometryBuilder {
   }
 
 
+  public static Geometry createSPGeom(Geometry geom, Integer epsgCode) {
+
+    Geometry returnedGeom = null;
+    //gets precision model from getPrecisionModel method
+    PrecisionModel prec = getPrecisionModel(epsgCode);
+    //creates the factory object
+    GeometryFactory geomFactory = new GeometryFactory(prec, epsgCode);
+    // creates the new geom from the input geom.
+    returnedGeom = geomFactory.createGeometry(geom);
+
+    return returnedGeom;
+  }
+
+
   /**
    * Is in wgs coordinate range boolean.
    *
@@ -119,7 +138,7 @@ public class SpGeometryBuilder {
    * @param epsg EPSG Code representing SRID
    * @return {@link org.locationtech.jts.geom.PrecisionModel}
    */
-  private static PrecisionModel getPrecisionModel(Integer epsg) {
+  protected static PrecisionModel getPrecisionModel(Integer epsg) {
     PrecisionModel precisionModel;
     if (epsg == 4326) {
       // use scale precision with 7 decimal positions like default OSM
@@ -130,4 +149,43 @@ public class SpGeometryBuilder {
     }
     return precisionModel;
   }
+  public static Geometry createEmptyGeometry(Geometry geom) {
+    Geometry outputGeom = null;
+
+    if (geom instanceof Point) {
+      outputGeom = geom.getFactory().createPoint();
+    } else if (geom instanceof LineString) {
+      outputGeom = geom.getFactory().createLineString();
+    } else if (geom instanceof Polygon) {
+      outputGeom = geom.getFactory().createPolygon();
+    } else if (geom instanceof MultiPoint) {
+      outputGeom = geom.getFactory().createMultiPoint();
+    } else if (geom instanceof MultiLineString) {
+      outputGeom = geom.getFactory().createMultiLineString();
+    } else if (geom instanceof MultiPolygon) {
+      outputGeom = geom.getFactory().createMultiPolygon();
+    } else {
+      outputGeom = geom.getFactory().createGeometryCollection();
+    }
+    return outputGeom;
+  }
+
+  public static Point extractPoint(Geometry geom) {
+    Point returnedPoint = null;
+
+    Integer epsgCode = geom.getSRID();
+
+    if (geom instanceof Point) {
+      // get y lng and x lat coords from point. has to be casted because input is basic geometry
+      returnedPoint =  (Point) geom;
+
+    } else if (geom instanceof LineString) {
+      // cast geometry to line and calculates the centroid to get point
+      returnedPoint = (Point) createSPGeom((geom).getInteriorPoint(), epsgCode);
+    } else {
+      returnedPoint = (Point) createSPGeom(geom.getCentroid(), epsgCode);
+    }
+    return returnedPoint;
+  }
+
 }
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java
new file mode 100755
index 000000000..9781d65eb
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/helper/SpReprojectionBuilder.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.helper;
+
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+
+import org.apache.sis.geometry.DirectPosition2D;
+import org.apache.sis.referencing.CRS;
+import org.apache.sis.referencing.crs.AbstractCRS;
+import org.apache.sis.referencing.cs.AxesConvention;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.CoordinateList;
+import org.locationtech.jts.geom.CoordinateSequence;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.geom.PrecisionModel;
+import org.locationtech.jts.geom.impl.CoordinateArraySequence;
+import org.opengis.metadata.citation.Citation;
+import org.opengis.referencing.crs.CRSAuthorityFactory;
+import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.referencing.cs.CoordinateSystemAxis;
+import org.opengis.referencing.operation.CoordinateOperation;
+import org.opengis.referencing.operation.TransformException;
+import org.opengis.util.FactoryException;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+public class SpReprojectionBuilder {
+
+  public static Geometry reprojectSpGeometry(Geometry geom, Integer targetEPSG)
+      throws SpNotSupportedGeometryException {
+
+    Geometry output = null;
+
+    CoordinateReferenceSystem sourcerCRS = getCRS(geom.getSRID());
+    CoordinateReferenceSystem targetrCRS = getCRS(targetEPSG);
+    CoordinateOperation operator = getOperator(sourcerCRS, targetrCRS);
+
+    CoordinateList geomCoordList = new CoordinateList(geom.getCoordinates());
+    List<Coordinate> projectedList =
+        geomCoordList.stream().map(coordinate -> transformCoordinate(coordinate, operator))
+            .collect(Collectors.toList());
+
+    CoordinateSequence cs = new CoordinateArraySequence(projectedList.toArray(new Coordinate[]{}));
+
+    output = createSimpleSPGeom(cs, geom.getGeometryType(), targetEPSG);
+
+    return output;
+  }
+
+  public static Geometry createSimpleSPGeom(CoordinateSequence cs, String geometryType, Integer targetEPSG)
+      throws SpNotSupportedGeometryException {
+    Geometry output = null;
+    PrecisionModel prec = SpGeometryBuilder.getPrecisionModel(targetEPSG);
+    GeometryFactory geomFactory = new GeometryFactory(prec, targetEPSG);
+
+    switch (geometryType) {
+      case "Point":
+        output = geomFactory.createPoint(cs);
+        break;
+      case "LineString":
+        output = geomFactory.createLineString(cs);
+        break;
+      case "Polygon":
+        output = geomFactory.createPolygon(cs);
+        break;
+      case "MulitPoint":
+        output = geomFactory.createMultiPoint(cs);
+        break;
+      case "MultiLineString":
+        // output = geomFactory.createMultiLineString();
+        //        break;
+        throw new SpNotSupportedGeometryException();
+      case "MultiPolygon":
+        //        output = geomFactory.createMultiPolygon(cs);
+        //        break;
+        throw new SpNotSupportedGeometryException();
+
+      case "GeometryCpllection":
+        //        output = geomFactory.createGeometryCollection(cs);
+        //        break;
+        throw new SpNotSupportedGeometryException();
+    }
+
+    return output;
+  }
+
+  protected static CoordinateReferenceSystem getCRS(int epsg) {
+    CoordinateReferenceSystem output = null;
+
+    try {
+      output = CRS.forCode(("EPSG:" + epsg));
+      if (epsg == 4326) {
+        output = AbstractCRS.castOrCopy(output).forConvention(AxesConvention.RIGHT_HANDED);
+      }
+    } catch (FactoryException e) {
+      //todo
+      e.printStackTrace();
+    }
+    return output;
+  }
+
+  protected static Coordinate transformCoordinate(Coordinate coord, CoordinateOperation op) {
+
+    DirectPosition2D sisPoint = new DirectPosition2D(coord.getX(), coord.getY());
+    DirectPosition2D projSisPoint = null;
+    Coordinate output;
+
+    try {
+      projSisPoint = (DirectPosition2D) op.getMathTransform().transform(sisPoint, null);
+    } catch (TransformException e) {
+      e.printStackTrace();
+    }
+
+    output = new Coordinate(projSisPoint.getX(), projSisPoint.getY(), coord.getZ());
+
+    return output;
+  }
+
+  protected static CoordinateOperation getOperator(CoordinateReferenceSystem source,
+                                                   CoordinateReferenceSystem target) {
+
+    CoordinateOperation op = null;
+
+    try {
+      op = CRS.findOperation(source, target, null);
+    } catch (FactoryException e) {
+      e.printStackTrace();
+    }
+
+    return op;
+  }
+
+  protected static String getCrsUnit(int epsg) {
+    String unit = null;
+
+    CoordinateReferenceSystem crs = null;
+    try {
+      crs = CRS.forCode(("EPSG:" + epsg));
+    } catch (FactoryException e) {
+      e.printStackTrace();
+    }
+    unit = crs.getCoordinateSystem().getAxis(0).getUnit().getName();
+
+    return unit;
+  }
+
+  public static Geometry unifyEPSG(Geometry geomA, Geometry geomB, boolean useFirstGeomAsBase)
+      throws SpNotSupportedGeometryException {
+
+    Geometry tempGeomA = geomA;
+    Geometry tempGeomB = geomB;
+
+    if (geomA.getSRID() != geomB.getSRID()) {
+      if (useFirstGeomAsBase) {
+        tempGeomB = reprojectSpGeometry(geomB, geomA.getSRID());
+      } else {
+        tempGeomA = reprojectSpGeometry(geomA, geomB.getSRID());
+      }
+    }
+
+    if (!useFirstGeomAsBase) {
+      return tempGeomA;
+    } else {
+      return tempGeomB;
+    }
+  }
+
+  public static boolean isLongitudeFirst(int epsg) throws FactoryException {
+    CoordinateReferenceSystem crs = null;
+    crs = CRS.forCode(("EPSG:" + epsg));
+    CoordinateSystemAxis axis = crs.getCoordinateSystem().getAxis(0);
+    return axis.getDirection().name().equals("NORTH");
+  }
+
+  public static boolean isMeterCRS(int epsg) {
+    return getCrsUnit(epsg).equals(SpCRSUnits.METRE.getSpCRSUnit());
+  }
+
+  public static boolean isWGS84(Geometry geom) {
+    return geom.getSRID() == 4326;
+  }
+
+  public static int findWgsUtm_EPSG(Point point) {
+    double lon = point.getX();
+    double lat = point.getY();
+
+    Integer zone;
+    Integer epsg;
+    Integer hemisphere;
+
+    zone = (int) Math.floor(lon / 6 + 31);
+
+    if ((lat > 55) && (zone == 31) && (lat < 64) && (lon > 2)) {
+      zone = 32;
+    } else if ((lat > 71) && (zone == 32) && (lon < 9)) {
+      zone = 31;
+    } else if ((lat > 71) && (zone == 32) && (lon > 8)) {
+      zone = 33;
+    } else if ((lat > 71) && (zone == 34) && (lon < 21)) {
+      zone = 33;
+    } else if ((lat > 71) && (zone == 34) && (lon > 20)) {
+      zone = 35;
+    } else if ((lat > 71) && (zone == 36) && (lon < 33)) {
+      zone = 35;
+    } else if ((lat > 71) && (zone == 36) && (lon > 32)) {
+      zone = 37;
+    }
+
+    // Set northern or southern hemisphere
+    if (lat < 0) {
+      hemisphere = 7;
+    } else {
+      hemisphere = 6;
+    }
+
+    //concatenate integer values
+    epsg = Integer.valueOf(String.valueOf(32) + hemisphere + zone);
+    return epsg;
+  }
+
+
+  public enum SpCRSUnits {
+    METRE("metre"), DEGREE("degree");
+
+    private final String unit;
+
+    SpCRSUnits(String unit) {
+      this.unit = unit;
+    }
+
+    public String getSpCRSUnit() {
+      return unit;
+    }
+  }
+
+  public static boolean isSisEpsgValid(Integer targetEPSG) {
+    boolean check = true;
+
+    try {
+      CRS.forCode("EPSG::" + targetEPSG);
+    } catch (FactoryException ex) {
+      check = false;
+    }
+    return check;
+  }
+
+
+  private static CRSAuthorityFactory getFactory() throws FactoryException {
+    CRSAuthorityFactory factory;
+    try {
+      factory = CRS.getAuthorityFactory("EPSG");
+    } catch (FactoryException e) {
+      throw new FactoryException(e);
+    }
+    return factory;
+  }
+
+  public static boolean isSisConfigurationValid() throws FactoryException {
+    boolean check = true;
+    CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+    Citation authority = factory.getAuthority();
+    if (authority == null) {
+      check = false;
+    }
+    return check;
+  }
+
+  public static boolean isSisDbCorrectVersion() throws FactoryException {
+    boolean check = true;
+    CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+    Citation authority = factory.getAuthority();
+    if (!authority.getEdition().toString().equals("9.9.1")) {
+      check = false;
+    }
+    return check;
+  }
+
+  public static String getSisDbVersion() throws FactoryException {
+    CRSAuthorityFactory factory = SpReprojectionBuilder.getFactory();
+    Citation authority = factory.getAuthority();
+    String version = authority.getEdition().toString();
+    return version;
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
new file mode 100644
index 000000000..0ea67deba
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.geo.jvm.jts.exceptions.SpNotSupportedGeometryException;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpReprojectionBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.SupportedFormats;
+import org.apache.streampipes.sdk.helpers.SupportedProtocols;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.util.FactoryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReprojectionProcessor extends StreamPipesDataProcessor {
+  public static final String GEOM_KEY = "geom-key";
+  public static final String SOURCE_EPSG_KEY = "source-epsg-key";
+  public static final String TARGET_EPSG_KEY = "target-epsg-key";
+  public static final String GEOM_RUNTIME = "geomWKT";
+  public static final String EPSG_RUNTIME = "epsg";
+  private String geometryMapper;
+  private String sourceEpsgMapper;
+  private Integer targetEpsg;
+  private static final Logger LOG = LoggerFactory.getLogger(ReprojectionProcessor.class);
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection")
+        .category(DataProcessorType.GEO)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder
+            .create()
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.domainPropertyReq("http://www.opengis.net/ont/geosparql#Geometry"),
+                Labels.withId(GEOM_KEY),
+                PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.domainPropertyReq("http://data.ign.fr/def/ignf#CartesianCS"),
+                Labels.withId(SOURCE_EPSG_KEY),
+                PropertyScope.MEASUREMENT_PROPERTY)
+            .build())
+        .outputStrategy(OutputStrategies.keep())
+        .requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
+        .supportedFormats(SupportedFormats.jsonFormat())
+        .supportedProtocols(SupportedProtocols.kafka())
+        .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+
+    this.geometryMapper = parameters.extractor().mappingPropertyValue(GEOM_KEY);
+    this.sourceEpsgMapper = parameters.extractor().mappingPropertyValue(SOURCE_EPSG_KEY);
+    this.targetEpsg = parameters.extractor().singleValueParameter(TARGET_EPSG_KEY, Integer.class);
+
+    // check if SIS DB is set up with imported data or is null
+    try {
+      if (SpReprojectionBuilder.isSisConfigurationValid()){
+        LOG.info("SIS DB Settings successful checked ");
+      } else {
+        LOG.warn("The required EPSG database is not imported");
+        throw new SpRuntimeException("The required EPSG database is not imported");
+      }
+    } catch (FactoryException e) {
+      throw new SpRuntimeException("Something unexpected happened " + e);
+    }
+
+    // check if SIS DB has the supported 9.9.1 Version.
+    try {
+      if (!SpReprojectionBuilder.isSisDbCorrectVersion()) {
+        LOG.warn("Not supported EPSG DB is used.");
+        throw new SpRuntimeException("Your current EPSG DB version " + SpReprojectionBuilder.getSisDbVersion()
+            + " is not the supported 9.9.1 version. ");
+      }
+    } catch (FactoryException e) {
+      throw new SpRuntimeException("Something unexpected happened " + e);
+    }
+
+    // checks if Input EPSG in valid and exists in EPSG DB
+    if (!SpReprojectionBuilder.isSisEpsgValid(this.targetEpsg)) {
+      throw new SpRuntimeException("Your chosen EPSG Code " + this.targetEpsg + " is not valid. "
+          + "Check EPSG on https://spatialreference.org");
+    }
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    String geom = event.getFieldBySelector(geometryMapper).getAsPrimitive().getAsString();
+    Integer sourceEpsg = event.getFieldBySelector(sourceEpsgMapper).getAsPrimitive().getAsInt();
+
+    Geometry geometry = SpGeometryBuilder.createSPGeom(geom, sourceEpsg);
+
+    Geometry reprojected;
+    try {
+      reprojected = SpReprojectionBuilder.reprojectSpGeometry(geometry, targetEpsg);
+    } catch (SpNotSupportedGeometryException e) {
+      reprojected = SpGeometryBuilder.createEmptyGeometry(geometry);
+    }
+
+    if (!reprojected.isEmpty()) {
+      event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
+      event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
+
+      collector.collect(event);
+    } else {
+      LOG.warn("An empty point geometry is created"
+          + " due invalid input values. Check used epsg Code:" + targetEpsg);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}