You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/12 18:38:05 UTC
[incubator-streampipes-extensions] 08/17: added PE
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch feature/trajectory
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit e1fe0292c449281cedfb207154f393a4dbe0a32c
Author: micklich <fl...@disy.net>
AuthorDate: Tue Apr 28 17:22:51 2020 +0200
added PE
---
.../streampipes/processors/geo/jvm/GeoJvmInit.java | 20 ++--
.../trajectory/CreateTrajectoryFromPoints.java | 73 ++++++++++++
.../CreateTrajectoryFromPointsController.java | 129 +++++++++++++++++++++
.../CreateTrajectoryFromPointsParameter.java | 61 ++++++++++
4 files changed, 274 insertions(+), 9 deletions(-)
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index f82df33..41b30ce 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.processors.geo.jvm.config.GeoJvmConfig;
import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
@@ -40,15 +41,16 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
public static void main(String[] args) {
DeclarersSingleton
- .getInstance()
- .add(new DistanceCalculatorController())
- .add(new GoogleMapsGeocodingController())
- .add(new StaticGoogleMapsGeocodingController())
- .add(new ReverseGeocodingController())
- .add(new SetEpsgController())
- .add(new LatLngToGeoController())
- .add(new SpeedCalculatorController())
- .add(new StaticDistanceCalculatorController());
+ .getInstance()
+ .add(new DistanceCalculatorController())
+ .add(new GoogleMapsGeocodingController())
+ .add(new StaticGoogleMapsGeocodingController())
+ .add(new ReverseGeocodingController())
+ .add(new SetEpsgController())
+ .add(new LatLngToGeoController())
+ .add(new CreateTrajectoryFromPointsController())
+ .add(new SpeedCalculatorController())
+ .add(new StaticDistanceCalculatorController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java
new file mode 100755
index 0000000..3ad78d9
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpTrajectoryBuilder;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Point;
+
+
+/**
+ * Event processor that feeds the point geometry of every incoming event into a
+ * {@link SpTrajectoryBuilder} and appends the resulting trajectory (LineString WKT) together
+ * with the configured description to the event before forwarding it.
+ */
+public class CreateTrajectoryFromPoints implements EventProcessor<CreateTrajectoryFromPointsParameter> {
+
+  // Kept per instance: as a static field it was overwritten by every new invocation.
+  private Logger log;
+  private CreateTrajectoryFromPointsParameter params;
+  private SpTrajectoryBuilder trajectory;
+
+  /**
+   * Stores the invocation parameters and creates the trajectory builder from the configured
+   * number of subpoints and the description.
+   *
+   * @param params            user configuration and field selectors of this invocation
+   * @param spOutputCollector output collector (not used during set-up)
+   * @param runtimeContext    runtime context (not used during set-up)
+   */
+  @Override
+  public void onInvocation(CreateTrajectoryFromPointsParameter params,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) {
+    this.log = params.getGraph().getLogger(CreateTrajectoryFromPointsParameter.class);
+    this.params = params;
+    this.trajectory = new SpTrajectoryBuilder(params.getSubpoints(), params.getDescription());
+  }
+
+  /**
+   * Reads WKT, EPSG code and M value from the event, adds the point to the trajectory and
+   * emits the event enriched with the trajectory WKT and the trajectory description.
+   * Events whose WKT does not describe a point are logged and skipped instead of failing
+   * with a {@link ClassCastException}.
+   *
+   * @param in  incoming event
+   * @param out collector receiving the enriched event
+   */
+  @Override
+  public void onEvent(Event in, SpOutputCollector out) {
+    String wkt = in.getFieldBySelector(params.getWkt()).getAsPrimitive().getAsString();
+    Integer epsg = in.getFieldBySelector(params.getEpsg()).getAsPrimitive().getAsInt();
+    // NOTE(review): the M field is declared as a generic number requirement but is read as an
+    // int here; confirm that dropping a fractional part is intended.
+    Integer m = in.getFieldBySelector(params.getM()).getAsPrimitive().getAsInt();
+
+    Geometry geometry = SpGeometryBuilder.createSPGeom(wkt, epsg);
+    if (!(geometry instanceof Point)) {
+      String actualType = geometry == null ? "null" : geometry.getGeometryType();
+      log.warn("Skipping event in " + CreateTrajectoryFromPointsController.EPA_NAME
+          + ": only point geometries are supported, but got " + actualType);
+      return;
+    }
+    Point eventGeom = (Point) geometry;
+
+    // extend the single trajectory by the new point and export it as a LineString
+    trajectory.addPointToTrajectory(eventGeom, m);
+    LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
+
+    in.addField(CreateTrajectoryFromPointsController.WKT, geom.toString());
+    in.addField(CreateTrajectoryFromPointsController.DESCRIPTION, trajectory.getDescription());
+    out.collect(in);
+  }
+
+  /** Nothing to release when the processor is detached. */
+  @Override
+  public void onDetach() {
+
+  }
+}
+
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java
new file mode 100755
index 0000000..490007f
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+/**
+ * Declares the "Create Single Trajectory" processing element: its required input stream
+ * properties (WKT, EPSG code, M value), its static configuration (description, number of
+ * subpoints) and the two string fields it appends to the output event.
+ */
+public class CreateTrajectoryFromPointsController extends StandaloneEventProcessingDeclarer<CreateTrajectoryFromPointsParameter> {
+
+  // Used both as the internal name of the WKT input mapping and as the runtime name of the
+  // appended trajectory field.
+  public static final String WKT = "trajectory_wkt";
+  public static final String EPSG = "EPSG";
+  public static final String M = "M-Value";
+  // Used both as the internal name of the text parameter and as the runtime name of the
+  // appended description field.
+  public static final String DESCRIPTION = "description";
+  public static final String SUBPOINTS = "subpoints";
+
+  public static final String EPA_NAME = "Create Single Trajectory";
+
+  /**
+   * Builds the static description of the processor.
+   *
+   * @return the processor description with stream requirements, static parameters, output
+   *         strategy, supported formats and protocols
+   */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder
+        .create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory",
+            EPA_NAME,
+            "Creates a trajectory from Points")
+        .category(DataProcessorType.GEO)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(
+            StreamRequirementsBuilder
+                .create()
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.stringReq(),
+                    Labels.from(WKT,
+                        "Geometry WKT",
+                        "WKT of the requested Geometry"),
+                    PropertyScope.NONE
+                )
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.numberReq(),
+                    Labels.from(EPSG, "EPSG Field", "EPSG Code for SRID"),
+                    PropertyScope.NONE
+                )
+                .requiredPropertyWithUnaryMapping(
+                    EpRequirements.numberReq(),
+                    Labels.from(M, "M Value", "Choose a value to add to the trajectory"),
+                    PropertyScope.NONE
+                )
+                .build()
+        )
+        .requiredTextParameter(
+            Labels.from(
+                DESCRIPTION,
+                "description of trajectory",
+                "Add a description for the trajectory")
+        )
+        .requiredIntegerParameter(
+            Labels.from(
+                SUBPOINTS,
+                "number of allowed subpoints",
+                "Number of allowed subpoints of the trajectory"),
+            2, 10, 1
+        )
+        .outputStrategy(OutputStrategies.append(
+            EpProperties.stringEp(
+                Labels.from(
+                    "trajectory_wkt",
+                    "trajectory_wkt",
+                    "trajectory wkt (lineString) of a point stream"),
+                WKT,
+                SO.Text),
+            EpProperties.stringEp(
+                Labels.from(
+                    "trajectory_description",
+                    "trajectory_description",
+                    "description of trajectory"),
+                DESCRIPTION,
+                SO.Text))
+        )
+        .supportedFormats(SupportedFormats.jsonFormat())
+        .supportedProtocols(SupportedProtocols.kafka())
+        .build();
+  }
+
+  /**
+   * Extracts the mapped field selectors and the static parameter values of a concrete
+   * invocation and binds them to a new {@link CreateTrajectoryFromPoints} processor.
+   *
+   * @param graph     the invocation graph of the processor
+   * @param extractor accessor for the user-provided mappings and parameters
+   * @return the processor factory together with its parameters
+   */
+  @Override
+  public ConfiguredEventProcessor<CreateTrajectoryFromPointsParameter> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    // selectors of the event fields chosen by the user
+    String wkt = extractor.mappingPropertyValue(WKT);
+    String epsg = extractor.mappingPropertyValue(EPSG);
+    String m = extractor.mappingPropertyValue(M);
+
+    // static configuration values
+    String description = extractor.singleValueParameter(DESCRIPTION, String.class);
+    Integer subpoints = extractor.singleValueParameter(SUBPOINTS, Integer.class);
+
+    CreateTrajectoryFromPointsParameter params =
+        new CreateTrajectoryFromPointsParameter(graph, wkt, epsg, description, subpoints, m);
+
+    return new ConfiguredEventProcessor<>(params, CreateTrajectoryFromPoints::new);
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java
new file mode 100755
index 0000000..0397ac5
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+/**
+ * Immutable parameter holder for the trajectory processor: the selectors of the mapped
+ * event fields (WKT, EPSG, M) and the static configuration (description, subpoints).
+ */
+public class CreateTrajectoryFromPointsParameter extends EventProcessorBindingParams {
+
+  // All values are assigned once in the constructor and only read afterwards.
+  private final String epsg;
+  private final String wkt;
+  private final String description;
+  private final Integer subpoints;
+  private final String m;
+
+  /**
+   * @param graph       the invocation graph handed to the binding-params base class
+   * @param wkt         selector of the event field holding the geometry WKT
+   * @param epsg        selector of the event field holding the EPSG code
+   * @param description description text entered for the trajectory
+   * @param subpoints   configured number of subpoints
+   * @param m           selector of the event field holding the M value
+   */
+  public CreateTrajectoryFromPointsParameter(DataProcessorInvocation graph, String wkt, String epsg, String description, Integer subpoints, String m) {
+    super(graph);
+    this.wkt = wkt;
+    this.epsg = epsg;
+    this.description = description;
+    this.subpoints = subpoints;
+    this.m = m;
+  }
+
+  /** @return selector of the EPSG field */
+  public String getEpsg() {
+    return epsg;
+  }
+
+  /** @return selector of the WKT field */
+  public String getWkt() {
+    return wkt;
+  }
+
+  /** @return description of the trajectory */
+  public String getDescription() {
+    return description;
+  }
+
+  /** @return configured number of subpoints */
+  public Integer getSubpoints() {
+    return subpoints;
+  }
+
+  /** @return selector of the M-value field */
+  public String getM() {
+    return m;
+  }
+}