You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/23 22:53:19 UTC
[streampipes] 11/16: rewrite trajectory to 1 class
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch STREAMPIPES-642
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d4f785d1d409d877c5e77f3ef6dd46d597a766f1
Author: micklich <mi...@apache.org>
AuthorDate: Wed Nov 23 23:38:00 2022 +0100
rewrite trajectory to 1 class
dd
---
.../trajectory/TrajectoryFromPointsProcessor.java | 140 +++++++++++++++++++++
1 file changed, 140 insertions(+)
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
new file mode 100644
index 000000000..4c9eee5f1
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpTrajectoryBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
+ public static final String POINT_KEY = "point-key";
+ public static final String EPSG_KEY = "epsg-key";
+ public static final String M_KEY = "m-key";
+ public static final String SUBPOINTS_KEY = "subpoints-key";
+ public static final String DESCRIPTION_KEY = "description-key";
+ public static final String TRAJECTORY_KEY = "trajectory-key";
+ public static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
+ public static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
+ private String pointMapper;
+ private String epsgMapper;
+ private String mValueMapper;
+ private String description;
+ private Integer subpoints;
+ private SpTrajectoryBuilder trajectory;
+ private static final Logger LOG = LoggerFactory.getLogger(TrajectoryFromPointsProcessor.class);
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder
+ .create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory")
+ .category(DataProcessorType.GEO)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(
+ StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq
+ ("http://www.opengis.net/ont/geosparql#Geometry"),
+ Labels.withId(POINT_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.domainPropertyReq
+ ("http://data.ign.fr/def/ignf#CartesianCS"),
+ Labels.withId(EPSG_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(M_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+ .build()
+ )
+ .requiredTextParameter(
+ Labels.withId(DESCRIPTION_KEY))
+ .requiredIntegerParameter(
+ Labels.withId(SUBPOINTS_KEY),
+ 2, 30, 1)
+
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(
+ Labels.withId(DESCRIPTION_KEY),
+ DESCRIPTION_RUNTIME,
+ SO.Text),
+ EpProperties.stringEp(
+ Labels.withId(TRAJECTORY_KEY),
+ TRAJECTORY_RUNTIME,
+ "http://www.opengis.net/ont/geosparql#Geometry")
+ )
+ )
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ this.pointMapper = parameters.extractor().mappingPropertyValue(POINT_KEY);
+ this.mValueMapper = parameters.extractor().mappingPropertyValue(M_KEY);
+ this.epsgMapper = parameters.extractor().mappingPropertyValue(EPSG_KEY);
+ this.description = parameters.extractor().singleValueParameter(DESCRIPTION_KEY, String.class);
+ this.subpoints = parameters.extractor().singleValueParameter(SUBPOINTS_KEY, Integer.class);
+
+ trajectory = new SpTrajectoryBuilder(subpoints, description);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+ // extract values
+ String point = event.getFieldBySelector(pointMapper).getAsPrimitive().getAsString();
+ Integer epsg = event.getFieldBySelector(epsgMapper).getAsPrimitive().getAsInt();
+ Double m = event.getFieldBySelector(mValueMapper).getAsPrimitive().getAsDouble();
+ //create JTS geometry
+ Point eventGeom = (Point) SpGeometryBuilder.createSPGeom(point, epsg);
+ LOG.debug("Geometry Point created");
+ //adds point and m value to trajectory object
+ trajectory.addPointToTrajectory(eventGeom, m);
+ LOG.debug("Point added to trajectory");
+ // returns JTS LineString
+ LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
+ // adds to stream
+ event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
+ event.addField(TRAJECTORY_RUNTIME, geom.toString());
+ collector.collect(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}