You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2020/05/12 18:37:58 UTC

[incubator-streampipes-extensions] 01/17: added PE

This is an automated email from the ASF dual-hosted git repository.

micklich pushed a commit to branch feature/trajectory
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git

commit 285c684bf33c85de8ec8011896fba95d46718230
Author: micklich <fl...@disy.net>
AuthorDate: Tue Apr 28 17:22:51 2020 +0200

    added PE
---
 .../streampipes/processors/geo/jvm/GeoJvmInit.java |  20 ++--
 .../trajectory/CreateTrajectoryFromPoints.java     |  73 ++++++++++++
 .../CreateTrajectoryFromPointsController.java      | 129 +++++++++++++++++++++
 .../CreateTrajectoryFromPointsParameter.java       |  61 ++++++++++
 4 files changed, 274 insertions(+), 9 deletions(-)

diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index f82df33..41b30ce 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -29,6 +29,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.processors.geo.jvm.config.GeoJvmConfig;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
 import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
 import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
 import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
 import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
@@ -40,15 +41,16 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
 
   public static void main(String[] args) {
     DeclarersSingleton
-        .getInstance()
-        .add(new DistanceCalculatorController())
-        .add(new GoogleMapsGeocodingController())
-        .add(new StaticGoogleMapsGeocodingController())
-        .add(new ReverseGeocodingController())
-        .add(new SetEpsgController())
-        .add(new LatLngToGeoController())
-        .add(new SpeedCalculatorController())
-        .add(new StaticDistanceCalculatorController());
+            .getInstance()
+            .add(new DistanceCalculatorController())
+            .add(new GoogleMapsGeocodingController())
+            .add(new StaticGoogleMapsGeocodingController())
+            .add(new ReverseGeocodingController())
+            .add(new SetEpsgController())
+            .add(new LatLngToGeoController())
+            .add(new CreateTrajectoryFromPointsController())
+            .add(new SpeedCalculatorController())
+            .add(new StaticDistanceCalculatorController());
 
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java
new file mode 100755
index 0000000..3ad78d9
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPoints.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpGeometryBuilder;
+import org.apache.streampipes.processors.geo.jvm.jts.helper.SpTrajectoryBuilder;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Point;
+
+
+/** Event processor that appends each incoming point to a running trajectory and emits it as a line string. */
+public class CreateTrajectoryFromPoints implements EventProcessor<CreateTrajectoryFromPointsParameter> {
+
+    // Per-instance logger: a static field would be overwritten by every newly invoked processor instance.
+    private Logger log;
+    private CreateTrajectoryFromPointsParameter params;
+    SpTrajectoryBuilder trajectory;
+
+    /** Stores the binding parameters and creates the trajectory builder from the configured subpoint count and description. */
+    @Override
+    public void onInvocation(CreateTrajectoryFromPointsParameter params, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
+        this.log = params.getGraph().getLogger(CreateTrajectoryFromPointsParameter.class);
+        this.params = params;
+        this.trajectory = new SpTrajectoryBuilder(params.getSubpoints(), params.getDescription());
+    }
+
+    /** Reads WKT, EPSG and M value from the event, extends the trajectory and forwards the event with the trajectory fields added. */
+    @Override
+    public void onEvent(Event in, SpOutputCollector out) {
+        String wkt = in.getFieldBySelector(params.getWkt()).getAsPrimitive().getAsString();
+        Integer epsg = in.getFieldBySelector(params.getEpsg()).getAsPrimitive().getAsInt();
+        Integer m = in.getFieldBySelector(params.getM()).getAsPrimitive().getAsInt();
+
+        // NOTE(review): the cast assumes the incoming WKT describes a point; other geometry types would fail here.
+        Point eventGeom = (Point) SpGeometryBuilder.createSPGeom(wkt, epsg);
+
+        // Add the new point together with its M value, then rebuild the line string with the point's geometry factory.
+        trajectory.addPointToTrajectory(eventGeom, m);
+        LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
+
+        in.addField(CreateTrajectoryFromPointsController.WKT, geom.toString());
+        in.addField(CreateTrajectoryFromPointsController.DESCRIPTION, trajectory.getDescription());
+        out.collect(in);
+    }
+
+    /** No resources are held by this processor, so detaching requires no work. */
+    @Override
+    public void onDetach() {
+        // intentionally empty
+    }
+}
+
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java
new file mode 100755
index 0000000..490007f
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsController.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+/** Declares the "Create Single Trajectory" processor: its stream requirements, static parameters and output schema. */
+public class CreateTrajectoryFromPointsController extends StandaloneEventProcessingDeclarer<CreateTrajectoryFromPointsParameter> {
+
+    // Keys used by the model declaration below, by the parameter extraction, and as names of the emitted event fields.
+    public static final String WKT = "trajectory_wkt";
+    public static final String EPSG = "EPSG";
+    public static final String M = "M-Value";
+    public static final String DESCRIPTION = "description";
+    public static final String SUBPOINTS = "subpoints";
+
+    public static final String EPA_NAME = "Create Single Trajectory";
+
+    /** Builds the processor description: three mapped input properties, two static parameters, two appended string fields. */
+    @Override
+    public DataProcessorDescription declareModel() {
+        return ProcessingElementBuilder
+                .create("org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory",
+                        EPA_NAME,
+                        "Creates a trajectory from Points")
+                .category(DataProcessorType.GEO)
+                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+                .requiredStream(
+                        StreamRequirementsBuilder
+                                .create()
+                                .requiredPropertyWithUnaryMapping(
+                                        EpRequirements.stringReq(),
+                                        Labels.from(WKT,
+                                                "Geometry WKT",
+                                                "WKT of the requested Geometry"),
+                                        PropertyScope.NONE
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                        EpRequirements.numberReq(),
+                                        Labels.from(EPSG, "EPSG Field", "EPSG Code for SRID"),
+                                        PropertyScope.NONE
+                                )
+                                .requiredPropertyWithUnaryMapping(
+                                        EpRequirements.numberReq(),
+                                        Labels.from(M, "M Value", "Choose a value to add to the trajectory"),
+                                        PropertyScope.NONE
+                                )
+                                .build()
+                )
+                .requiredTextParameter(
+                        Labels.from(
+                                DESCRIPTION,
+                                "description of trajectory",
+                                "Add a description for the trajectory")
+                )
+                .requiredIntegerParameter(
+                        Labels.from(
+                                SUBPOINTS,
+                                "number of allowed subpoints",
+                                "Number of allowed subpoints of the trajectory"),
+                        2, 10, 1
+                )
+                .outputStrategy(OutputStrategies.append(
+                        EpProperties.stringEp(
+                                Labels.from(
+                                        "trajectory_wkt",
+                                        "trajectory_wkt",
+                                        "trajectory wkt (lineString) of a point stream"),
+                                WKT,
+                                SO.Text),
+                        EpProperties.stringEp(
+                                Labels.from(
+                                        "trajectory_description",
+                                        "trajectory_description",
+                                        "description of trajectory"),
+                                DESCRIPTION,
+                                SO.Text))
+                )
+
+                .supportedFormats(SupportedFormats.jsonFormat())
+                .supportedProtocols(SupportedProtocols.kafka())
+                .build();
+    }
+
+    /** Extracts the mapped properties and static parameters and pairs them with the CreateTrajectoryFromPoints constructor. */
+    @Override
+    public ConfiguredEventProcessor<CreateTrajectoryFromPointsParameter> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+        // Values of the three unary mappings declared in declareModel().
+        String wkt = extractor.mappingPropertyValue(WKT);
+        String epsg = extractor.mappingPropertyValue(EPSG);
+        String m = extractor.mappingPropertyValue(M);
+
+        // Values of the static text and integer parameters.
+        String description = extractor.singleValueParameter(DESCRIPTION, String.class);
+        Integer subpoints = extractor.singleValueParameter(SUBPOINTS, Integer.class);
+
+        CreateTrajectoryFromPointsParameter params =
+                new CreateTrajectoryFromPointsParameter(graph, wkt, epsg, description, subpoints, m);
+
+        return new ConfiguredEventProcessor<>(params, CreateTrajectoryFromPoints::new);
+    }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java
new file mode 100755
index 0000000..0397ac5
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/CreateTrajectoryFromPointsParameter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+/** Immutable holder for the settings of one trajectory processor invocation: three mapped-property values and two static values. */
+public class CreateTrajectoryFromPointsParameter extends EventProcessorBindingParams {
+    private final String wkt;
+    private final String epsg;
+    private final String m;
+    private final String description;
+    private final Integer subpoints;
+
+    /** Passes the invocation graph to the superclass and stores the remaining arguments unchanged. */
+    public CreateTrajectoryFromPointsParameter(DataProcessorInvocation graph, String wkt, String epsg, String description, Integer subpoints, String m) {
+        super(graph);
+        this.wkt = wkt;
+        this.epsg = epsg;
+        this.m = m;
+        this.description = description;
+        this.subpoints = subpoints;
+    }
+
+    public String getWkt() {
+        return this.wkt;
+    }
+
+    public String getEpsg() {
+        return this.epsg;
+    }
+
+    public String getM() {
+        return this.m;
+    }
+
+    public String getDescription() {
+        return this.description;
+    }
+
+    public Integer getSubpoints() {
+        return this.subpoints;
+    }
+}