You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/04/20 12:53:56 UTC

[karaf-decanter] branch master updated: [KARAF-6673] Add aggregate processor

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new 746ff53  [KARAF-6673] Add aggregate processor
     new d6ba6cc  Merge pull request #157 from jbonofre/KARAF-6673
746ff53 is described below

commit 746ff536bfcee6f2c552bb46bb0705e2279ae6e8
Author: jbonofre <jb...@apache.org>
AuthorDate: Mon Apr 20 11:59:32 2020 +0200

    [KARAF-6673] Add aggregate processor
---
 assembly/src/main/feature/feature.xml              |   7 +-
 .../src/main/asciidoc/user-guide/processors.adoc   |  46 +++++++-
 processor/{ => aggregate}/pom.xml                  |  38 +++++--
 .../processor/aggregate/AggregateProcessor.java    | 109 +++++++++++++++++++
 .../aggregate/AggregateProcessorTest.java          | 121 +++++++++++++++++++++
 processor/pom.xml                                  |   1 +
 6 files changed, 310 insertions(+), 12 deletions(-)

diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index c6a4300..48651a1 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -16,7 +16,7 @@
     See the License for the specific language governing permissions and
     limitations under the License.
 -->
-<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.0.0 http://karaf.apache.org/xmlns/features/v1.0.0">
+<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.4.0 http://karaf.apache.org/xmlns/features/v1.4.0">
 
     <repository>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</repository>
 
@@ -417,6 +417,11 @@ org.apache.felix.eventadmin.IgnoreTimeout=org.apache.karaf.decanter.
         <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.passthrough/${project.version}</bundle>
     </feature>
 
+    <feature name="decanter-processor-aggregate" version="${project.version}" description="Karaf Decanter Aggregate Processor">
+        <feature>decanter-common</feature>
+        <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.aggregate/${project.version}</bundle>
+    </feature>
+
     <feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
         <feature>decanter-common</feature>
         <bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 2583dd0..42bf87e 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -32,4 +32,48 @@ data to `decanter/process/second`. Finally, at the end of the chain, you have to
 
 ==== Pass Through
 
-This processor doesn't implement any concrete logic. It's for the example how to implement a processor.
\ No newline at end of file
+This processor doesn't implement any concrete logic. It serves as an example of how to implement a processor.
+
+You can install this processor using the `decanter-processor-passthrough` feature:
+
+----
+karaf@root()> feature:install decanter-processor-passthrough
+----
+
+==== Aggregate
+
+This processor "merges" several incoming events into a single one that is sent periodically.
+
+You can install this processor using the `decanter-processor-aggregate` feature:
+
+----
+karaf@root()> feature:install decanter-processor-aggregate
+----
+
+By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
+
+You can provision the `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
+
+----
+# this is the period in seconds
+period=120
+# that's the default target topic
+target.topics=decanter/process/aggregate
+----
+
+You can also decide if a known property is overwritten in the aggregator or appended.
+
+By default, properties are not overwritten: each property name is prefixed with the index of its event in the aggregator:
+
+----
+0.foo=first
+0.other=bar
+1.foo=second
+1.other=bar
+----
+
+In the processor `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file, you can enable `overwrite`:
+
+----
+overwrite=true
+----
+
+Then, if a property already exists in the aggregator, its value will be overwritten by the new event value received in the aggregator.
\ No newline at end of file
diff --git a/processor/pom.xml b/processor/aggregate/pom.xml
similarity index 51%
copy from processor/pom.xml
copy to processor/aggregate/pom.xml
index 7d3b738..6c15f60 100644
--- a/processor/pom.xml
+++ b/processor/aggregate/pom.xml
@@ -22,19 +22,37 @@
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <groupId>org.apache.karaf</groupId>
-        <artifactId>decanter</artifactId>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>processor</artifactId>
         <version>2.4.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <groupId>org.apache.karaf.decanter</groupId>
