You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/12/11 07:41:01 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-449] Update API of rate limit processor
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f5ff956 [STREAMPIPES-449] Update API of rate limit processor
f5ff956 is described below
commit f5ff9561743ad798fae0e6eb3176a60bddd8ee0c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Dec 11 08:40:46 2021 +0100
[STREAMPIPES-449] Update API of rate limit processor
---
.../processors/filters/jvm/FiltersJvmInit.java | 4 +-
.../filters/jvm/processor/limit/RateLimit.java | 75 ---------------
.../jvm/processor/limit/RateLimitParameters.java | 66 -------------
...imitController.java => RateLimitProcessor.java} | 103 +++++++++++++++------
4 files changed, 76 insertions(+), 172 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
index df52d22..001446b 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/FiltersJvmInit.java
@@ -31,7 +31,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.filters.jvm.processor.booleanfilter.BooleanFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.compose.ComposeProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.enrich.MergeByEnrichProcessor;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitController;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.RateLimitProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.merge.MergeByTimeProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.NumericalFilterProcessor;
import org.apache.streampipes.processors.filters.jvm.processor.numericaltextfilter.NumericalTextFilterProcessor;
@@ -63,7 +63,7 @@ public class FiltersJvmInit extends StandaloneModelSubmitter {
new MergeBySchemaProcessor(),
new ComposeProcessor(),
new NumericalTextFilterProcessor(),
- new RateLimitController())
+ new RateLimitProcessor())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java
deleted file mode 100644
index 030c2f7..0000000
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimit.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.limit;
-
-import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowFactory;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.window.Window;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class RateLimit implements EventProcessor<RateLimitParameters> {
- private final static String DEFAULT_GROUP = "default";
- private Boolean groupingEnabled;
- private String groupingField;
- private ConcurrentMap<Object, Window> windows;
- private WindowFactory factory;
-
- @Override
- public void onInvocation(RateLimitParameters parameters,
- SpOutputCollector outputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- this.groupingEnabled = parameters.getGroupingEnabled();
- this.groupingField = parameters.getGroupingField();
- this.windows = new ConcurrentHashMap<>();
- this.factory = new WindowFactory(
- parameters.getWindowType(),
- parameters.getWindowExpression(),
- parameters.getEventSelection(),
- outputCollector);
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
- Object group = groupingEnabled ? getGroupKey(event) : DEFAULT_GROUP;
- Window window = windows.get(group);
- if (window == null) {
- window = factory.create();
- window.init();
- windows.put(group, window);
- }
- window.onEvent(event);
- }
-
- @Override
- public void onDetach() throws SpRuntimeException {
- for (Window window : this.windows.values()) {
- window.destroy();
- }
- }
-
- private Object getGroupKey(Event event) {
- return event.getFieldBySelector(groupingField).getAsPrimitive().getAsString();
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java
deleted file mode 100644
index d7757e4..0000000
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitParameters.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.filters.jvm.processor.limit;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.EventSelection;
-import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowType;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class RateLimitParameters extends EventProcessorBindingParams {
- private EventSelection eventSelection;
- private WindowType windowType;
- private Boolean groupingEnabled;
- private String groupingField;
- private Object windowExpression;
-
- public RateLimitParameters(DataProcessorInvocation graph,
- WindowType windowType,
- Object windowExpression,
- Boolean groupingEnabled,
- String groupingField,
- EventSelection eventSelection) {
- super(graph);
- this.eventSelection = eventSelection;
- this.windowType = windowType;
- this.groupingEnabled = groupingEnabled;
- this.groupingField = groupingField;
- this.windowExpression = windowExpression;
- }
-
- public WindowType getWindowType() {
- return windowType;
- }
-
- public Object getWindowExpression() {
- return windowExpression;
- }
-
- public Boolean getGroupingEnabled() {
- return groupingEnabled;
- }
-
- public String getGroupingField() {
- return groupingField;
- }
-
- public EventSelection getEventSelection() {
- return eventSelection;
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
similarity index 56%
rename from streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java
rename to streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
index 68242de..c3025bf 100644
--- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitController.java
+++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/limit/RateLimitProcessor.java
@@ -15,29 +15,33 @@
* limitations under the License.
*
*/
+
package org.apache.streampipes.processors.filters.jvm.processor.limit;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.filters.jvm.processor.limit.util.EventSelection;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowFactory;
import org.apache.streampipes.processors.filters.jvm.processor.limit.util.WindowType;
+import org.apache.streampipes.processors.filters.jvm.processor.limit.window.Window;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.Alternatives;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class RateLimitProcessor extends StreamPipesDataProcessor {
-public class RateLimitController extends StandaloneEventProcessingDeclarer<RateLimitParameters> {
private static final String EVENT_SELECTION = "event-selection";
private static final String WINDOW_TYPE = "window-type";
private static final String LENGTH_WINDOW = "length-window";
@@ -54,6 +58,13 @@ public class RateLimitController extends StandaloneEventProcessingDeclarer<RateL
private static final String OPTION_LAST = "Last";
private static final String OPTION_ALL = "All";
+ private final static String DEFAULT_GROUP = "default";
+ private Boolean groupingEnabled;
+ private String groupingField;
+ private ConcurrentMap<Object, Window> windows;
+ private WindowFactory factory;
+
+
@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.limit")
@@ -80,29 +91,63 @@ public class RateLimitController extends StandaloneEventProcessingDeclarer<RateL
}
@Override
- public ConfiguredEventProcessor<RateLimitParameters> onInvocation(DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
- Boolean groupingEnabled = Boolean.valueOf(extractor.selectedSingleValue(GROUPING_ENABLED, String.class));
- String groupingField = extractor.mappingPropertyValue(GROUPING_FIELD);
- EventSelection eventSelection = EventSelection.valueOf(extractor
+ public void onInvocation(ProcessorParams
+ processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
+
+ this.groupingEnabled = Boolean.valueOf(processorParams.extractor().selectedSingleValue(GROUPING_ENABLED, String.class));
+ this.groupingField = processorParams.extractor().mappingPropertyValue(GROUPING_FIELD);
+ this.windows = new ConcurrentHashMap<>();
+
+ EventSelection eventSelection = EventSelection.valueOf(processorParams.extractor()
.selectedSingleValue(EVENT_SELECTION, String.class).toUpperCase());
- String windowType = extractor.selectedAlternativeInternalId(WINDOW_TYPE);
+ String windowType = processorParams.extractor().selectedAlternativeInternalId(WINDOW_TYPE);
+
if (TIME_WINDOW.equals(windowType)) {
- Integer windowSize = extractor.singleValueParameter(TIME_WINDOW_SIZE, Integer.class);
- RateLimitParameters params = new RateLimitParameters(graph, WindowType.TIME,
- windowSize, groupingEnabled, groupingField, eventSelection);
- return new ConfiguredEventProcessor<>(params, RateLimit::new);
+ Integer windowSize = processorParams.extractor().singleValueParameter(TIME_WINDOW_SIZE, Integer.class);
+ this.factory = new WindowFactory(
+ WindowType.TIME,
+ windowSize,
+ eventSelection,
+ spOutputCollector);
+
} else if (CRON_WINDOW.equals(windowType)) {
- String cronExpression = extractor.singleValueParameter(CRON_WINDOW_EXPR, String.class);
- RateLimitParameters params = new RateLimitParameters(graph, WindowType.CRON,
- cronExpression, groupingEnabled, groupingField, eventSelection);
- return new ConfiguredEventProcessor<>(params, RateLimit::new);
+ String cronExpression = processorParams.extractor().singleValueParameter(CRON_WINDOW_EXPR, String.class);
+ this.factory = new WindowFactory(
+ WindowType.CRON,
+ cronExpression,
+ eventSelection,
+ spOutputCollector);
+
} else {
- Integer windowSize = extractor.singleValueParameter(LENGTH_WINDOW_SIZE, Integer.class);
- RateLimitParameters params = new RateLimitParameters(graph, WindowType.LENGTH,
- windowSize, groupingEnabled, groupingField, eventSelection);
- return new ConfiguredEventProcessor<>(params, RateLimit::new);
+ Integer windowSize = processorParams.extractor().singleValueParameter(LENGTH_WINDOW_SIZE, Integer.class);
+ this.factory = new WindowFactory(
+ WindowType.LENGTH,
+ windowSize,
+ eventSelection,
+ spOutputCollector);
}
}
+ @Override
+ public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
+ Object group = groupingEnabled ? getGroupKey(event) : DEFAULT_GROUP;
+ Window window = windows.get(group);
+ if (window == null) {
+ window = factory.create();
+ window.init();
+ windows.put(group, window);
+ }
+ window.onEvent(event);
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+ for (Window window : this.windows.values()) {
+ window.destroy();
+ }
+ }
+
+ private Object getGroupKey(Event event) {
+ return event.getFieldBySelector(groupingField).getAsPrimitive().getAsString();
+ }
}