You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by mi...@apache.org on 2022/11/19 23:11:31 UTC
[streampipes] 01/04: rewrite to 1-class processor model
This is an automated email from the ASF dual-hosted git repository.
micklich pushed a commit to branch STREAMPIPES-642
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 5c973c1ebf3797789f0f7753ed904f67c96f7e3a
Author: micklich <mi...@apache.org>
AuthorDate: Sat Nov 19 23:35:11 2022 +0100
rewrite to 1-class processor model
---
.../streampipes/processors/geo/jvm/GeoJvmInit.java | 4 +-
.../geo/jvm/jts/processor/epsg/EpsgProcessor.java | 80 ++++++++++++++++++++++
2 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
index 949887050..52897a751 100644
--- a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/GeoJvmInit.java
@@ -29,8 +29,8 @@ import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.geo.jvm.config.ConfigKeys;
+import org.apache.streampipes.processors.geo.jvm.jts.processor.epsg.EpsgProcessor;
import org.apache.streampipes.processors.geo.jvm.jts.processor.latLngToGeo.LatLngToGeoController;
-import org.apache.streampipes.processors.geo.jvm.jts.processor.setEPSG.SetEpsgController;
import org.apache.streampipes.processors.geo.jvm.jts.processor.trajectory.CreateTrajectoryFromPointsController;
import org.apache.streampipes.processors.geo.jvm.processor.distancecalculator.DistanceCalculatorController;
import org.apache.streampipes.processors.geo.jvm.processor.geocoder.GoogleMapsGeocodingController;
@@ -52,7 +52,7 @@ public class GeoJvmInit extends StandaloneModelSubmitter {
new GoogleMapsGeocodingController(),
new StaticGoogleMapsGeocodingController(),
new ReverseGeocodingController(),
- new SetEpsgController(),
+ new EpsgProcessor(),
new LatLngToGeoController(),
new CreateTrajectoryFromPointsController(),
new SpeedCalculatorController(),
diff --git a/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
new file mode 100644
index 000000000..44057d9d2
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/epsg/EpsgProcessor.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.streampipes.processors.geo.jvm.jts.processor.epsg;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+/** Data processor that adds a configured integer EPSG code as an extra field to each event. */
+public class EpsgProcessor extends StreamPipesDataProcessor {
+  private static final String EPSG_KEY = "epsg-key"; // id of the integer configuration parameter
+  public static final String EPSG_RUNTIME = "epsg"; // runtime name of the appended event field
+
+  private int epsgCode;
+
+  /** Describes the processor: no stream requirements, one integer parameter, one appended field. */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.geo.jvm.jts.processor.epsg")
+        .category(DataProcessorType.GEO)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .withLocales(Locales.EN)
+        .requiredStream(StreamRequirementsBuilder.create().build())
+        .outputStrategy(OutputStrategies.append(
+            PrimitivePropertyBuilder.create(Datatypes.Integer, EPSG_RUNTIME)
+                .domainProperty("http://data.ign.fr/def/ignf#CartesianCS").build()))
+        .requiredIntegerParameter(Labels.withId(EPSG_KEY), 4326)
+        .build();
+  }
+
+  /** Stores the integer extracted from the {@code epsg-key} parameter for use in onEvent. */
+  @Override
+  public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector,
+      EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    epsgCode = parameters.extractor().singleValueParameter(EPSG_KEY, Integer.class);
+  }
+
+  /** Adds the stored code to the event under {@link #EPSG_RUNTIME} and hands it to the collector. */
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    event.addField(EPSG_RUNTIME, epsgCode);
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+    // Intentionally empty: the only state this class holds is the stored int code.
+  }
+}