-    <artifactId>processor</artifactId>
-    <packaging>pom</packaging>
-    <name>Apache Karaf :: Decanter :: Processor</name>
-
-    <modules>
-        <module>passthrough</module>
-    </modules>
+    <groupId>org.apache.karaf.decanter.processor</groupId>
+    <artifactId>org.apache.karaf.decanter.processor.aggregate</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Karaf :: Decanter :: Processor :: Aggregate</name>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.processor.aggregate
+                        </Private-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>
\ No newline at end of file
diff --git a/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java b/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java
new file mode 100644
index 0000000..3168a3f
--- /dev/null
+++ b/processor/aggregate/src/main/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.aggregate;
+
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.Dictionary;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(
+        name = "org.apache.karaf.decanter.processor.aggregate",
+        immediate = true,
+        property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class AggregateProcessor implements EventHandler {
+
+    @Reference
+    private EventAdmin dispatcher;
+
+    private String targetTopics;
+    private boolean overwrite = false;
+
+    private int index = 0;
+    private ConcurrentHashMap<String, Object> mergedData = new ConcurrentHashMap<>();
+    private ScheduledExecutorService scheduledExecutorService;
+
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        activate(componentContext.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> configuration) {
+        targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/aggregate";
+        long period = (configuration.get("period") != null) ? Long.parseLong(configuration.get("period").toString()) : 60L;
+        overwrite = (configuration.get("overwrite") != null) ? Boolean.parseBoolean(configuration.get("overwrite").toString()) : false;
+        scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        scheduledExecutorService.scheduleAtFixedRate(new AggregateTask(), 0, period, TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        scheduledExecutorService.shutdownNow();
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        // merge data
+        for (String propertyName : event.getPropertyNames()) {
+            if (overwrite) {
+                mergedData.put(propertyName, event.getProperty(propertyName));
+            } else {
+                mergedData.put(index + "." + propertyName, event.getProperty(propertyName));
+            }
+        }
+        index++;
+    }
+
+    class AggregateTask implements Runnable {
+
+        @Override
+        public void run() {
+            // create event and send
+            if (mergedData.size() > 0) {
+                mergedData.put("processor", "aggregate");
+                String[] topics = targetTopics.split(",");
+                for (String topic : topics) {
+                    dispatcher.postEvent(new Event(topic, mergedData));
+                }
+                // reset the merged data
+                mergedData.clear();
+                index = 0;
+            }
+        }
+
+    }
+
+    /**
+     * Visible for testing
+     */
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+}
diff --git a/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java b/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java
new file mode 100644
index 0000000..3164b92
--- /dev/null
+++ b/processor/aggregate/src/test/java/org/apache/karaf/decanter/processor/aggregate/AggregateProcessorTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.aggregate;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+
+public class AggregateProcessorTest {
+
+    @Test
+    public void testWithOverwrite() throws Exception {
+        MockDispatcher dispatcher = new MockDispatcher();
+        AggregateProcessor aggregateProcessor = new AggregateProcessor();
+        aggregateProcessor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("period", "2");
+        configuration.put("overwrite", "true");
+        aggregateProcessor.activate(configuration);
+
+        HashMap<String, Object> data1 = new HashMap<>();
+        data1.put("first", "first");
+        Event event1 = new Event("decanter/collect/first", data1);
+        aggregateProcessor.handleEvent(event1);
+
+        HashMap<String, Object> data2 = new HashMap<>();
+        data2.put("second", "second");
+        Event event2 = new Event("decanter/collect/second", data2);
+        aggregateProcessor.handleEvent(event2);
+
+        HashMap<String, Object> data3 = new HashMap<>();
+        data3.put("second", "overwrite");
+        Event event3 = new Event("decanter/collect/third", data3);
+        aggregateProcessor.handleEvent(event3);
+
+        Thread.sleep(3000);
+
+        Assert.assertEquals(1, dispatcher.postedEvents.size());
+        Assert.assertEquals("first", dispatcher.postedEvents.get(0).getProperty("first"));
+        Assert.assertEquals("overwrite", dispatcher.postedEvents.get(0).getProperty("second"));
+        Assert.assertEquals("aggregate", dispatcher.postedEvents.get(0).getProperty("processor"));
+        Assert.assertEquals("decanter/process/aggregate", dispatcher.postedEvents.get(0).getTopic());
+
+        HashMap<String, Object> data4 = new HashMap<>();
+        data4.put("foo", "bar");
+        Event event4 = new Event("decanter/collect/foo", data4);
+        aggregateProcessor.handleEvent(event4);
+
+        Thread.sleep(3000);
+
+        Assert.assertEquals(2, dispatcher.postedEvents.size());
+        Assert.assertEquals("bar", dispatcher.postedEvents.get(1).getProperty("foo"));
+
+        aggregateProcessor.deactivate();
+    }
+
+    @Test
+    public void testWithoutOverwrite() throws Exception {
+        MockDispatcher dispatcher = new MockDispatcher();
+        AggregateProcessor aggregateProcessor = new AggregateProcessor();
+        aggregateProcessor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("period", "2");
+        aggregateProcessor.activate(configuration);
+
+        HashMap<String, Object> data1 = new HashMap<>();
+        data1.put("foo", "first");
+        Event event1 = new Event("decanter/collect/foo", data1);
+        aggregateProcessor.handleEvent(event1);
+
+        HashMap<String, Object> data2 = new HashMap<>();
+        data2.put("foo", "second");
+        Event event2 = new Event("decanter/collect/foo", data2);
+        aggregateProcessor.handleEvent(event2);
+
+        Thread.sleep(4000);
+
+        Assert.assertEquals(1, dispatcher.postedEvents.size());
+        Assert.assertEquals("first", dispatcher.postedEvents.get(0).getProperty("0.foo"));
+        Assert.assertEquals("second", dispatcher.postedEvents.get(0).getProperty("1.foo"));
+
+        aggregateProcessor.deactivate();
+    }
+
+    class MockDispatcher implements EventAdmin {
+
+        public List<Event> postedEvents = new ArrayList<>();
+        public List<Event> sentEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+    }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index 7d3b738..059d77b 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -35,6 +35,7 @@
 
     <modules>
         <module>passthrough</module>
+        <module>aggregate</module>
     </modules>
 
 </project>
\ No newline at end of file