You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/03/22 20:50:03 UTC
[incubator-streampipes-extensions] branch dev updated: Add
StaticDistanceCalculator processor
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 0697b24 Add StaticDistanceCalculator processor
0697b24 is described below
commit 0697b247f4dc156504c71ac7289ec9f9e96bbc9a
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Mar 22 21:49:44 2020 +0100
Add StaticDistanceCalculator processor
---
.../pe/jvm/AllPipelineElementsInit.java | 4 +-
.../streampipes/processors/geo/jvm/GeoJvmInit.java | 4 +-
.../StaticDistanceCalculator.java | 60 +++++++++++++++
.../StaticDistanceCalculatorController.java | 87 ++++++++++++++++++++++
.../StaticDistanceCalculatorParameters.java | 65 ++++++++++++++++
.../documentation.md | 5 +-
.../documentation.md | 11 +--
.../strings.en | 14 ++++
8 files changed, 238 insertions(+), 12 deletions(-)
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index 6869848..0459d7c 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.Di
import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
import org.apache.streampipes.processors.geo.jvm.processor.speed.SpeedCalculatorController;
+import org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.StaticDistanceCalculatorController;
import org.apache.streampipes.processors.geo.jvm.processor.staticgeocoder.StaticGoogleMapsGeocodingController;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.genericclassification.GenericImageClassificationController;
import org.apache.streampipes.processors.imageprocessing.jvm.processor.imagecropper.ImageCropperController;
@@ -131,7 +132,8 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new ReverseGeocodingController())
.add(new SetEpsgController())
.add(new LatLngToGeoController())
- .add(new SpeedCalculatorController());
+ .add(new SpeedCalculatorController())
+ .add(new StaticDistanceCalculatorController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 34df051..f82df33 100644
--- a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -33,6 +33,7 @@ import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.Di
import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
import org.apache.streampipes.processors.geo.jvm.processor.revgeocoder.ReverseGeocodingController;
import org.apache.streampipes.processors.geo.jvm.processor.speed.SpeedCalculatorController;
+import org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.StaticDistanceCalculatorController;
import org.apache.streampipes.processors.geo.jvm.processor.staticgeocoder.StaticGoogleMapsGeocodingController;
public class GeoJvmInit extends StandaloneModelSubmitter {
@@ -46,7 +47,8 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
.add(new ReverseGeocodingController())
.add(new SetEpsgController())
.add(new LatLngToGeoController())
- .add(new SpeedCalculatorController());
+ .add(new SpeedCalculatorController())
+ .add(new StaticDistanceCalculatorController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java
new file mode 100644
index 0000000..2889388
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.geo.jvm.processor.util.DistanceUtil;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+public class StaticDistanceCalculator implements EventProcessor<StaticDistanceCalculatorParameters> {
+  // Selectors of the event fields that carry the incoming latitude/longitude.
+  private String latitudeFieldName;
+  private String longitudeFieldName;
+  // Coordinates of the fixed location the incoming position is compared against.
+  private Float selectedLocationLatitude;
+  private Float selectedLocationLongitude;
+
+  @Override
+  public void onInvocation(StaticDistanceCalculatorParameters parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    this.selectedLocationLatitude = parameters.getSelectedLatitude();
+    this.selectedLocationLongitude = parameters.getSelectedLongitude();
+    this.latitudeFieldName = parameters.getLatitudeFieldName();
+    this.longitudeFieldName = parameters.getLongitudeFieldName();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    Float eventLatitude = readFloat(event, latitudeFieldName);
+    Float eventLongitude = readFloat(event, longitudeFieldName);
+    Float distanceToFixedLocation = DistanceUtil.dist(eventLatitude, eventLongitude,
+        selectedLocationLatitude, selectedLocationLongitude);
+    event.addField("distance", distanceToFixedLocation);
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    // Nothing to release.
+  }
+
+  private static Float readFloat(Event event, String selector) {
+    return event.getFieldBySelector(selector).getAsPrimitive().getAsFloat();
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java
new file mode 100644
index 0000000..2192e7d
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorController.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.Geo;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+import java.net.URI;
+
+public class StaticDistanceCalculatorController extends StandaloneEventProcessingDeclarer<StaticDistanceCalculatorParameters> {
+
+  private static final String LATITUDE_KEY = "latitude-key";
+  private static final String LONGITUDE_KEY = "longitude-key";
+  private static final String SELECTED_LATITUDE_KEY = "selected-latitude-key";
+  private static final String SELECTED_LONGITUDE_KEY = "selected-longitude-key";
+
+  /**
+   * Describes the processor: one input stream with a latitude and a longitude property,
+   * two float parameters, and an appended "distance" output field declared in kilometers.
+   */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder
+        .create("org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator")
+        // NOTE(review): categorized as FILTER although the output strategy only appends a field - confirm.
+        .category(DataProcessorType.FILTER)
+        .withAssets(Assets.DOCUMENTATION)
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lat),
+                Labels.withId(LATITUDE_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .requiredPropertyWithUnaryMapping(EpRequirements.domainPropertyReq(Geo.lng),
+                Labels.withId(LONGITUDE_KEY), PropertyScope.MEASUREMENT_PROPERTY)
+            .build())
+        .requiredFloatParameter(Labels.withId(SELECTED_LATITUDE_KEY))
+        .requiredFloatParameter(Labels.withId(SELECTED_LONGITUDE_KEY))
+        .outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
+            .create(Datatypes.Float, "distance")
+            .domainProperty(SO.Number)
+            .measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
+            .build()))
+        .build();
+  }
+
+  /** Binds the mapped field selectors and the two float parameters to a new processor instance. */
+  @Override
+  public ConfiguredEventProcessor<StaticDistanceCalculatorParameters> onInvocation(DataProcessorInvocation graph,
+      ProcessingElementParameterExtractor extractor) {
+    return new ConfiguredEventProcessor<>(
+        new StaticDistanceCalculatorParameters(graph,
+            extractor.mappingPropertyValue(LATITUDE_KEY),
+            extractor.mappingPropertyValue(LONGITUDE_KEY),
+            extractor.singleValueParameter(SELECTED_LATITUDE_KEY, Float.class),
+            extractor.singleValueParameter(SELECTED_LONGITUDE_KEY, Float.class)),
+        StaticDistanceCalculator::new);
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java
new file mode 100644
index 0000000..0d7e347
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/processor/staticdistancecalculator/StaticDistanceCalculatorParameters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+public class StaticDistanceCalculatorParameters extends EventProcessorBindingParams {
+  // Set once in the constructor and never reassigned, hence final.
+  private final String latitudeFieldName;
+  private final String longitudeFieldName;
+  // Coordinates of the fixed location; kept mutable because setters are exposed.
+  private Float selectedLatitude;
+  private Float selectedLongitude;
+
+  // Binds the mapped latitude/longitude field names and the coordinates of the
+  // fixed location to the given invocation graph.
+  public StaticDistanceCalculatorParameters(DataProcessorInvocation graph,
+      String latitudeFieldName, String longitudeFieldName,
+      Float selectedLatitude, Float selectedLongitude) {
+    super(graph);
+    this.latitudeFieldName = latitudeFieldName;
+    this.longitudeFieldName = longitudeFieldName;
+    this.selectedLatitude = selectedLatitude;
+    this.selectedLongitude = selectedLongitude;
+  }
+
+  public String getLatitudeFieldName() {
+    return latitudeFieldName;
+  }
+
+  public String getLongitudeFieldName() {
+    return longitudeFieldName;
+  }
+
+  public Float getSelectedLatitude() {
+    return selectedLatitude;
+  }
+
+  public void setSelectedLatitude(Float selectedLatitude) {
+    this.selectedLatitude = selectedLatitude;
+  }
+
+  public Float getSelectedLongitude() {
+    return selectedLongitude;
+  }
+
+  public void setSelectedLongitude(Float selectedLongitude) {
+    this.selectedLongitude = selectedLongitude;
+  }
+}
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
index d43213b..f221b91 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## Spatial Grid Enrichment
+## Distance Calculator
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,8 +26,7 @@
## Description
-Groups spatial events into cells of a given size
-Add a detailed description here
+Calculates the distance between two latitude/longitude pairs in a single event.
***
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
similarity index 80%
copy from streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
copy to streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
index d43213b..eb70028 100644
--- a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.distancecalculator/documentation.md
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/documentation.md
@@ -16,23 +16,20 @@
~
-->
-## Spatial Grid Enrichment
-
-<p align="center">
- <img src="icon.png" width="150px;" class="pe-image-documentation"/>
-</p>
+## Static Distance Calculator
***
## Description
-Groups spatial events into cells of a given size
-Add a detailed description here
+Calculates the distance (in km) between a fixed location (e.g., a place) and a latitude/longitude pair of an input
+ event.
***
## Required input
+Requires a data stream that provides latitude and longitude values.
***
diff --git a/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en
new file mode 100644
index 0000000..4238a5b
--- /dev/null
+++ b/streampipes-processors-geo-jvm/src/main/resources/org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator/strings.en
@@ -0,0 +1,14 @@
+org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.title=Static Distance Calculator
+org.apache.streampipes.processors.geo.jvm.processor.staticdistancecalculator.description=Calculates the distance between a fixed location and a moving location
+
+latitude-key.title=Latitude field
+latitude-key.description=The field that provides the latitude value of the input event
+
+longitude-key.title=Longitude field
+longitude-key.description=The field that provides the longitude value of the input event
+
+selected-latitude-key.title=Latitude
+selected-latitude-key.description=The latitude value of the fixed location
+
+selected-longitude-key.title=Longitude
+selected-longitude-key.description=The longitude value of the fixed location
\ No newline at end of file