You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/04/20 12:53:56 UTC
[karaf-decanter] branch master updated: [KARAF-6673] Add aggregate
processor
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new 746ff53 [KARAF-6673] Add aggregate processor
new d6ba6cc Merge pull request #157 from jbonofre/KARAF-6673
746ff53 is described below
commit 746ff536bfcee6f2c552bb46bb0705e2279ae6e8
Author: jbonofre <jb...@apache.org>
AuthorDate: Mon Apr 20 11:59:32 2020 +0200
[KARAF-6673] Add aggregate processor
---
assembly/src/main/feature/feature.xml | 7 +-
.../src/main/asciidoc/user-guide/processors.adoc | 46 +++++++-
processor/{ => aggregate}/pom.xml | 38 +++++--
.../processor/aggregate/AggregateProcessor.java | 109 +++++++++++++++++++
.../aggregate/AggregateProcessorTest.java | 121 +++++++++++++++++++++
processor/pom.xml | 1 +
6 files changed, 310 insertions(+), 12 deletions(-)
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index c6a4300..48651a1 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -16,7 +16,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.0.0 http://karaf.apache.org/xmlns/features/v1.0.0">
+<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.4.0 http://karaf.apache.org/xmlns/features/v1.4.0">
<repository>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</repository>
@@ -417,6 +417,11 @@ org.apache.felix.eventadmin.IgnoreTimeout=org.apache.karaf.decanter.
<bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.passthrough/${project.version}</bundle>
</feature>
+ <feature name="decanter-processor-aggregate" version="${project.version}" description="Karaf Decanter Aggregate Processor">
+ <feature>decanter-common</feature>
+ <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.aggregate/${project.version}</bundle>
+ </feature>
+
<feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
<feature>decanter-common</feature>
<bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 2583dd0..42bf87e 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -32,4 +32,48 @@ data to `decanter/process/second`. Finally, at the end of the chain, you have to
==== Pass Through
-This processor doesn't implement any concrete logic. It's for the example how to implement a processor.
\ No newline at end of file
+This processor doesn't implement any concrete logic. It's for the example how to implement a processor.
+
+You can install this processor using the `decanter-processor-passthrough` feature:
+
+----
+karaf@root()> feature:install decanter-processor-passthrough
+----
+
+==== Aggregate
+
+This processor "merges" several incoming events in a single one that is sent periodically.
+
+You can install this processor using the `decanter-processor-aggregate` feature:
+
+----
+karaf@root()> feature:install decanter-processor-aggregate
+----
+
+By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
+
+You can provisiong `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
+
+----
+period=120 # this is the period in seconds
+target.topics=decanter/process/aggregate # that's the default target topic
+----
+
+You can also decide if a known property is overwritten in the aggregator or appended.
+
+By default, properties are not overwritten, meaning that it's prefixed by the event index in the aggregator:
+
+----
+0.foo=first
+0.other=bar
+1.foo=second
+1.other=bar
+----
+
+In the processor `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file, you can enable `overwrite`:
+
+----
+overwrite=true
+----
+
+Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
\ No newline at end of file
diff --git a/processor/pom.xml b/processor/aggregate/pom.xml
similarity index 51%
copy from processor/pom.xml
copy to processor/aggregate/pom.xml
index 7d3b738..6c15f60 100644
--- a/processor/pom.xml
+++ b/processor/aggregate/pom.xml
@@ -22,19 +22,37 @@
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.apache.karaf</groupId>
- <artifactId>decanter</artifactId>
+ <groupId>org.apache.karaf.decanter</groupId>
+ <artifactId>processor</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <groupId>org.apache.karaf.decanter</groupId>
- <artifactId>processor</artifactId>
- <packaging>pom</packaging>
- <name>Apache Karaf :: Decanter :: Processor</name>
-
- <modules>
- <module>passthrough</module>
- </modules>
+ <groupId>org.apache.karaf.decanter.processor</groupId>
+ <artifactId>org.apache.karaf.decanter.processor.aggregate</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Karaf :: Decanter :: Processor :: Aggregate</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.processor.aggregate
+ </Private-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java b/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java
new file mode 100644
index 0000000..3168a3f
--- /dev/null
+++ b/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.aggregate;
+
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.Dictionary;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(
+ name = "org.apache.karaf.decanter.processor.aggregate",
+ immediate = true,
+ property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class AggregateProcessor implements EventHandler {
+
+ @Reference
+ private EventAdmin dispatcher;
+
+ private String targetTopics;
+ private boolean overwrite = false;
+
+ private int index = 0;
+ private ConcurrentHashMap<String, Object> mergedData = new ConcurrentHashMap<>();
+ private ScheduledExecutorService scheduledExecutorService;
+
+ @Activate
+ public void activate(ComponentContext componentContext) {
+ activate(componentContext.getProperties());
+ }
+
+ public void activate(Dictionary<String, Object> configuration) {
+ targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/aggregate";
+ long period = (configuration.get("period") != null) ? Long.parseLong(configuration.get("period").toString()) : 60L;
+ overwrite = (configuration.get("overwrite") != null) ? Boolean.parseBoolean(configuration.get("overwrite").toString()) : false;
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ scheduledExecutorService.scheduleAtFixedRate(new AggregateTask(), 0, period, TimeUnit.SECONDS);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ scheduledExecutorService.shutdownNow();
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ // merge data
+ for (String propertyName : event.getPropertyNames()) {
+ if (overwrite) {
+ mergedData.put(propertyName, event.getProperty(propertyName));
+ } else {
+ mergedData.put(index + "." + propertyName, event.getProperty(propertyName));
+ }
+ }
+ index++;
+ }
+
+ class AggregateTask implements Runnable {
+
+ @Override
+ public void run() {
+ // create event and send
+ if (mergedData.size() > 0) {
+ mergedData.put("processor", "aggregate");
+ String[] topics = targetTopics.split(",");
+ for (String topic : topics) {
+ dispatcher.postEvent(new Event(topic, mergedData));
+ }
+ // reset the merged data
+ mergedData.clear();
+ index = 0;
+ }
+ }
+
+ }
+
+ /**
+ * Visible for testing
+ */
+ public void setDispatcher(EventAdmin dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+}
diff --git a/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java b/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java
new file mode 100644
index 0000000..3164b92
--- /dev/null
+++ b/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.aggregate;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+
+public class AggregateProcessorTest {
+
+ @Test
+ public void testWithOverwrite() throws Exception {
+ MockDispatcher dispatcher = new MockDispatcher();
+ AggregateProcessor aggregateProcessor = new AggregateProcessor();
+ aggregateProcessor.setDispatcher(dispatcher);
+ Hashtable<String, Object> configuration = new Hashtable<>();
+ configuration.put("period", "2");
+ configuration.put("overwrite", "true");
+ aggregateProcessor.activate(configuration);
+
+ HashMap<String, Object> data1 = new HashMap<>();
+ data1.put("first", "first");
+ Event event1 = new Event("decanter/collect/first", data1);
+ aggregateProcessor.handleEvent(event1);
+
+ HashMap<String, Object> data2 = new HashMap<>();
+ data2.put("second", "second");
+ Event event2 = new Event("decanter/collect/second", data2);
+ aggregateProcessor.handleEvent(event2);
+
+ HashMap<String, Object> data3 = new HashMap<>();
+ data3.put("second", "overwrite");
+ Event event3 = new Event("decanter/collect/third", data3);
+ aggregateProcessor.handleEvent(event3);
+
+ Thread.sleep(3000);
+
+ Assert.assertEquals(1, dispatcher.postedEvents.size());
+ Assert.assertEquals("first", dispatcher.postedEvents.get(0).getProperty("first"));
+ Assert.assertEquals("overwrite", dispatcher.postedEvents.get(0).getProperty("second"));
+ Assert.assertEquals("aggregate", dispatcher.postedEvents.get(0).getProperty("processor"));
+ Assert.assertEquals("decanter/process/aggregate", dispatcher.postedEvents.get(0).getTopic());
+
+ HashMap<String, Object> data4 = new HashMap<>();
+ data4.put("foo", "bar");
+ Event event4 = new Event("decanter/collect/foo", data4);
+ aggregateProcessor.handleEvent(event4);
+
+ Thread.sleep(3000);
+
+ Assert.assertEquals(2, dispatcher.postedEvents.size());
+ Assert.assertEquals("bar", dispatcher.postedEvents.get(1).getProperty("foo"));
+
+ aggregateProcessor.deactivate();
+ }
+
+ @Test
+ public void testWithoutOverwrite() throws Exception {
+ MockDispatcher dispatcher = new MockDispatcher();
+ AggregateProcessor aggregateProcessor = new AggregateProcessor();
+ aggregateProcessor.setDispatcher(dispatcher);
+ Hashtable<String, Object> configuration = new Hashtable<>();
+ configuration.put("period", "2");
+ aggregateProcessor.activate(configuration);
+
+ HashMap<String, Object> data1 = new HashMap<>();
+ data1.put("foo", "first");
+ Event event1 = new Event("decanter/collect/foo", data1);
+ aggregateProcessor.handleEvent(event1);
+
+ HashMap<String, Object> data2 = new HashMap<>();
+ data2.put("foo", "second");
+ Event event2 = new Event("decanter/collect/foo", data2);
+ aggregateProcessor.handleEvent(event2);
+
+ Thread.sleep(4000);
+
+ Assert.assertEquals(1, dispatcher.postedEvents.size());
+ Assert.assertEquals("first", dispatcher.postedEvents.get(0).getProperty("0.foo"));
+ Assert.assertEquals("second", dispatcher.postedEvents.get(0).getProperty("1.foo"));
+
+ aggregateProcessor.deactivate();
+ }
+
+ class MockDispatcher implements EventAdmin {
+
+ public List<Event> postedEvents = new ArrayList<>();
+ public List<Event> sentEvents = new ArrayList<>();
+
+ @Override
+ public void postEvent(Event event) {
+ postedEvents.add(event);
+ }
+
+ @Override
+ public void sendEvent(Event event) {
+ sentEvents.add(event);
+ }
+ }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index 7d3b738..059d77b 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>passthrough</module>
+ <module>aggregate</module>
</modules>
</project>
\ No newline at end of file