You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2019/01/27 17:42:12 UTC
[karaf-decanter] branch master updated: [KARAF-6009] Add filtering
support in appenders
This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new cff0aa5 [KARAF-6009] Add filtering support in appenders
new 2682f95 Merge pull request #69 from jbonofre/KARAF-6009
cff0aa5 is described below
commit cff0aa57e6854893126c11db63534eafcf940984
Author: Jean-Baptiste Onofré <jb...@apache.org>
AuthorDate: Sat Jan 26 15:51:17 2019 +0100
[KARAF-6009] Add filtering support in appenders
---
appender/camel/pom.xml | 32 ++++
.../decanter/appender/camel/CamelAppender.java | 31 ++--
.../decanter/appender/camel/CamelAppenderTest.java | 121 +++++++++++++++
appender/cassandra/pom.xml | 26 +++-
.../appender/cassandra/CassandraAppender.java | 53 ++++---
.../appender/cassandra/CassandraAppenderTest.java | 64 +++++++-
appender/elasticsearch-jest/pom.xml | 17 ++-
.../elasticsearch/jest/ElasticsearchAppender.java | 46 ++++--
.../jest/TestElasticsearchAppender.java | 55 ++++++-
appender/elasticsearch-native-1.x/pom.xml | 23 +++
.../elasticsearch/ElasticsearchAppender.java | 38 +++--
.../elasticsearch/TestElasticsearchAppender.java | 127 ++++++++++------
appender/elasticsearch-native-2.x/pom.xml | 23 +++
.../elasticsearch/ElasticsearchAppender.java | 38 +++--
.../elasticsearch/TestElasticsearchAppender.java | 77 ++++++----
appender/elasticsearch-rest/pom.xml | 11 ++
.../elasticsearch/rest/ElasticsearchAppender.java | 47 ++++--
.../rest/TestElasticsearchAppender.java | 26 +++-
appender/file/pom.xml | 23 +++
.../karaf/decanter/appender/file/FileAppender.java | 33 +++--
.../decanter/appender/file/TestFileAppender.java | 59 +++++++-
appender/jdbc/pom.xml | 23 +++
.../karaf/decanter/appender/jdbc/JdbcAppender.java | 53 ++++---
.../decanter/appender/jdbc/TestJdbcAppender.java | 61 +++++++-
appender/jms/pom.xml | 7 +-
.../karaf/decanter/appender/jms/JmsAppender.java | 81 +++++-----
.../decanter/appender/jms/JmsAppenderTest.java | 51 ++++++-
appender/kafka/pom.xml | 57 +++++++
.../decanter/appender/kafka/ConfigMapper.java | 2 +-
.../decanter/appender/kafka/KafkaAppender.java | 38 +++--
.../appender/kafka/EmbeddedKafkaBroker.java | 100 +++++++++++++
.../decanter/appender/kafka/EmbeddedZooKeeper.java | 76 ++++++++++
.../decanter/appender/kafka/KafkaAppenderTest.java | 73 +++++++++
.../karaf/decanter/appender/kafka/PortFinder.java | 164 +++++++++++++++++++++
appender/log/pom.xml | 23 +++
.../karaf/decanter/appender/log/LogAppender.java | 28 +++-
appender/mongodb/pom.xml | 8 +
.../decanter/appender/mongodb/MongoDbAppender.java | 31 ++--
appender/mqtt/pom.xml | 23 +++
.../karaf/decanter/appender/mqtt/MqttAppender.java | 58 +++++---
.../decanter/appender/mqtt/TestMqttAppender.java | 31 +++-
appender/orientdb/pom.xml | 23 +++
.../appender/orientdb/OrientDBAppender.java | 27 +++-
appender/pom.xml | 1 +
appender/redis/pom.xml | 10 ++
.../decanter/appender/redis/RedisAppender.java | 61 ++++----
appender/rest/pom.xml | 23 +++
.../karaf/decanter/appender/rest/RestAppender.java | 40 +++--
appender/socket/pom.xml | 23 +++
.../decanter/appender/socket/SocketAppender.java | 58 ++++----
appender/timescaledb/pom.xml | 23 +++
.../appender/timescaledb/TimescaleDbAppender.java | 42 ++++--
appender/{ => utils}/pom.xml | 43 ++----
.../karaf/decanter/appender/utils/EventFilter.java | 62 ++++++++
.../decanter/appender/utils/EventFilterTest.java | 75 ++++++++++
appender/websocket-servlet/pom.xml | 56 ++++---
.../websocket/DecanterWebSocketAppender.java | 34 +++--
pom.xml | 7 +-
58 files changed, 2083 insertions(+), 483 deletions(-)
diff --git a/appender/camel/pom.xml b/appender/camel/pom.xml
index 483e9eb..991ccf4 100644
--- a/appender/camel/pom.xml
+++ b/appender/camel/pom.xml
@@ -39,11 +39,43 @@
<artifactId>camel-core</artifactId>
<version>${camel.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>
+ *
+ </Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.camel,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/camel/src/main/java/org/apache/karaf/decanter/appender/camel/CamelAppender.java b/appender/camel/src/main/java/org/apache/karaf/decanter/appender/camel/CamelAppender.java
index 7e670b8..ffc0e3f 100644
--- a/appender/camel/src/main/java/org/apache/karaf/decanter/appender/camel/CamelAppender.java
+++ b/appender/camel/src/main/java/org/apache/karaf/decanter/appender/camel/CamelAppender.java
@@ -19,6 +19,7 @@ package org.apache.karaf.decanter.appender.camel;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -40,8 +41,10 @@ import java.util.HashMap;
)
public class CamelAppender implements EventHandler {
+ public static final String DESTINATION_URI_KEY = "destination.uri";
+
private CamelContext camelContext;
- private String destinationUri;
+ private Dictionary<String, Object> config;
private final static Logger LOGGER = LoggerFactory.getLogger(CamelAppender.class);
@@ -52,24 +55,26 @@ public class CamelAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) throws ConfigurationException {
- LOGGER.debug("Creating CamelContext, and use the {} URI", destinationUri);
- this.camelContext = new DefaultCamelContext();
- this.destinationUri = (String) config.get("destination.uri");
- if (this.destinationUri == null) {
- throw new ConfigurationException("destination.uri", "destination.uri is not defined");
+ this.config = config;
+ if (config.get(DESTINATION_URI_KEY) == null) {
+ throw new ConfigurationException(DESTINATION_URI_KEY, DESTINATION_URI_KEY + " is not defined");
}
+ LOGGER.debug("Creating CamelContext, and use the {} URI", config.get(DESTINATION_URI_KEY));
+ this.camelContext = new DefaultCamelContext();
}
@Override
public void handleEvent(Event event) {
- HashMap<String, Object> data = new HashMap<>();
- for (String name : event.getPropertyNames()) {
- data.put(name, event.getProperty(name));
+ if (EventFilter.match(event, config)) {
+ HashMap<String, Object> data = new HashMap<>();
+ for (String name : event.getPropertyNames()) {
+ data.put(name, event.getProperty(name));
+ }
+ LOGGER.debug("Creating producer template");
+ ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
+ LOGGER.debug("Sending event data on {}", config.get(DESTINATION_URI_KEY));
+ producerTemplate.sendBody((String) config.get(DESTINATION_URI_KEY), data);
}
- LOGGER.debug("Creating producer template");
- ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
- LOGGER.debug("Sending event data on {}", destinationUri);
- producerTemplate.sendBody(destinationUri, data);
}
@Deactivate
diff --git a/appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java b/appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java
new file mode 100644
index 0000000..4fca767
--- /dev/null
+++ b/appender/camel/src/test/java/org/apache/karaf/decanter/appender/camel/CamelAppenderTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.camel;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+public class CamelAppenderTest {
+
+ private static final String TOPIC = "decanter/collect/jmx";
+ private static final long TIMESTAMP = 1454428780634L;
+
+ private DefaultCamelContext camelContext;
+
+ @Before
+ public void setup() throws Exception {
+ camelContext = new DefaultCamelContext();
+ camelContext.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct-vm:decanter")
+ .id("decanter-test")
+ .log("Received ${body}")
+ .to("mock:assert");
+ }
+ });
+ camelContext.start();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ camelContext.stop();
+ }
+
+ @Test
+ public void test() throws Exception {
+ CamelAppender appender = new CamelAppender();
+ Hashtable<String, Object> config = new Hashtable<>();
+ config.put(CamelAppender.DESTINATION_URI_KEY, "direct-vm:decanter");
+ appender.open(config);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("testKey", "testValue");
+ Event event = new Event(TOPIC, data);
+
+ appender.handleEvent(event);
+
+ Map<String, Object> expected = new HashMap<>();
+ expected.put("event.topics", "decanter/collect/jmx");
+ expected.putAll(data);
+ MockEndpoint mock = (MockEndpoint) camelContext.getEndpoint("mock:assert");
+ mock.expectedMessageCount(1);
+ mock.message(0).body().isEqualTo(expected);
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testWithFilter() throws Exception {
+ CamelAppender appender = new CamelAppender();
+ Hashtable<String, Object> config = new Hashtable<>();
+ config.put(CamelAppender.DESTINATION_URI_KEY, "direct-vm:decanter");
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.open(config);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("this is refused property name", "testValue");
+ data.put("key", "value");
+ Event event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("property", "this is a refused value");
+ data.put("key", "value");
+ event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("accepted", "value");
+ event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ Map<String, Object> expected = new HashMap<>();
+ expected.put("event.topics", "decanter/collect/jmx");
+ expected.putAll(data);
+ MockEndpoint mock = (MockEndpoint) camelContext.getEndpoint("mock:assert");
+ mock.expectedMessageCount(1);
+ mock.message(0).body().isEqualTo(expected);
+ mock.assertIsSatisfied();
+ }
+
+}
diff --git a/appender/cassandra/pom.xml b/appender/cassandra/pom.xml
index 9fe131d..da62f8e 100644
--- a/appender/cassandra/pom.xml
+++ b/appender/cassandra/pom.xml
@@ -32,11 +32,14 @@
<name>Apache Karaf :: Decanter :: Appender :: Cassandra</name>
<dependencies>
-
<dependency>
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -136,6 +139,27 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>
+ *
+ </Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.cassandra,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
diff --git a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
index 7c5d159..e6ad035 100644
--- a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
+++ b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
@@ -20,6 +20,7 @@ import java.util.Dictionary;
import java.util.List;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -45,10 +46,19 @@ import com.datastax.driver.core.exceptions.InvalidQueryException;
)
public class CassandraAppender implements EventHandler {
+ public static String KEYSPACE_PROPERTY = "keyspace.name";
+ public static String TABLE_PROPERTY = "table.name";
+ public static String CASSANDRA_HOST_PROPERTY = "cassandra.host";
+ public static String CASSANDRA_PORT_PROPERTY = "cassandra.port";
+
+ public static String KEYSPACE_DEFAULT = "decanter";
+ public static String TABLE_DEFAULT = "decanter";
+ public static String CASSANDRA_HOST_DEFAULT = "localhost";
+ public static String CASSANDRA_PORT_DEFAULT = "9042";
+
private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class);
- private String keyspace;
- private String tableName;
+ private Dictionary<String, Object> config;
@Reference
public Marshaller marshaller;
@@ -70,10 +80,9 @@ public class CassandraAppender implements EventHandler {
}
void activate(Dictionary<String, Object> config) {
- this.keyspace = getValue(config, "keyspace.name", "decanter");
- this.tableName = getValue(config, "table.name", "decanter");
- String host = getValue(config, "cassandra.host", "localhost");
- Integer port = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+ this.config = config;
+ String host = getValue(config, CASSANDRA_HOST_PROPERTY, CASSANDRA_HOST_DEFAULT);
+ Integer port = Integer.parseInt(getValue(config, CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT_DEFAULT));
Builder clusterBuilder = Cluster.builder().addContactPoint(host);
if (port != null) {
clusterBuilder.withPort(port);
@@ -93,21 +102,25 @@ public class CassandraAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- LOGGER.trace("Looking for the Cassandra datasource");
- try (Session session = cluster.connect()){
- useKeyspace(session, keyspace);
- createTable(session, keyspace, tableName);
-
- Long timestamp = (Long) event.getProperty("timestamp");
- if (timestamp == null) {
- timestamp = System.currentTimeMillis();
+ if (EventFilter.match(event, config)) {
+ LOGGER.trace("Looking for the Cassandra datasource");
+ try (Session session = cluster.connect()) {
+ String keyspace = getValue(config, KEYSPACE_PROPERTY, KEYSPACE_DEFAULT);
+ String tableName = getValue(config, TABLE_PROPERTY, TABLE_DEFAULT);
+ useKeyspace(session, keyspace);
+ createTable(session, keyspace, tableName);
+
+ Long timestamp = (Long) event.getProperty("timestamp");
+ if (timestamp == null) {
+ timestamp = System.currentTimeMillis();
+ }
+ String jsonSt = marshaller.marshal(event);
+ session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
+
+ LOGGER.trace("Data inserted into {} table", tableName);
+ } catch (Exception e) {
+ LOGGER.error("Can't store in the database", e);
}
- String jsonSt = marshaller.marshal(event);
- session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
-
- LOGGER.trace("Data inserted into {} table", tableName);
- } catch (Exception e) {
- LOGGER.error("Can't store in the database", e);
}
}
diff --git a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
index d4af92f..9f4bc37 100644
--- a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
+++ b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
@@ -30,8 +30,10 @@ import java.util.Map;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -84,14 +86,14 @@ public class CassandraAppenderTest {
}
@Test
- public void testHandleEvent() throws Exception {
+ public void test() throws Exception {
Marshaller marshaller = new JsonMarshaller();
CassandraAppender appender = new CassandraAppender();
Dictionary<String, Object> config = new Hashtable<String, Object>();
- config.put("cassandra.host", CASSANDRA_HOST);
- config.put("cassandra.port", CASSANDRA_PORT);
- config.put("keyspace.name", KEYSPACE);
- config.put("table.name", TABLE_NAME);
+ config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_HOST);
+ config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT);
+ config.put(CassandraAppender.KEYSPACE_PROPERTY, KEYSPACE);
+ config.put(CassandraAppender.TABLE_PROPERTY, TABLE_NAME);
appender.marshaller = marshaller;
appender.activate(config);
@@ -101,10 +103,11 @@ public class CassandraAppenderTest {
appender.handleEvent(event);
- Session session = getSesion();
+ Session session = getSession();
ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
List<Row> all = execute.all();
+ Assert.assertEquals(1, all.size());
assertThat(all, not(nullValue()));
assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
@@ -112,7 +115,54 @@ public class CassandraAppenderTest {
session.close();
}
- private Session getSesion() {
+ @Test
+ public void testWithFilter() throws Exception {
+ Marshaller marshaller = new JsonMarshaller();
+ CassandraAppender appender = new CassandraAppender();
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(CassandraAppender.CASSANDRA_HOST_PROPERTY, CASSANDRA_HOST); // host under its own key so the port entry below does not replace it
+ config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT);
+ config.put(CassandraAppender.KEYSPACE_PROPERTY, KEYSPACE);
+ config.put(CassandraAppender.TABLE_PROPERTY, TABLE_NAME);
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.marshaller = marshaller;
+ appender.activate(config);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("this is a refused property", "value");
+ Event event = new Event(TOPIC, data);
+
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("property", "this is a refused value");
+ event = new Event(TOPIC, data);
+
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("accepted", "accepted");
+ event = new Event(TOPIC, data);
+
+ appender.handleEvent(event);
+
+ Session session = getSession();
+
+ ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
+ List<Row> all = execute.all();
+ Assert.assertEquals(1, all.size()); // of the three events sent, only the one without "refused" is expected in the table
+ assertThat(all, not(nullValue()));
+
+ assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
+
+ session.close();
+ }
+
+ private Session getSession() {
Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST);
clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT));
diff --git a/appender/elasticsearch-jest/pom.xml b/appender/elasticsearch-jest/pom.xml
index 5f05dde..af6f48f 100644
--- a/appender/elasticsearch-jest/pom.xml
+++ b/appender/elasticsearch-jest/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.searchbox</groupId>
<artifactId>jest</artifactId>
<version>2.0.2</version>
@@ -68,16 +72,27 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
<configuration>
+ <obrRepository>NONE</obrRepository>
<instructions>
- <Import-Package>!javax.servlet*, org.apache.log;resolution:=optional,*</Import-Package>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>
+ !javax.servlet*,
+ org.apache.log;resolution:=optional,
+ *
+ </Import-Package>
<Private-Package>
org.apache.karaf.decanter.appender.elasticsearch.jest,
+ org.apache.karaf.decanter.appender.utils,
org.apache.http*,
org.apache.commons*,
io.searchbox*,
com.google*
</Private-Package>
+ <_dsannotations>*</_dsannotations>
</instructions>
</configuration>
</plugin>
diff --git a/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java b/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
index 9b8c5f2..cd0266e 100644
--- a/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
+++ b/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
@@ -41,6 +41,7 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -70,10 +71,23 @@ public class ElasticsearchAppender implements EventHandler {
@Reference
public Marshaller marshaller;
+ public static String ADDRESS_PROPERTY = "address";
+ public static String USERNAME_PROPERTY = "username";
+ public static String PASSWORD_PROPERTY = "password";
+ public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+ public static String INDEX_TYPE_PROPERTY = "index.type";
+ public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+ public static String ADDRESS_DEFAULT = "http://localhost:9200";
+ public static String USERNAME_DEFAULT = null;
+ public static String PASSWORD_DEFAULT = null;
+ public static String INDEX_PREFIX_DEFAULT = "karaf";
+ public static String INDEX_TYPE_DEFAULT = "decanter";
+ public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
+ private Dictionary<String, Object> config;
+
private JestClient client;
- private String indexPrefix;
- private boolean indexTimestamped;
- private String indexType;
private final SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSS'Z'");
private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd");
@@ -87,10 +101,11 @@ public class ElasticsearchAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
- String addressesString = getValue(config, "address", "http://localhost:9200");
+ this.config = config;
+ String addressesString = getValue(config, ADDRESS_PROPERTY, ADDRESS_DEFAULT);
Set<String> addresses = new HashSet<String>(Arrays.asList(addressesString.split(";")));
- String username = getValue(config, "username", null);
- String password = getValue(config, "password", null);
+ String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+ String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
Builder builder = new HttpClientConfig.Builder(addresses).readTimeout(10000)
.multiThreaded(true);
@@ -129,10 +144,6 @@ public class ElasticsearchAppender implements EventHandler {
TimeZone tz = TimeZone.getTimeZone( "UTC" );
tsFormat.setTimeZone(tz);
indexDateFormat.setTimeZone(tz);
-
- indexPrefix = getValue(config, "index.prefix", "karaf");
- indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
- indexType = getValue(config, "index.type", "decanter");
}
private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -147,18 +158,20 @@ public class ElasticsearchAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- send(event);
- } catch (Exception e) {
- LOGGER.warn("Can't append into Elasticsearch", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ send(event);
+ } catch (Exception e) {
+ LOGGER.warn("Can't append into Elasticsearch", e);
+ }
}
}
private void send(Event event) throws Exception {
- String indexName = getIndexName(indexPrefix, getDate(event));
+ String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
String jsonSt = marshaller.marshal(event);
- JestResult result = client.execute(new Index.Builder(jsonSt).index(indexName).type(indexType).build());
+ JestResult result = client.execute(new Index.Builder(jsonSt).index(indexName).type(getValue(config, INDEX_TYPE_PROPERTY, INDEX_TYPE_DEFAULT)).build());
if (!result.isSucceeded()) {
throw new IllegalStateException(result.getErrorMessage());
@@ -172,6 +185,7 @@ public class ElasticsearchAppender implements EventHandler {
}
private String getIndexName(String prefix, Date date) {
+ boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
if (indexTimestamped) {
return prefix + "-" + indexDateFormat.format(date);
} else {
diff --git a/appender/elasticsearch-jest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/jest/TestElasticsearchAppender.java b/appender/elasticsearch-jest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/jest/TestElasticsearchAppender.java
index 41b7a22..4636c93 100644
--- a/appender/elasticsearch-jest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/jest/TestElasticsearchAppender.java
+++ b/appender/elasticsearch-jest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/jest/TestElasticsearchAppender.java
@@ -18,11 +18,14 @@ package org.apache.karaf.decanter.appender.elasticsearch.jest;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -30,6 +33,7 @@ import static org.elasticsearch.node.NodeBuilder.*;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.Map;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
@@ -41,9 +45,10 @@ public class TestElasticsearchAppender {
private static final int PORT = 9301;
private static final int HTTP_PORT = 9201;
- @Test
- public void testAppender() throws Exception {
+ private Node node;
+ @Before
+ public void setup() throws Exception {
Settings settings = settingsBuilder()
.put("cluster.name", CLUSTER_NAME)
.put("http.enabled", "true")
@@ -57,14 +62,22 @@ public class TestElasticsearchAppender {
.put("path.plugins", "target/plugins")
.build();
- Node node = nodeBuilder().settings(settings).node();
+ node = nodeBuilder().settings(settings).node();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ node.close();
+ }
+ @Test
+ public void test() throws Exception {
Marshaller marshaller = new JsonMarshaller();
ElasticsearchAppender appender = new ElasticsearchAppender();
appender.marshaller = marshaller;
Dictionary<String, Object> config = new Hashtable<>();
- config.put("address", "http://" + HOST + ":" + HTTP_PORT);
- appender.open(config );
+ config.put(ElasticsearchAppender.ADDRESS_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+ appender.open(config);
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
@@ -76,7 +89,37 @@ public class TestElasticsearchAppender {
}
Assert.assertEquals(3L, node.client().count(Requests.countRequest()).actionGet().getCount());
- node.close();
+ }
+
+ @Test
+ public void testWithFilter() throws Exception {
+ Marshaller marshaller = new JsonMarshaller();
+ ElasticsearchAppender appender = new ElasticsearchAppender();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(ElasticsearchAppender.ADDRESS_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.open(config);
+
+ Map<String, String> data = MapBuilder.<String, String>newMapBuilder().put("refused_property", "test").map();
+ Event event = new Event("testTopic", data);
+ appender.handleEvent(event);
+
+ data = MapBuilder.<String, String>newMapBuilder().put("property", "refused_value").map();
+ event = new Event("testTopic", data);
+ appender.handleEvent(event);
+
+ data = MapBuilder.<String, String>newMapBuilder().put("foo", "bar").map();
+ event = new Event("testTopic", data);
+ appender.handleEvent(event);
+
+ int maxTryCount = 10;
+ for(int i=0; node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< maxTryCount; i++) {
+ Thread.sleep(500);
+ }
+
+ Assert.assertEquals(1L, node.client().count(Requests.countRequest()).actionGet().getCount());
}
}
diff --git a/appender/elasticsearch-native-1.x/pom.xml b/appender/elasticsearch-native-1.x/pom.xml
index d6d6208..1177756 100644
--- a/appender/elasticsearch-native-1.x/pom.xml
+++ b/appender/elasticsearch-native-1.x/pom.xml
@@ -44,6 +44,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
</dependency>
@@ -63,6 +67,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.elasticsearch,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index d47e34c..87fc459 100644
--- a/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -24,6 +24,7 @@ import java.util.Dictionary;
import java.util.TimeZone;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
@@ -52,6 +53,18 @@ import org.slf4j.LoggerFactory;
)
public class ElasticsearchAppender implements EventHandler {
+ public static String HOST_PROPERTY = "host";
+ public static String PORT_PROPERTY = "port";
+ public static String CLUSTER_NAME_PROPERTY = "clusterName";
+ public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+ public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+ public static String HOST_DEFAULT = "localhost";
+ public static String PORT_DEFAULT = "9300";
+ public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
+ public static String INDEX_PREFIX_DEFAULT = "karaf";
+ public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
@Reference
@@ -66,8 +79,7 @@ public class ElasticsearchAppender implements EventHandler {
private WorkFinishedListener listener;
- private String indexPrefix;
- private boolean indexTimestamped;
+ private Dictionary<String, Object> config;
@SuppressWarnings("unchecked")
@Activate
@@ -76,12 +88,11 @@ public class ElasticsearchAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
+ this.config = config;
try {
- String host = getValue(config, "host", "localhost");
- int port = Integer.parseInt(getValue(config, "port", "9300"));
- String cluster = getValue(config, "clusterName", "elasticsearch");
- indexPrefix = getValue(config, "index.prefix", "karaf");
- indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
+ String host = getValue(config, HOST_PROPERTY, HOST_DEFAULT);
+ int port = Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT));
+ String cluster = getValue(config, CLUSTER_NAME_PROPERTY, CLUSTER_NAME_DEFAULT);
TimeZone tz = TimeZone.getTimeZone( "UTC" );
tsFormat.setTimeZone(tz);
indexDateFormat.setTimeZone(tz);
@@ -130,15 +141,17 @@ public class ElasticsearchAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- send(event);
- } catch (Exception e) {
- LOGGER.warn("Can't append into Elasticsearch", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ send(event);
+ } catch (Exception e) {
+ LOGGER.warn("Can't append into Elasticsearch", e);
+ }
}
}
private void send(Event event) {
- String indexName = getIndexName(indexPrefix, getDate(event));
+ String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
String jsonSt = marshaller.marshal(event);
LOGGER.debug("Sending event to elastic search with content: {}", jsonSt);
bulkProcessor.add(new IndexRequest(indexName, getType(event)).source(jsonSt));
@@ -156,6 +169,7 @@ public class ElasticsearchAppender implements EventHandler {
}
private String getIndexName(String prefix, Date date) {
+ boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
if (indexTimestamped) {
return prefix + "-" + indexDateFormat.format(date);
} else {
diff --git a/appender/elasticsearch-native-1.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java b/appender/elasticsearch-native-1.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
index f966e9d..255d2e5 100644
--- a/appender/elasticsearch-native-1.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
+++ b/appender/elasticsearch-native-1.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
@@ -18,11 +18,14 @@ package org.apache.karaf.decanter.appender.elasticsearch;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -36,53 +39,91 @@ import org.apache.karaf.decanter.api.marshaller.Marshaller;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
public class TestElasticsearchAppender {
+
private static final String HOST = "127.0.0.1";
private static final String CLUSTER_NAME = "elasticsearch-test";
private static final int PORT = 9300;
private static final int MAX_TRIES = 10;
- @Test
- public void testAppender() throws Exception {
-
- Settings settings = settingsBuilder()
- .put("cluster.name", CLUSTER_NAME)
- .put("http.enabled", "true")
- .put("node.data", true)
- .put("path.data", "target/data")
- .put("network.host", HOST)
- .put("port", PORT)
- .put("index.store.type", "memory")
- .put("index.store.fs.memory.enabled", "true")
- .put("path.plugins", "target/plugins")
- .build();
-
- Node node = nodeBuilder().settings(settings).node();
- Marshaller marshaller = new JsonMarshaller();
- ElasticsearchAppender appender = new ElasticsearchAppender();
- appender.marshaller = marshaller;
- Dictionary<String, Object> config = new Hashtable<>();
- config.put("clusterName", CLUSTER_NAME);
- config.put("port", "" + PORT);
- appender.open(config);
- appender.handleEvent(new Event("testTopic", dummyMap()));
- appender.handleEvent(new Event("testTopic", dummyMap()));
- appender.handleEvent(new Event("testTopic", dummyMap()));
- appender.close();
-
- long currentCount = 0;
- int c = 0;
- while (c < MAX_TRIES && currentCount != 3) {
- currentCount = node.client().count(Requests.countRequest()).actionGet().getCount();
- Thread.sleep(500);
- c++;
- }
-
- Assert.assertEquals(3L, currentCount);
- node.close();
- }
-
- private Map<String, String> dummyMap() {
- return MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map();
- }
+ private Node node;
+
+ @Before
+ public void setup() throws Exception {
+ Settings settings = settingsBuilder()
+ .put("cluster.name", CLUSTER_NAME)
+ .put("http.enabled", "true")
+ .put("node.data", true)
+ .put("path.data", "target/data")
+ .put("network.host", HOST)
+ .put("port", PORT)
+ .put("index.store.type", "memory")
+ .put("index.store.fs.memory.enabled", "true")
+ .put("path.plugins", "target/plugins")
+ .build();
+
+ node = nodeBuilder().settings(settings).node();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ node.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ Marshaller marshaller = new JsonMarshaller();
+ ElasticsearchAppender appender = new ElasticsearchAppender();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+ config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+ appender.open(config);
+ appender.handleEvent(new Event("testTopic", dummyMap()));
+ appender.handleEvent(new Event("testTopic", dummyMap()));
+ appender.handleEvent(new Event("testTopic", dummyMap()));
+ appender.close();
+
+ long currentCount = 0;
+ int c = 0;
+ while (c < MAX_TRIES && currentCount != 3) {
+ currentCount = node.client().count(Requests.countRequest()).actionGet().getCount();
+ Thread.sleep(500);
+ c++;
+ }
+
+ Assert.assertEquals(3L, currentCount);
+ }
+
+ @Test
+ public void testWithFilter() throws Exception {
+ Marshaller marshaller = new JsonMarshaller();
+ ElasticsearchAppender appender = new ElasticsearchAppender();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+ config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.open(config);
+
+ Map<String, String> data = MapBuilder.<String, String>newMapBuilder().put("refused_property", "test").map();
+ Event event = new Event("testTopic", data);
+ appender.handleEvent(event);
+
+ data = MapBuilder.<String, String>newMapBuilder().put("property", "refused_value").map();
+ event = new Event("testTopic", data);
+ appender.handleEvent(event);
+
+ int maxTryCount = 10;
+ for(int i=0; node.client().count(Requests.countRequest()).actionGet().getCount() == 0 && i< maxTryCount; i++) {
+ Thread.sleep(500);
+ }
+
+ Assert.assertEquals(0L, node.client().count(Requests.countRequest()).actionGet().getCount());
+ }
+
+ private Map<String, String> dummyMap() {
+ return MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map();
+ }
}
diff --git a/appender/elasticsearch-native-2.x/pom.xml b/appender/elasticsearch-native-2.x/pom.xml
index a08a018..ee0d9c2 100644
--- a/appender/elasticsearch-native-2.x/pom.xml
+++ b/appender/elasticsearch-native-2.x/pom.xml
@@ -44,6 +44,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>javax.json</groupId>
<artifactId>javax.json-api</artifactId>
</dependency>
@@ -63,6 +67,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.elasticsearch,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index ff38d9e..9149a92 100644
--- a/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -23,6 +23,7 @@ import java.util.Dictionary;
import java.util.TimeZone;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
@@ -51,6 +52,18 @@ import org.slf4j.LoggerFactory;
)
public class ElasticsearchAppender implements EventHandler {
+ public static String HOST_PROPERTY = "host";
+ public static String PORT_PROPERTY = "port";
+ public static String CLUSTER_NAME_PROPERTY = "clusterName";
+ public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+ public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+ public static String HOST_DEFAULT = "localhost";
+ public static String PORT_DEFAULT = "9300";
+ public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
+ public static String INDEX_PREFIX_DEFAULT = "karaf";
+ public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
@Reference
@@ -64,8 +77,7 @@ public class ElasticsearchAppender implements EventHandler {
Client client;
private WorkFinishedListener listener;
- private String indexPrefix;
- private boolean indexTimestamped;
+ private Dictionary<String, Object> config;
@SuppressWarnings("unchecked")
@Activate
@@ -74,12 +86,11 @@ public class ElasticsearchAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
+ this.config = config;
try {
- String host = getValue(config, "host", "localhost");
- int port = Integer.parseInt(getValue(config, "port", "9300"));
- String cluster = getValue(config, "clusterName", "elasticsearch");
- indexPrefix = getValue(config, "index.prefix", "karaf");
- indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
+ String host = getValue(config, HOST_PROPERTY, HOST_DEFAULT);
+ int port = Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT));
+ String cluster = getValue(config, CLUSTER_NAME_PROPERTY, CLUSTER_NAME_DEFAULT);
TimeZone tz = TimeZone.getTimeZone( "UTC" );
tsFormat.setTimeZone(tz);
indexDateFormat.setTimeZone(tz);
@@ -123,15 +134,17 @@ public class ElasticsearchAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- send(event);
- } catch (Exception e) {
- LOGGER.warn("Can't append into Elasticsearch", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ send(event);
+ } catch (Exception e) {
+ LOGGER.warn("Can't append into Elasticsearch", e);
+ }
}
}
private void send(Event event) {
- String indexName = getIndexName(indexPrefix, getDate(event));
+ String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
String jsonSt = marshaller.marshal(event);
LOGGER.debug("Sending event to elastic search with content: {}", jsonSt);
bulkProcessor.add(new IndexRequest(indexName, getType(event)).source(jsonSt));
@@ -149,6 +162,7 @@ public class ElasticsearchAppender implements EventHandler {
}
private String getIndexName(String prefix, Date date) {
+ boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
if (indexTimestamped) {
return prefix + "-" + indexDateFormat.format(date);
} else {
diff --git a/appender/elasticsearch-native-2.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java b/appender/elasticsearch-native-2.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
index 9d8851e..b01d760 100644
--- a/appender/elasticsearch-native-2.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
+++ b/appender/elasticsearch-native-2.x/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
@@ -24,49 +24,62 @@ import java.util.Hashtable;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.osgi.service.event.Event;
public class TestElasticsearchAppender {
+
private static final String CLUSTER_NAME = "elasticsearch-test";
private static final int PORT = 9300;
+ private static final int MAX_TRIES = 10;
+
+ private Node node;
+
+ @Before
+ public void setup() throws Exception {
+ Settings settings = Settings.settingsBuilder()
+ .put("cluster.name", "elasticsearch")
+ .put("http.enabled", "true")
+ .put("node.data", true)
+ .put("path.home", "target")
+ .put("path.data", "target/data")
+ .put("network.host", "127.0.0.1")
+ .put("index.store.type", "memory")
+ .put("index.store.fs.memory.enabled", "true")
+ .put("path.plugins", "target/plugins")
+ .build();
+
+ node = nodeBuilder().settings(settings).node();
+ }
- @Test
- public void testAppender() throws Exception {
-
- Settings settings = Settings.settingsBuilder()
- .put("cluster.name", "elasticsearch")
- .put("http.enabled", "true")
- .put("node.data", true)
- .put("path.home", "target")
- .put("path.data", "target/data")
- .put("network.host", "127.0.0.1")
- .put("index.store.type", "memory")
- .put("index.store.fs.memory.enabled", "true")
- .put("path.plugins", "target/plugins")
- .build();
-
- Node node = nodeBuilder().settings(settings).node();
-
- Marshaller marshaller = new JsonMarshaller();
- ElasticsearchAppender appender = new ElasticsearchAppender();
- appender.marshaller = marshaller;
- Dictionary<String, Object> config = new Hashtable<>();
- config.put("clusterName", CLUSTER_NAME);
- config.put("port", "" + PORT);
- appender.open(config);
- appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
- appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
- appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
- appender.close();
+ @After
+ public void teardown() throws Exception {
+ node.close();
+ }
- SearchResponse response = node.client().prepareSearch().execute().actionGet();
- System.out.println(response.toString());
+ @Test
+ public void test() throws Exception {
+ Marshaller marshaller = new JsonMarshaller();
+ ElasticsearchAppender appender = new ElasticsearchAppender();
+ appender.marshaller = marshaller;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(ElasticsearchAppender.CLUSTER_NAME_PROPERTY, CLUSTER_NAME);
+ config.put(ElasticsearchAppender.PORT_PROPERTY, "" + PORT);
+ appender.open(config);
+ appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+ appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+ appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+ appender.close();
- node.close();
- }
+ SearchResponse response = node.client().prepareSearch().execute().actionGet();
+ System.out.println(response.toString());
+ }
}
diff --git a/appender/elasticsearch-rest/pom.xml b/appender/elasticsearch-rest/pom.xml
index 373e7e9..d1aaf93 100644
--- a/appender/elasticsearch-rest/pom.xml
+++ b/appender/elasticsearch-rest/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch6.version}</version>
@@ -86,18 +90,25 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
<configuration>
+ <obrRepository>NONE</obrRepository>
<instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
<Import-Package>
org.apache.log;resolution:=optional,
*
</Import-Package>
<Private-Package>
org.apache.karaf.decanter.appender.elasticsearch.rest,
+ org.apache.karaf.decanter.appender.utils,
org.apache.http*,
org.apache.commons*,
org.elasticsearch*
</Private-Package>
+ <_dsannotations>*</_dsannotations>
</instructions>
</configuration>
</plugin>
diff --git a/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java b/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
index 901ed37..1d3c359 100644
--- a/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
+++ b/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
@@ -28,6 +28,7 @@ import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
@@ -51,13 +52,26 @@ import java.util.*;
)
public class ElasticsearchAppender implements EventHandler {
+ public static String ADDRESSES_PROPERTY = "addresses";
+ public static String USERNAME_PROPERTY = "username";
+ public static String PASSWORD_PROPERTY = "password";
+ public static String INDEX_PREFIX_PROPERTY = "index.prefix";
+ public static String INDEX_TYPE_PROPERTY = "index.type";
+ public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+ public static String ADDRESSES_DEFAULT = "http://localhost:9200";
+ public static String USERNAME_DEFAULT = null;
+ public static String PASSWORD_DEFAULT = null;
+ public static String INDEX_PREFIX_DEFAULT = "karaf";
+ public static String INDEX_TYPE_DEFAULT = "decanter";
+ public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
@Reference
public Marshaller marshaller;
private RestClient client;
- private String indexPrefix;
- private boolean indexTimestamped;
- private String indexType;
+
+ private Dictionary<String, Object> config;
private final SimpleDateFormat tsFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSS'Z'");
private final SimpleDateFormat indexDateFormat = new SimpleDateFormat("yyyy.MM.dd");
@@ -71,9 +85,11 @@ public class ElasticsearchAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
- String addressesString = getValue(config, "addresses", "http://localhost:9200");
- String username = getValue(config, "username", null);
- String password = getValue(config, "password", null);
+ this.config = config;
+
+ String addressesString = getValue(config, ADDRESSES_PROPERTY, ADDRESSES_DEFAULT);
+ String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+ String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
Set<String> addresses = new HashSet<String>(Arrays.asList(addressesString.split(",")));
@@ -116,10 +132,6 @@ public class ElasticsearchAppender implements EventHandler {
TimeZone tz = TimeZone.getTimeZone( "UTC" );
tsFormat.setTimeZone(tz);
indexDateFormat.setTimeZone(tz);
-
- indexPrefix = getValue(config, "index.prefix", "karaf");
- indexTimestamped = Boolean.parseBoolean(getValue(config, "index.event.timestamped", "true"));
- indexType = getValue(config, "index.type", "decanter");
}
private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -138,19 +150,21 @@ public class ElasticsearchAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- send(event);
- } catch (Exception e) {
- LOGGER.warn("Can't append into Elasticsearch", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ send(event);
+ } catch (Exception e) {
+ LOGGER.warn("Can't append into Elasticsearch", e);
+ }
}
}
private void send(Event event) throws Exception {
- String indexName = getIndexName(indexPrefix, getDate(event));
+ String indexName = getIndexName(getValue(config, INDEX_PREFIX_PROPERTY, INDEX_PREFIX_DEFAULT), getDate(event));
String jsonSt = marshaller.marshal(event);
// elasticsearch 6.x only allows one type per index mapping, the _type is part of the document
- String endpoint = String.format("/%s/%s", indexName, indexType);
+ String endpoint = String.format("/%s/%s", indexName, getValue(config, INDEX_TYPE_PROPERTY, INDEX_TYPE_DEFAULT));
HttpEntity request = new NStringEntity(jsonSt, ContentType.APPLICATION_JSON);
client.performRequest("POST", endpoint, Collections.singletonMap("refresh", "true"), request);
@@ -163,6 +177,7 @@ public class ElasticsearchAppender implements EventHandler {
}
private String getIndexName(String prefix, Date date) {
+ boolean indexTimestamped = Boolean.parseBoolean(getValue(config, INDEX_EVENT_TIMESTAMPED_PROPERTY, INDEX_EVENT_TIMESTAMPED_DEFAULT));
if (indexTimestamped) {
return prefix + "-" + indexDateFormat.format(date);
} else {
diff --git a/appender/elasticsearch-rest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/rest/TestElasticsearchAppender.java b/appender/elasticsearch-rest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/rest/TestElasticsearchAppender.java
index 09abc7e..2199771 100644
--- a/appender/elasticsearch-rest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/rest/TestElasticsearchAppender.java
+++ b/appender/elasticsearch-rest/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/rest/TestElasticsearchAppender.java
@@ -22,6 +22,7 @@ import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.protocol.HTTP;
import org.apache.http.util.EntityUtils;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@@ -43,7 +44,9 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -56,9 +59,10 @@ public class TestElasticsearchAppender {
private static final String HOST = "127.0.0.1";
private static final int HTTP_PORT = 9201;
- @Test
- public void testAppender() throws Exception {
+ private Node node;
+ @Before
+ public void setup() throws Exception {
Settings settings = Settings.builder()
.put("cluster.name", CLUSTER_NAME)
.put("node.name", "test")
@@ -71,19 +75,31 @@ public class TestElasticsearchAppender {
.build();
Collection plugins = Arrays.asList(Netty4Plugin.class);
- Node node = new PluginConfigurableNode(settings, plugins);
+ node = new PluginConfigurableNode(settings, plugins);
node.start();
+ }
+ @After
+ public void teardown() throws Exception {
+ node.close();
+ }
+
+ @Test
+ public void test() throws Exception {
Marshaller marshaller = new JsonMarshaller();
ElasticsearchAppender appender = new ElasticsearchAppender();
appender.marshaller = marshaller;
Dictionary<String, Object> config = new Hashtable<>();
- config.put("addresses", "http://" + HOST + ":" + HTTP_PORT);
- appender.open(config );
+ config.put(ElasticsearchAppender.ADDRESSES_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.open(config);
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
+ appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("refused", "b").put("c", "d").map()));
+ appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "refused").put("c", "d").map()));
appender.close();
HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
diff --git a/appender/file/pom.xml b/appender/file/pom.xml
index 964d6f1..18899e0 100644
--- a/appender/file/pom.xml
+++ b/appender/file/pom.xml
@@ -38,6 +38,10 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.karaf.decanter.marshaller</groupId>
@@ -49,6 +53,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.file,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java b/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
index 713d9fa..4a2778e 100644
--- a/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
+++ b/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
@@ -17,6 +17,7 @@
package org.apache.karaf.decanter.appender.file;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -38,22 +39,28 @@ import java.util.Dictionary;
)
public class FileAppender implements EventHandler {
+ public static String FILENAME_PROPERTY = "filename";
+ public static String APPEND_PROPERTY = "append";
+
@Reference
public Marshaller marshaller;
private BufferedWriter writer;
- private boolean append;
+ private Dictionary<String, Object> config;
@Activate
public void activate(ComponentContext componentContext) throws Exception {
Dictionary<String, Object> config = componentContext.getProperties();
- String filename = (config.get("filename") != null) ? (String) config.get("filename") : System.getProperty("karaf.data") + File.separator + "decanter";
- append = (config.get("append") != null) ? Boolean.parseBoolean((String) config.get("append")) : true;
- open(filename);
+ open(config);
}
- public void open(String filename) throws Exception {
+ public void open(Dictionary<String, Object> config) throws Exception {
+ this.config = config;
+
+ String filename = (config.get(FILENAME_PROPERTY) != null) ? (String) config.get(FILENAME_PROPERTY) : System.getProperty("karaf.data") + File.separator + "decanter";
+ boolean append = (config.get(APPEND_PROPERTY) != null) ? Boolean.parseBoolean((String) config.get(APPEND_PROPERTY)) : true;
+
File file = new File(filename);
file.getParentFile().mkdirs();
file.createNewFile();
@@ -62,13 +69,15 @@ public class FileAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- String marshalled = marshaller.marshal(event);
- writer.write(marshalled);
- writer.newLine();
- writer.flush();
- } catch (Exception e) {
- // nothing to do
+ if (EventFilter.match(event, config)) {
+ try {
+ String marshalled = marshaller.marshal(event);
+ writer.write(marshalled);
+ writer.newLine();
+ writer.flush();
+ } catch (Exception e) {
+ // nothing to do
+ }
}
}
diff --git a/appender/file/src/test/java/org/apache/karaf/decanter/appender/file/TestFileAppender.java b/appender/file/src/test/java/org/apache/karaf/decanter/appender/file/TestFileAppender.java
index 255327b..bd10f43 100644
--- a/appender/file/src/test/java/org/apache/karaf/decanter/appender/file/TestFileAppender.java
+++ b/appender/file/src/test/java/org/apache/karaf/decanter/appender/file/TestFileAppender.java
@@ -16,6 +16,7 @@
*/
package org.apache.karaf.decanter.appender.file;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.apache.karaf.decanter.marshaller.csv.CsvMarshaller;
import org.junit.Assert;
import org.junit.Test;
@@ -24,31 +25,73 @@ import org.osgi.service.event.Event;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.util.Dictionary;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
public class TestFileAppender {
@Test
- public void testAppender() throws Exception {
+ public void test() throws Exception {
FileAppender fileAppender = new FileAppender();
fileAppender.marshaller = new CsvMarshaller();
- fileAppender.open("target/test-classes/decanter");
- Map<String, String> map = new HashMap<>();
- map.put("a", "b");
- map.put("c", "d");
- fileAppender.handleEvent(new Event("testTopic", map));
- fileAppender.handleEvent(new Event("testTopic", map));
- fileAppender.handleEvent(new Event("testTopic", map));
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(FileAppender.FILENAME_PROPERTY, "target/test-classes/decanter");
+ fileAppender.open(config);
+
+ Map<String, String> data = new HashMap<>();
+ data.put("a", "b");
+ data.put("c", "d");
+ fileAppender.handleEvent(new Event("testTopic", data));
+ fileAppender.handleEvent(new Event("testTopic", data));
+ fileAppender.handleEvent(new Event("testTopic", data));
fileAppender.deactivate();
File file = new File("target/test-classes/decanter");
+ int lineCount = 0;
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
+ lineCount++;
Assert.assertEquals("a=b,c=d,event.topics=testTopic", line);
}
}
+ Assert.assertEquals(3, lineCount);
+ }
+
+ @Test
+ public void testWithFilter() throws Exception {
+ FileAppender fileAppender = new FileAppender();
+ fileAppender.marshaller = new CsvMarshaller();
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(FileAppender.FILENAME_PROPERTY, "target/test-classes/filtered");
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ fileAppender.open(config);
+
+ Map<String, String> data = new HashMap<>();
+ data.put("refused_property", "test");
+ fileAppender.handleEvent(new Event("testTopic", data));
+
+ data = new HashMap<>();
+ data.put("property", "refused_value");
+ fileAppender.handleEvent(new Event("testTopic", data));
+
+ data = new HashMap<>();
+ data.put("a", "b");
+ fileAppender.handleEvent(new Event("testTopic", data));
+
+ File file = new File("target/test-classes/filtered");
+ int lineCount = 0;
+ try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ lineCount++;
+ Assert.assertEquals("a=b,event.topics=testTopic", line);
+ }
+ }
+ Assert.assertEquals(1, lineCount);
}
}
diff --git a/appender/jdbc/pom.xml b/appender/jdbc/pom.xml
index c31b804..1638706 100644
--- a/appender/jdbc/pom.xml
+++ b/appender/jdbc/pom.xml
@@ -42,6 +42,10 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.derby</groupId>
@@ -58,6 +62,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.jdbc,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java b/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
index 3674f37..63b627f 100644
--- a/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
+++ b/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
@@ -25,6 +25,7 @@ import java.util.Dictionary;
import javax.sql.DataSource;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -42,6 +43,12 @@ import org.slf4j.LoggerFactory;
)
public class JdbcAppender implements EventHandler {
+ public static String TABLE_NAME_PROPERTY = "table.name";
+ public static String DIALECT_PROPERTY = "dialect";
+
+ public static String TABLE_NAME_DEFAULT = "decanter";
+ public static String DIALECT_DEFAULT = "generic";
+
@Reference
public Marshaller marshaller;
@@ -60,8 +67,7 @@ public class JdbcAppender implements EventHandler {
private final static String insertQueryTemplate =
"INSERT INTO TABLENAME(timestamp, content) VALUES(?,?)";
- String tableName;
- String dialect;
+ private Dictionary<String, Object> config;
@SuppressWarnings("unchecked")
@Activate
@@ -70,12 +76,11 @@ public class JdbcAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
- this.tableName = getValue(config, "table.name", "decanter");
- this.dialect = getValue(config, "dialect", "generic");
+ this.config = config;
try (Connection connection = dataSource.getConnection()) {
createTable(connection);
} catch (Exception e) {
- LOGGER.debug("Error creating table " + tableName, e);
+ LOGGER.debug("Error creating table " + getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT), e);
}
}
@@ -86,37 +91,39 @@ public class JdbcAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try (Connection connection = dataSource.getConnection()) {
- String jsonSt = marshaller.marshal(event);
- String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
- Long timestamp = (Long)event.getProperty(EventConstants.TIMESTAMP);
- if (timestamp == null) {
- timestamp = System.currentTimeMillis();
+ if (EventFilter.match(event, config)) {
+ try (Connection connection = dataSource.getConnection()) {
+ String jsonSt = marshaller.marshal(event);
+ String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
+ Long timestamp = (Long) event.getProperty(EventConstants.TIMESTAMP);
+ if (timestamp == null) {
+ timestamp = System.currentTimeMillis();
+ }
+ try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
+ insertStatement.setLong(1, timestamp);
+ insertStatement.setString(2, jsonSt);
+ insertStatement.executeUpdate();
+ LOGGER.trace("Data inserted into {} table", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Can't store in the database", e);
}
- try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
- insertStatement.setLong(1, timestamp);
- insertStatement.setString(2, jsonSt);
- insertStatement.executeUpdate();
- LOGGER.trace("Data inserted into {} table", tableName);
- }
- } catch (Exception e) {
- LOGGER.error("Can't store in the database", e);
}
}
private void createTable(Connection connection) {
String createTemplate = null;
- if (dialect.equalsIgnoreCase("mysql")) {
+ if (getValue(config, DIALECT_PROPERTY, DIALECT_DEFAULT).equalsIgnoreCase("mysql")) {
createTemplate = createTableQueryMySQLTemplate;
- } else if (dialect.equalsIgnoreCase("derby")) {
+ } else if (getValue(config, DIALECT_PROPERTY, DIALECT_DEFAULT).equalsIgnoreCase("derby")) {
createTemplate = createTableQueryDerbyTemplate;
} else {
createTemplate = createTableQueryGenericTemplate;
}
- String createTableQuery = createTemplate.replaceAll("TABLENAME", tableName);
+ String createTableQuery = createTemplate.replaceAll("TABLENAME", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
try (Statement createStatement = connection.createStatement()) {
createStatement.executeUpdate(createTableQuery);
- LOGGER.debug("Table {} has been created", tableName);
+ LOGGER.debug("Table {} has been created", getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT));
} catch (SQLException e) {
LOGGER.trace("Can't create table {}", e);
}
diff --git a/appender/jdbc/src/test/java/org/apache/karaf/decanter/appender/jdbc/TestJdbcAppender.java b/appender/jdbc/src/test/java/org/apache/karaf/decanter/appender/jdbc/TestJdbcAppender.java
index 6309959..b61a436 100644
--- a/appender/jdbc/src/test/java/org/apache/karaf/decanter/appender/jdbc/TestJdbcAppender.java
+++ b/appender/jdbc/src/test/java/org/apache/karaf/decanter/appender/jdbc/TestJdbcAppender.java
@@ -32,6 +32,7 @@ import javax.json.JsonReader;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
import org.junit.Assert;
import org.junit.Test;
@@ -39,12 +40,13 @@ import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
public class TestJdbcAppender {
+
private static final String TABLE_NAME = "decanter";
private static final String TOPIC = "decanter/collect/jmx";
private static final long TIMESTAMP = 1454428780634L;
@Test
- public void testHandleEvent() throws SQLException {
+ public void test() throws SQLException {
System.setProperty("derby.stream.error.file", "target/derby.log");
Marshaller marshaller = new JsonMarshaller();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
@@ -60,9 +62,60 @@ public class TestJdbcAppender {
config.put("dialect", "derby");
appender.open(config);
- Map<String, Object> properties = new HashMap<>();
- properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
- Event event = new Event(TOPIC, properties);
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ Event event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ try (Connection con = dataSource.getConnection(); Statement statement = con.createStatement();) {
+ ResultSet res = statement.executeQuery("select timestamp, content from " + TABLE_NAME);
+ res.next();
+ long dbTimeStamp = res.getLong(1);
+ String json = res.getString(2);
+ JsonReader reader = Json.createReader(new StringReader(json));
+ JsonObject jsonO = reader.readObject();
+ Assert.assertEquals("Timestamp db", TIMESTAMP, dbTimeStamp);
+ Assert.assertEquals("Timestamp string", "2016-02-02T15:59:40,634Z",jsonO.getString("@timestamp"));
+ Assert.assertEquals("timestamp long", TIMESTAMP, jsonO.getJsonNumber(EventConstants.TIMESTAMP).longValue());
+ Assert.assertEquals("Topic", TOPIC, jsonO.getString(EventConstants.EVENT_TOPIC.replace('.','_')));
+ Assert.assertFalse(res.next());
+ }
+ }
+
+ @Test
+ public void testWithFilter() throws SQLException {
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+ Marshaller marshaller = new JsonMarshaller();
+ EmbeddedDataSource dataSource = new EmbeddedDataSource();
+ dataSource.setDatabaseName("target/testFilterDB");
+ dataSource.setCreateDatabase("create");
+
+ deleteTable(dataSource);
+
+ JdbcAppender appender = new JdbcAppender();
+ appender.marshaller = marshaller;
+ appender.dataSource = dataSource;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put("dialect", "derby");
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.open(config);
+
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("this_refused", "data");
+ Event event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("property", "this_refused");
+ event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ event = new Event(TOPIC, data);
appender.handleEvent(event);
try (Connection con = dataSource.getConnection(); Statement statement = con.createStatement();) {
diff --git a/appender/jms/pom.xml b/appender/jms/pom.xml
index 86c9b42..416522f 100644
--- a/appender/jms/pom.xml
+++ b/appender/jms/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
@@ -71,7 +75,8 @@
*
</Import-Package>
<Private-Package>
- org.apache.karaf.decanter.appender.jms
+ org.apache.karaf.decanter.appender.jms,
+ org.apache.karaf.decanter.appender.utils
</Private-Package>
<_dsannotations>*</_dsannotations>
<_dsannotations-options>nocapabilities,norequirements</_dsannotations-options>
diff --git a/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java b/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
index c328b19..ae1cb00 100644
--- a/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
+++ b/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
@@ -22,6 +22,7 @@ import java.util.Map;
import javax.jms.*;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -39,6 +40,18 @@ import org.slf4j.LoggerFactory;
)
public class JmsAppender implements EventHandler {
+ public static String USERNAME_PROPERTY = "username";
+ public static String PASSWORD_PROPERTY = "password";
+ public static String DESTINATION_NAME_PROPERTY = "destination.name";
+ public static String DESTINATION_TYPE_PROPERTY = "destination.type";
+ public static String MESSAGE_TYPE_PROPERTY = "message.type";
+
+ public static String USERNAME_DEFAULT = null;
+ public static String PASSWORD_DEFAULT = null;
+ public static String DESTINATION_NAME_DEFAULT = "decanter";
+ public static String DESTINATION_TYPE_DEFAULT = "queue";
+ public static String MESSAGE_TYPE_DEFAULT = "text";
+
@Reference
public ConnectionFactory connectionFactory;
@@ -47,11 +60,8 @@ public class JmsAppender implements EventHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(JmsAppender.class);
- private String username;
- private String password;
- private String destinationName;
- private String destinationType;
- private String messageType;
+ private Dictionary<String, Object> config;
+
@SuppressWarnings("unchecked")
@Activate
@@ -60,45 +70,42 @@ public class JmsAppender implements EventHandler {
}
void activate(Dictionary<String, Object> config) {
- username = getProperty(config, "username", null);
- password = getProperty(config, "password", null);
- destinationName = getProperty(config, "destination.name", "decanter");
- destinationType = getProperty(config, "destination.type", "queue");
- messageType = getProperty(config, "message.type", "text");
- LOGGER.info("Decanter JMS Appender started sending to {} {}",destinationType, destinationName);
+ this.config = config;
+ LOGGER.info("Decanter JMS Appender started sending to {} {}", getValue(config, DESTINATION_TYPE_PROPERTY, DESTINATION_TYPE_DEFAULT), getValue(config, DESTINATION_NAME_PROPERTY, DESTINATION_NAME_DEFAULT));
}
- private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
+ private String getValue(Dictionary<String, Object> properties, String key, String defaultValue) {
return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
}
@Override
public void handleEvent(Event event) {
- Connection connection = null;
- Session session = null;
- try {
- connection = createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = createDestination(session);
- MessageProducer producer = session.createProducer(destination);
- if (messageType.equalsIgnoreCase("text")) {
- TextMessage message = session.createTextMessage(marshaller.marshal(event));
- producer.send(message);
- } else {
- MapMessage message = session.createMapMessage();
- for (String name : event.getPropertyNames()) {
- Object value = event.getProperty(name);
- setProperty(message, name, value);
+ if (EventFilter.match(event, config)) {
+ Connection connection = null;
+ Session session = null;
+ try {
+ connection = createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = createDestination(session);
+ MessageProducer producer = session.createProducer(destination);
+ if (getValue(config, MESSAGE_TYPE_PROPERTY, MESSAGE_TYPE_DEFAULT).equalsIgnoreCase("text")) {
+ TextMessage message = session.createTextMessage(marshaller.marshal(event));
+ producer.send(message);
+ } else {
+ MapMessage message = session.createMapMessage();
+ for (String name : event.getPropertyNames()) {
+ Object value = event.getProperty(name);
+ setProperty(message, name, value);
+ }
+ producer.send(message);
}
- producer.send(message);
+ producer.close();
+ } catch (Exception e) {
+ LOGGER.warn("Can't send to JMS broker", e);
+ } finally {
+ safeClose(session);
+ safeClose(connection);
}
- producer.close();
- } catch (Exception e) {
- LOGGER.warn("Can't send to JMS broker", e);
- }
- finally {
- safeClose(session);
- safeClose(connection);
}
}
@@ -125,12 +132,16 @@ public class JmsAppender implements EventHandler {
}
private Destination createDestination(Session session) throws JMSException {
+ String destinationType = getValue(config, DESTINATION_TYPE_PROPERTY, DESTINATION_TYPE_DEFAULT);
+ String destinationName = getValue(config, DESTINATION_NAME_PROPERTY, DESTINATION_NAME_DEFAULT);
return (destinationType.equalsIgnoreCase("topic"))
? session.createTopic(destinationName)
: session.createQueue(destinationName);
}
private Connection createConnection() throws JMSException {
+ String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+ String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
return (username != null)
? connectionFactory.createConnection(username, password)
: connectionFactory.createConnection();
diff --git a/appender/jms/src/test/java/org/apache/karaf/decanter/appender/jms/JmsAppenderTest.java b/appender/jms/src/test/java/org/apache/karaf/decanter/appender/jms/JmsAppenderTest.java
index 36ac273..aa2e24f 100644
--- a/appender/jms/src/test/java/org/apache/karaf/decanter/appender/jms/JmsAppenderTest.java
+++ b/appender/jms/src/test/java/org/apache/karaf/decanter/appender/jms/JmsAppenderTest.java
@@ -28,6 +28,7 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.junit.Assert;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -35,7 +36,7 @@ import org.osgi.service.event.Event;
public class JmsAppenderTest {
@Test
- public void testHandleEvent() throws JMSException {
+ public void test() throws JMSException {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JmsAppender appender = new JmsAppender();
appender.connectionFactory = cf;
@@ -70,4 +71,52 @@ public class JmsAppenderTest {
Object map = message.getObject("map");
Assert.assertTrue(map instanceof Map);
}
+
+ @Test
+ public void testWithFilter() throws JMSException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ JmsAppender appender = new JmsAppender();
+ appender.connectionFactory = cf;
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put("message.type", "map");
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
+ appender.activate(config);
+
+ Connection con = cf.createConnection();
+ con.start();
+ Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer consumer = sess.createConsumer(sess.createQueue("decanter"));
+
+ Map<String, Object> data = new HashMap<String, Object>();
+ data.put("timestamp", 1l);
+ data.put("string", "test");
+ data.put("boolean", true);
+ data.put("integer", 1);
+ data.put("testnull", null);
+ data.put("map", new HashMap<String, String>());
+ appender.handleEvent(new Event("decanter/collect", data));
+
+ data = new HashMap<>();
+ data.put("refused_property", "value");
+ appender.handleEvent(new Event("decanter/collect", data));
+
+ data = new HashMap<>();
+ data.put("property", "refused_value");
+ appender.handleEvent(new Event("decanter/collect", data));
+
+ MapMessage message = (MapMessage)consumer.receive(1000);
+ consumer.close();
+ sess.close();
+ con.close();
+
+ Assert.assertEquals(1l, message.getObject("timestamp"));
+ Assert.assertEquals("test", message.getObject("string"));
+ Assert.assertEquals(true, message.getObject("boolean"));
+ Assert.assertEquals(1, message.getObject("integer"));
+ Object map = message.getObject("map");
+ Assert.assertTrue(map instanceof Map);
+ }
+
}
diff --git a/appender/kafka/pom.xml b/appender/kafka/pom.xml
index 61e2f02..8eaf002 100644
--- a/appender/kafka/pom.xml
+++ b/appender/kafka/pom.xml
@@ -39,15 +39,72 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>${kafka.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.7.25</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.kafka,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
index f62cac0..5d6e775 100644
--- a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
@@ -62,7 +62,7 @@ public class ConfigMapper {
}
private void process(String key, String defaultValue) {
- String value = (String)confSource.get(key);
+ String value = (String) confSource.get(key);
String usedValue = (value != null) ? value : defaultValue;
if (usedValue != null) {
config.put(key, usedValue);
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
index c09dd66..9bb2c48 100644
--- a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
@@ -16,6 +16,7 @@
*/
package org.apache.karaf.decanter.appender.kafka;
+import java.util.Dictionary;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
@@ -23,6 +24,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -48,6 +50,7 @@ public class KafkaAppender implements EventHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class);
+ private Dictionary<String, Object> config;
private Properties properties;
private String topic;
private KafkaProducer<String, String> producer;
@@ -55,7 +58,12 @@ public class KafkaAppender implements EventHandler {
@Activate
@SuppressWarnings("unchecked")
public void activate(ComponentContext context) {
- this.properties = ConfigMapper.map(context.getProperties());
+ activate(context.getProperties());
+ }
+
+ public void activate(Dictionary<String, Object> config) {
+ this.config = config;
+ this.properties = ConfigMapper.map(config);
this.topic = properties.getProperty("topic");
properties.remove("topic");
@@ -71,20 +79,22 @@ public class KafkaAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- String type = (String)event.getProperty("type");
- String data = marshaller.marshal(event);
- producer.send(new ProducerRecord<>(topic, type, data), new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e != null) {
- LOGGER.warn("Can't send event to Kafka broker", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ String type = (String) event.getProperty("type");
+ String data = marshaller.marshal(event);
+ producer.send(new ProducerRecord<>(topic, type, data), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ if (e != null) {
+ LOGGER.warn("Can't send event to Kafka broker", e);
+ }
}
- }
- }).get();
- producer.flush();
- } catch (Exception e) {
- LOGGER.warn("Error sending event to kafka", e);
+ }).get();
+ producer.flush();
+ } catch (Exception e) {
+ LOGGER.warn("Error sending event to kafka", e);
+ }
}
}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java
new file mode 100644
index 0000000..4de295a
--- /dev/null
+++ b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedKafkaBroker.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.common.utils.SystemTime;
+import org.junit.rules.ExternalResource;
+import scala.Option;
+import scala.collection.mutable.Buffer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+public class EmbeddedKafkaBroker extends ExternalResource {
+
+ private final Integer brokerId;
+ private final Integer port;
+ private final String zkConnection;
+ private final Properties baseProperties;
+
+ private final String brokerList;
+
+ private KafkaServer kafkaServer;
+ private File logDir;
+ private ZkUtils zkUtils;
+
+ public EmbeddedKafkaBroker(int brokerId, int port, String zkConnection, Properties baseProperties) {
+ this.brokerId = brokerId;
+ this.port = port;
+ this.zkConnection = zkConnection;
+ this.baseProperties = baseProperties;
+ this.brokerList = "localhost:" + this.port;
+ }
+
+ @Override
+ public void before() {
+ logDir = new File("target/test-classes/kafka-log");
+ logDir.mkdirs();
+
+ Properties properties = new Properties();
+ properties.putAll(baseProperties);
+ properties.setProperty("zookeeper.connect", zkConnection);
+ properties.setProperty("broker.id", brokerId.toString());
+ properties.setProperty("host.name", "localhost");
+ properties.setProperty("port", Integer.toString(port));
+ properties.setProperty("log.dir", logDir.getAbsolutePath());
+ properties.setProperty("num.partitions", String.valueOf(1));
+ properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+ properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+ properties.setProperty("offsets.topic.replication.factor", String.valueOf(1));
+
+ kafkaServer = startBroker(properties);
+ }
+
+ private KafkaServer startBroker(Properties props) {
+ zkUtils = ZkUtils.apply(
+ zkConnection,
+ 30000,
+ 30000,
+ false);
+ List<KafkaMetricsReporter> kmrList = new ArrayList<>();
+ Buffer<KafkaMetricsReporter> metricsList = scala.collection.JavaConversions.asScalaBuffer(kmrList);
+ KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime(), Option.<String>empty(), metricsList);
+ server.startup();
+ return server;
+ }
+
+ public String getBrokerList() {
+ return brokerList;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void after() {
+ kafkaServer.shutdown();
+ logDir.delete();
+ }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java
new file mode 100644
index 0000000..2425bf8
--- /dev/null
+++ b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/EmbeddedZooKeeper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.junit.rules.ExternalResource;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * JUnit rule that runs a ZooKeeper server bound to {@code localhost} on a fixed port.
+ * The server is started in {@link #before()} and stopped, with its working directories
+ * removed, in {@link #after()}.
+ */
+public class EmbeddedZooKeeper extends ExternalResource {
+
+    private final int port;
+    private final int tickTime = 500;
+
+    private ServerCnxnFactory cnxnFactory;
+    private File snapshotDir;
+    private File logDir;
+    private ZooKeeperServer zooKeeperServer;
+
+    /**
+     * @param port the port the server listens on
+     */
+    public EmbeddedZooKeeper(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Creates the snapshot and log directories and starts the server.
+     *
+     * @throws IOException if the server cannot be started, or if start-up is interrupted
+     */
+    @Override
+    public void before() throws IOException {
+        snapshotDir = new File("target/test-classes/zk-snapshot");
+        snapshotDir.mkdirs();
+        logDir = new File("target/test-classes/zk-log");
+        logDir.mkdirs();
+
+        try {
+            zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime);
+            cnxnFactory = new NIOServerCnxnFactory();
+            cnxnFactory.configure(new InetSocketAddress("localhost", port), 1024);
+            cnxnFactory.startup(zooKeeperServer);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Shuts the server down and removes the directories created by {@link #before()}.
+     */
+    @Override
+    public void after() {
+        if (cnxnFactory != null) {
+            cnxnFactory.shutdown();
+        }
+        if (zooKeeperServer != null) {
+            zooKeeperServer.shutdown();
+        }
+
+        // The server stores its data below these directories, so a plain File.delete()
+        // (which only removes empty directories) would leave them behind.
+        deleteRecursively(logDir);
+        deleteRecursively(snapshotDir);
+    }
+
+    /**
+     * @return the connection string for this server, {@code "localhost:" + port}
+     */
+    public String getConnection() {
+        return "localhost:" + port;
+    }
+
+    /**
+     * @return the port given to the constructor
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * Deletes a file, or a directory together with everything below it. Failures are ignored
+     * because this is best-effort test cleanup.
+     *
+     * @param target the file or directory to remove; {@code null} is ignored
+     */
+    private static void deleteRecursively(File target) {
+        if (target == null) {
+            return;
+        }
+        File[] children = target.listFiles(); // null when target is not a directory
+        if (children != null) {
+            for (File child : children) {
+                deleteRecursively(child);
+            }
+        }
+        target.delete();
+    }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java
new file mode 100644
index 0000000..7172502
--- /dev/null
+++ b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/KafkaAppenderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.*;
+import org.osgi.service.event.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Sends one event through {@link KafkaAppender} to an embedded broker and reads it back
+ * with a plain Kafka consumer.
+ */
+public class KafkaAppenderTest {
+
+    // NOTE(review): JUnit applies separate @ClassRule fields in reflection order, which is
+    // undefined, while the broker needs ZooKeeper to be running first. Chaining both rules
+    // with org.junit.rules.RuleChain would make the ordering explicit.
+    @ClassRule
+    public static EmbeddedZooKeeper zookeeper = new EmbeddedZooKeeper(PortFinder.getNextAvailable(23000));
+
+    @ClassRule
+    public static EmbeddedKafkaBroker kafkaBroker =
+            new EmbeddedKafkaBroker(0,
+                    PortFinder.getNextAvailable(24000),
+                    zookeeper.getConnection(),
+                    new Properties());
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaAppenderTest.class);
+
+    @BeforeClass
+    public static void beforeClass() {
+        LOG.info("Embedded Zookeeper connection: " + zookeeper.getConnection());
+        LOG.info("Embedded Kafka cluster broker list: " + kafkaBroker.getBrokerList());
+    }
+
+    @Test
+    @Ignore
+    public void test() throws Exception {
+        KafkaAppender appender = new KafkaAppender();
+        Dictionary<String, Object> config = new Hashtable<>();
+        config.put("topic", "test");
+        config.put("bootstrap.servers", kafkaBroker.getBrokerList());
+        appender.activate(config);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("foo", "bar");
+        Event event = new Event("decanter/collect", data);
+        appender.handleEvent(event);
+
+        Properties kafkaConfig = new Properties();
+        kafkaConfig.put("bootstrap.servers", kafkaBroker.getBrokerList());
+        // subscribe() needs a consumer group, and the consumer cannot be created without deserializers.
+        kafkaConfig.put("group.id", "decanter-test");
+        kafkaConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        // The event is published before the consumer joins, so start from the beginning of the topic.
+        kafkaConfig.put("auto.offset.reset", "earliest");
+
+        // try-with-resources releases the consumer's network resources even when an assertion fails.
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig)) {
+            consumer.subscribe(Arrays.asList("test"));
+            ConsumerRecords<String, String> records = consumer.poll(1000);
+            Assert.assertFalse(records.isEmpty());
+            Assert.assertEquals(1, records.count());
+        }
+    }
+
+}
diff --git a/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java
new file mode 100644
index 0000000..df83631
--- /dev/null
+++ b/appender/kafka/src/test/java/org/apache/karaf/decanter/appender/kafka/PortFinder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Finds free TCP/UDP ports for tests.
+ * <p>
+ * On class initialisation one "lock" port is bound and held until JVM shutdown. Ports handed
+ * out by this class start just above that lock port.
+ */
+public final class PortFinder {
+
+    /**
+     * The minimum server port number for IPv4.
+     * Set at 1100 to avoid returning privileged port numbers.
+     */
+    public static final int MIN_PORT_NUMBER = 1100;
+
+    /**
+     * The maximum server port number for IPv4.
+     */
+    public static final int MAX_PORT_NUMBER = 65535;
+
+    private static final Logger LOG = LoggerFactory.getLogger(PortFinder.class);
+
+    /**
+     * We'll hold open the lowest port in this process
+     * so parallel processes won't use the same block
+     * of ports. They'll go up to the next block.
+     */
+    private static final ServerSocket LOCK;
+
+    /**
+     * Lowest port this class will still hand out. Advanced past each port returned by
+     * {@link #getNextAvailable()}.
+     */
+    private static final AtomicInteger currentMinPort = new AtomicInteger(MIN_PORT_NUMBER);
+
+    /**
+     * Utility class, not meant to be instantiated.
+     */
+    private PortFinder() {
+        // Do nothing
+    }
+
+    static {
+        int port = MIN_PORT_NUMBER;
+        ServerSocket ss = null;
+
+        // Try the first port of each block of 200. The upper bound matters: past 65535 the
+        // ServerSocket constructor throws IllegalArgumentException, which the catch below
+        // would swallow, and the loop would never end.
+        while (ss == null && port <= MAX_PORT_NUMBER) {
+            try {
+                ss = new ServerSocket(port);
+            } catch (Exception e) {
+                // Port taken (or not permitted): move on to the next block.
+                port += 200;
+            }
+        }
+        if (ss == null) {
+            throw new IllegalStateException("Could not bind a lock port between "
+                    + MIN_PORT_NUMBER + " and " + MAX_PORT_NUMBER);
+        }
+        LOCK = ss;
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    LOCK.close();
+                } catch (Exception ex) {
+                    //ignore
+                }
+            }
+        });
+        currentMinPort.set(port + 1);
+    }
+
+    /**
+     * Gets the next available port starting at the lowest number. This is the preferred
+     * method to use. The returned port is immediately marked as in use (the internal minimum
+     * is moved past it), so the result does not rely on the caller actually opening the port.
+     *
+     * @throws IllegalArgumentException is thrown if the port number is out of range
+     * @throws NoSuchElementException if there are no ports available
+     * @return the available port
+     */
+    public static synchronized int getNextAvailable() {
+        int next = getNextAvailable(currentMinPort.get());
+        currentMinPort.set(next + 1);
+        return next;
+    }
+
+    /**
+     * Gets the next available port starting at a given from port. Unlike
+     * {@link #getNextAvailable()}, this does not reserve the result: until the caller binds
+     * the port, a second call with the same argument can return the same port.
+     *
+     * @param fromPort the from port to scan for availability
+     * @throws IllegalArgumentException is thrown if the port number is below the current
+     *         minimum port or above {@link #MAX_PORT_NUMBER}
+     * @throws NoSuchElementException if there are no ports available
+     * @return the available port
+     */
+    public static synchronized int getNextAvailable(int fromPort) {
+        if (fromPort < currentMinPort.get() || fromPort > MAX_PORT_NUMBER) {
+            throw new IllegalArgumentException("From port number not in valid range: " + fromPort);
+        }
+
+        for (int i = fromPort; i <= MAX_PORT_NUMBER; i++) {
+            if (available(i)) {
+                LOG.info("getNextAvailable({}) -> {}", fromPort, i);
+                return i;
+            }
+        }
+
+        throw new NoSuchElementException("Could not find an available port above " + fromPort);
+    }
+
+    /**
+     * Checks to see if a specific port is available, by binding both a TCP server socket and
+     * a UDP socket to it and closing them again.
+     *
+     * @param port the port number to check for availability
+     * @return {@code true} if the port is available, or {@code false} if not
+     * @throws IllegalArgumentException is thrown if the port number is below the current
+     *         minimum port or above {@link #MAX_PORT_NUMBER}
+     */
+    public static boolean available(int port) throws IllegalArgumentException {
+        if (port < currentMinPort.get() || port > MAX_PORT_NUMBER) {
+            throw new IllegalArgumentException("Invalid start currentMinPort: " + port);
+        }
+
+        ServerSocket ss = null;
+        DatagramSocket ds = null;
+        try {
+            ss = new ServerSocket(port);
+            ss.setReuseAddress(true);
+            ds = new DatagramSocket(port);
+            ds.setReuseAddress(true);
+            return true;
+        } catch (IOException e) {
+            // Binding failed: the port is in use, report it as unavailable below.
+        } finally {
+            if (ds != null) {
+                ds.close();
+            }
+
+            if (ss != null) {
+                try {
+                    ss.close();
+                } catch (IOException e) {
+                    /* should not be thrown */
+                }
+            }
+        }
+
+        return false;
+    }
+
+}
diff --git a/appender/log/pom.xml b/appender/log/pom.xml
index eb7c90d..0986ddc 100644
--- a/appender/log/pom.xml
+++ b/appender/log/pom.xml
@@ -38,11 +38,34 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.log,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
index 75c62c3..1bae119 100644
--- a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
+++ b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
@@ -17,6 +17,9 @@
package org.apache.karaf.decanter.appender.log;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
@@ -26,6 +29,8 @@ import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Dictionary;
+
/**
* Karaf Decanter Log Appender
* Listens on EventAdmin and writes to the slf4j logger.
@@ -43,16 +48,25 @@ public class LogAppender implements EventHandler {
@Reference(cardinality = ReferenceCardinality.OPTIONAL)
public Marshaller marshaller;
+ private Dictionary<String, Object> config;
+
+ @Activate
+ public void activate(ComponentContext componentContext) {
+ this.config = componentContext.getProperties();
+ }
+
@Override
public void handleEvent(Event event) {
- if (marshaller != null) {
- LOGGER.info(marshaller.marshal(event));
- } else {
- StringBuilder builder = new StringBuilder();
- for (String innerKey : event.getPropertyNames()) {
- builder.append(innerKey).append(":").append(toString(event.getProperty(innerKey))).append(" | ");
+ if (EventFilter.match(event, config)) {
+ if (marshaller != null) {
+ LOGGER.info(marshaller.marshal(event));
+ } else {
+ StringBuilder builder = new StringBuilder();
+ for (String innerKey : event.getPropertyNames()) {
+ builder.append(innerKey).append(":").append(toString(event.getProperty(innerKey))).append(" | ");
+ }
+ LOGGER.info(builder.toString());
}
- LOGGER.info(builder.toString());
}
}
diff --git a/appender/mongodb/pom.xml b/appender/mongodb/pom.xml
index 7f5c7b8..0366157 100644
--- a/appender/mongodb/pom.xml
+++ b/appender/mongodb/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.2.2</version>
@@ -51,7 +55,9 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
+ <obrRepository>NONE</obrRepository>
<instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Import-Package>
!io.netty*,
org.slf4j;resolution:=optional,
@@ -63,10 +69,12 @@
</Import-Package>
<Private-Package>
org.apache.karaf.decanter.appender.mongodb,
+ org.apache.karaf.decanter.appender.utils,
com.mongodb*,
org.bson*,
io.netty*
</Private-Package>
+ <_dsannotations>*</_dsannotations>
</instructions>
</configuration>
</plugin>
diff --git a/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java b/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
index d97edff..16fef55 100644
--- a/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
+++ b/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
@@ -21,6 +21,7 @@ import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.bson.Document;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.*;
@@ -40,6 +41,14 @@ import java.util.Dictionary;
)
public class MongoDbAppender implements EventHandler {
+ // Configuration keys looked up in activate(). Declared final so no other code can reassign them.
+ public static final String URI_PROPERTY = "uri";
+ public static final String DATABASE_PROPERTY = "database";
+ public static final String COLLECTION_PROPERTY = "collection";
+
+ // Fallback values passed to getValue() together with the keys above.
+ public static final String URI_DEFAULT = "mongodb://localhost";
+ public static final String DATABASE_DEFAULT = "decanter";
+ public static final String COLLECTION_DEFAULT = "decanter";
+
@Reference
public Marshaller marshaller;
@@ -49,13 +58,15 @@ public class MongoDbAppender implements EventHandler {
private MongoDatabase mongoDatabase;
private MongoCollection mongoCollection;
+ private Dictionary<String, Object> config;
+
@Activate
public void activate(ComponentContext componentContext) {
- Dictionary<String, Object> config = componentContext.getProperties();
+ config = componentContext.getProperties();
- String uri = getValue(config, "uri", "mongodb://localhost");
- String database = getValue(config, "database", "decanter");
- String collection = getValue(config, "collection", "decanter");
+ String uri = getValue(config, URI_PROPERTY, URI_DEFAULT);
+ String database = getValue(config, DATABASE_PROPERTY, DATABASE_DEFAULT);
+ String collection = getValue(config, COLLECTION_PROPERTY, COLLECTION_DEFAULT);
mongoClient = new MongoClient(new MongoClientURI(uri));
mongoDatabase = mongoClient.getDatabase(database);
@@ -69,11 +80,13 @@ public class MongoDbAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- String data = marshaller.marshal(event);
- mongoCollection.insertOne(Document.parse(data));
- } catch (Exception e) {
- LOGGER.warn("Error storing event in MongoDB", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ String data = marshaller.marshal(event);
+ mongoCollection.insertOne(Document.parse(data));
+ } catch (Exception e) {
+ LOGGER.warn("Error storing event in MongoDB", e);
+ }
}
}
diff --git a/appender/mqtt/pom.xml b/appender/mqtt/pom.xml
index 9d435ba..52920eb 100644
--- a/appender/mqtt/pom.xml
+++ b/appender/mqtt/pom.xml
@@ -42,6 +42,10 @@
<groupId>org.apache.karaf.decanter.marshaller</groupId>
<artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
<dependency>
<groupId>javax.json</groupId>
@@ -68,6 +72,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.mqtt,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java b/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
index 426c209..024b53b 100644
--- a/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
+++ b/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
@@ -16,16 +16,14 @@
*/
package org.apache.karaf.decanter.appender.mqtt;
-import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Dictionary;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -45,47 +43,59 @@ import org.slf4j.LoggerFactory;
)
public class MqttAppender implements EventHandler {
+ // Configuration keys read through getValue(). Declared final so no other code can reassign them.
+ public static final String SERVER_PROPERTY = "server";
+ public static final String CLIENT_ID_PROPERTY = "clientId";
+ public static final String TOPIC_PROPERTY = "topic";
+
+ // Values getValue() falls back to when the matching key is not configured.
+ public static final String SERVER_DEFAULT = "tcp://localhost:9300";
+ public static final String CLIENT_ID_DEFAULT = "decanter";
+ public static final String TOPIC_DEFAULT = "decanter";
+
@Reference
public Marshaller marshaller;
private final static Logger LOGGER = LoggerFactory.getLogger(MqttAppender.class);
private MqttClient client;
- private String server;
- private String clientId;
- private String topic;
+
+ private Dictionary<String, Object> config;
@Activate
public void activate(ComponentContext componentContext) throws Exception {
activate(componentContext.getProperties());
}
- public void activate(Dictionary<String, Object> dictionary) throws Exception {
- this.server = getProperty(dictionary, "server", "tcp://localhost:9300");
- this.clientId = getProperty(dictionary, "clientId", "decanter");
- this.topic = getProperty(dictionary, "topic", "decanter");
- client = new MqttClient(server, clientId, new MemoryPersistence());
+ public void activate(Dictionary<String, Object> config) throws Exception {
+ this.config = config;
+ client = new MqttClient(
+ getValue(config, SERVER_PROPERTY, SERVER_DEFAULT),
+ getValue(config, CLIENT_ID_PROPERTY, CLIENT_ID_DEFAULT),
+ new MemoryPersistence());
client.connect();
}
- private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
- return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
+ private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
+ return (config.get(key) != null) ? (String) config.get(key) : defaultValue;
}
@Override
public void handleEvent(Event event) {
- try {
- MqttMessage message = new MqttMessage();
- String jsonSt = marshaller.marshal(event);
- message.setPayload(jsonSt.getBytes(StandardCharsets.UTF_8));
- client.publish(topic, message);
- } catch (Exception e) {
- LOGGER.warn("Error sending to MQTT server " + client.getServerURI(), e);
+ if (EventFilter.match(event, config)) {
try {
- client.disconnect();
- client.connect();
- } catch (MqttException e1) {
- e1.printStackTrace();
+ MqttMessage message = new MqttMessage();
+ String jsonSt = marshaller.marshal(event);
+ message.setPayload(jsonSt.getBytes(StandardCharsets.UTF_8));
+ client.publish(
+ getValue(config, TOPIC_PROPERTY, TOPIC_DEFAULT),
+ message);
+ } catch (Exception e) {
+ LOGGER.warn("Error sending to MQTT server " + client.getServerURI(), e);
+ try {
+ client.disconnect();
+ client.connect();
+ } catch (MqttException e1) {
+ e1.printStackTrace();
+ }
}
}
}
diff --git a/appender/mqtt/src/test/java/org/apache/karaf/decanter/appender/mqtt/TestMqttAppender.java b/appender/mqtt/src/test/java/org/apache/karaf/decanter/appender/mqtt/TestMqttAppender.java
index b9c2660..f74420d 100644
--- a/appender/mqtt/src/test/java/org/apache/karaf/decanter/appender/mqtt/TestMqttAppender.java
+++ b/appender/mqtt/src/test/java/org/apache/karaf/decanter/appender/mqtt/TestMqttAppender.java
@@ -28,6 +28,7 @@ import javax.json.JsonReader;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -41,12 +42,13 @@ import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
public class TestMqttAppender {
+
private static final String SERVER = "tcp://localhost:11883";
private static final String TOPIC = "decanter";
private static final long TIMESTAMP = 1454428780634L;
@Test
- public void testSend() throws URISyntaxException, Exception {
+ public void test() throws URISyntaxException, Exception {
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(false);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
@@ -61,15 +63,30 @@ public class TestMqttAppender {
MqttAppender appender = new MqttAppender();
appender.marshaller = marshaller;
Dictionary<String, Object> config = new Hashtable<>();
- config.put("server", SERVER);
- config.put("clientId", "decanter");
- config.put("topic", TOPIC);
+ config.put(MqttAppender.SERVER_PROPERTY, SERVER);
+ config.put(MqttAppender.CLIENT_ID_PROPERTY, "decanter");
+ config.put(MqttAppender.TOPIC_PROPERTY, TOPIC);
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
appender.activate(config);
- Map<String, Object> properties = new HashMap<>();
- properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
- Event event = new Event(TOPIC, properties);
+ Map<String, Object> data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("property_refused", "data");
+ Event event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ data.put("property", "refused_value");
+ event = new Event(TOPIC, data);
appender.handleEvent(event);
+
+ data = new HashMap<>();
+ data.put(EventConstants.TIMESTAMP, TIMESTAMP);
+ event = new Event(TOPIC, data);
+ appender.handleEvent(event);
+
Thread.sleep(100);
Assert.assertEquals(1, received.size());
diff --git a/appender/orientdb/pom.xml b/appender/orientdb/pom.xml
index f06253f..102ddb8 100644
--- a/appender/orientdb/pom.xml
+++ b/appender/orientdb/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.orientechnologies</groupId>
<artifactId>orientdb-client</artifactId>
<version>${orientdb.version}</version>
@@ -60,6 +64,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.orientdb,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java b/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
index 58b6c19..fd50d0f 100644
--- a/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
+++ b/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
@@ -20,6 +20,7 @@ package org.apache.karaf.decanter.appender.orientdb;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.record.impl.ODocument;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -38,17 +39,27 @@ import java.util.Dictionary;
)
public class OrientDBAppender implements EventHandler {
+ // Configuration keys looked up in activate(). Declared final so no other code can reassign them.
+ public static final String URL_PROPERTY = "url";
+ public static final String USERNAME_PROPERTY = "username";
+ public static final String PASSWORD_PROPERTY = "password";
+
+ // Fallback values passed to getValue() together with the keys above.
+ public static final String URL_DEFAULT = "remote:localhost/decanter";
+ public static final String USERNAME_DEFAULT = "root";
+ public static final String PASSWORD_DEFAULT = "decanter";
+
@Reference
public Marshaller marshaller;
private ODatabaseDocumentTx database;
+ private Dictionary<String, Object> config;
+
@Activate
public void activate(ComponentContext componentContext) {
- Dictionary<String, Object> config = componentContext.getProperties();
- String url = getValue(config, "url", "remote:localhost/decanter");
- String username = getValue(config, "username", "root");
- String password = getValue(config, "password", "decanter");
+ config = componentContext.getProperties();
+ String url = getValue(config, URL_PROPERTY, URL_DEFAULT);
+ String username = getValue(config, USERNAME_PROPERTY, USERNAME_DEFAULT);
+ String password = getValue(config, PASSWORD_PROPERTY, PASSWORD_DEFAULT);
database = new ODatabaseDocumentTx(url).open(username, password);
}
@@ -64,9 +75,11 @@ public class OrientDBAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- String json = marshaller.marshal(event);
- ODocument document = new ODocument("decanter").fromJSON(json);
- document.save();
+ if (EventFilter.match(event, config)) {
+ String json = marshaller.marshal(event);
+ ODocument document = new ODocument("decanter").fromJSON(json);
+ document.save();
+ }
}
}
diff --git a/appender/pom.xml b/appender/pom.xml
index 3c9605b..d021ddd 100644
--- a/appender/pom.xml
+++ b/appender/pom.xml
@@ -34,6 +34,7 @@
<name>Apache Karaf :: Decanter :: Appender</name>
<modules>
+ <module>utils</module>
<module>camel</module>
<module>cassandra</module>
<module>dropwizard</module>
diff --git a/appender/redis/pom.xml b/appender/redis/pom.xml
index 3f3d5c6..feb982e 100644
--- a/appender/redis/pom.xml
+++ b/appender/redis/pom.xml
@@ -39,6 +39,10 @@
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -46,14 +50,20 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
<configuration>
+ <obrRepository>NONE</obrRepository>
<instructions>
+ <Export-Package>!*</Export-Package>
<Import-Package>
com.esotericsoftware.kryo;resolution:=optional
</Import-Package>
<Private-Package>
org.redisson*,
+ org.apache.karaf.decanter.appender.utils
</Private-Package>
+ <_dsannotations>*</_dsannotations>
</instructions>
</configuration>
</plugin>
diff --git a/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java b/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
index d38ebdb..749a320 100644
--- a/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
+++ b/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
@@ -16,6 +16,7 @@
*/
package org.apache.karaf.decanter.appender.redis;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -26,8 +27,6 @@ import org.osgi.service.event.EventHandler;
import org.redisson.Config;
import org.redisson.Redisson;
import org.redisson.RedissonClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Dictionary;
import java.util.Map;
@@ -42,38 +41,46 @@ import java.util.Map;
)
public class RedisAppender implements EventHandler {
- private final static Logger LOGGER = LoggerFactory.getLogger(RedisAppender.class);
+ public static String ADDRESS_PROPERTY = "address";
+ public static String MODE_PROPERTY = "mode";
+ public static String MAP_PROPERTY = "map";
+ public static String MASTER_ADDRESS_PROPERTY = "masterAddress";
+ public static String MASTER_NAME_PROPERTY = "masterName";
+ public static String SCAN_INTERVAL_PROPERTY = "scanInterval";
- private String address;
- private String mode;
- private String map;
- private String masterAddress;
- private String masterName;
- private int scanInterval;
+ public static String ADDRESS_DEFAULT = "localhost:6379";
+ public static String MODE_DEFAULT = "Single";
+ public static String MAP_DEFAULT = "Decanter";
+ public static String MASTER_ADDRESS_DEFAULT = null;
+ public static String MASTER_NAME_DEFAULT = null;
+ public static String SCAN_INTERVAL_DEFAULT = "2000";
private RedissonClient redissonClient;
+ private Dictionary<String, Object> config;
+
@Activate
public void activate(ComponentContext componentContext) {
- Dictionary<String, Object> properties = componentContext.getProperties();
- address = getProperty(properties, "address", "localhost:6379");
- mode = getProperty(properties, "mode", "Single");
- map = getProperty(properties, "map", "Decanter");
- masterAddress = getProperty(properties, "masterAddress", null);
- masterName = getProperty(properties, "masterName", null);
- scanInterval = Integer.parseInt(getProperty(properties, "scanInterval", "2000"));
+ config = componentContext.getProperties();
+
+ String address = getValue(config, ADDRESS_PROPERTY, ADDRESS_DEFAULT);
+ String mode = getValue(config, MODE_PROPERTY, MODE_DEFAULT);
+ String map = getValue(config, MAP_PROPERTY, MAP_DEFAULT);
+ String masterAddress = getValue(config, MASTER_ADDRESS_PROPERTY, MASTER_ADDRESS_DEFAULT);
+ String masterName = getValue(config, MASTER_NAME_PROPERTY, MASTER_NAME_DEFAULT);
+ int scanInterval = Integer.parseInt(getValue(config, SCAN_INTERVAL_PROPERTY, SCAN_INTERVAL_DEFAULT));
- Config config = new Config();
+ Config redissonConfig = new Config();
if (mode.equalsIgnoreCase("Single")) {
- config.useSingleServer().setAddress(address);
+ redissonConfig.useSingleServer().setAddress(address);
} else if (mode.equalsIgnoreCase("Master_Slave")) {
- config.useMasterSlaveServers().setMasterAddress(masterAddress).addSlaveAddress(address);
+ redissonConfig.useMasterSlaveServers().setMasterAddress(masterAddress).addSlaveAddress(address);
} else if (mode.equalsIgnoreCase("Sentinel")) {
- config.useSentinelServers().addSentinelAddress(masterName).addSentinelAddress(address);
+ redissonConfig.useSentinelServers().addSentinelAddress(masterName).addSentinelAddress(address);
} else if (mode.equalsIgnoreCase("Cluster")) {
- config.useClusterServers().setScanInterval(scanInterval).addNodeAddress(address);
+ redissonConfig.useClusterServers().setScanInterval(scanInterval).addNodeAddress(address);
}
- redissonClient = Redisson.create(config);
+ redissonClient = Redisson.create(redissonConfig);
}
@Deactivate
@@ -85,13 +92,15 @@ public class RedisAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- Map<String, Object> redisMap = redissonClient.getMap(this.map);
- for (String name : event.getPropertyNames()) {
- redisMap.put(name, event.getProperty(name));
+ if (EventFilter.match(event, config)) {
+ Map<String, Object> redisMap = redissonClient.getMap(getValue(config, MAP_PROPERTY, MAP_DEFAULT));
+ for (String name : event.getPropertyNames()) {
+ redisMap.put(name, event.getProperty(name));
+ }
}
}
- private String getProperty(Dictionary<String, Object> properties, String key, String defaultValue) {
+ private String getValue(Dictionary<String, Object> properties, String key, String defaultValue) {
return (properties.get(key) != null) ? (String) properties.get(key) : defaultValue;
}
diff --git a/appender/rest/pom.xml b/appender/rest/pom.xml
index 85edd19..851d616 100644
--- a/appender/rest/pom.xml
+++ b/appender/rest/pom.xml
@@ -39,6 +39,10 @@
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.karaf.decanter.marshaller</groupId>
<artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
<scope>test</scope>
@@ -53,6 +57,25 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.rest,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java b/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
index b14425f..5762465 100644
--- a/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
+++ b/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
import java.util.Dictionary;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -44,6 +45,8 @@ import org.slf4j.LoggerFactory;
)
public class RestAppender implements EventHandler {
+ public static String URI_PROPERTY = "uri";
+
@Reference
public Marshaller marshaller;
@@ -51,6 +54,8 @@ public class RestAppender implements EventHandler {
private URI uri;
+ private Dictionary<String, Object> config;
+
@Activate
@SuppressWarnings("unchecked")
public void activate(ComponentContext context) throws URISyntaxException {
@@ -59,7 +64,8 @@ public class RestAppender implements EventHandler {
}
void activate(Dictionary<String, Object> config) throws URISyntaxException {
- uri = new URI(getMandatoryValue(config, "uri"));
+ this.config = config;
+ uri = new URI(getMandatoryValue(config, URI_PROPERTY));
}
private String getMandatoryValue(Dictionary<String, Object> config, String key) {
@@ -73,21 +79,23 @@ public class RestAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try {
- HttpURLConnection connection = (HttpURLConnection)uri.toURL().openConnection();
- connection.setDoOutput(true);
- connection.setInstanceFollowRedirects(false);
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestProperty("charset", "utf-8");
- OutputStream out = connection.getOutputStream();
- marshaller.marshal(event, out);
- out.close();
- InputStream is = connection.getInputStream();
- is.read();
- is.close();
- } catch (Exception e) {
- LOGGER.warn("Error sending event to rest service", e);
+ if (EventFilter.match(event, config)) {
+ try {
+ HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
+ connection.setDoOutput(true);
+ connection.setInstanceFollowRedirects(false);
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("charset", "utf-8");
+ OutputStream out = connection.getOutputStream();
+ marshaller.marshal(event, out);
+ out.close();
+ InputStream is = connection.getInputStream();
+ is.read();
+ is.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error sending event to rest service", e);
+ }
}
}
diff --git a/appender/socket/pom.xml b/appender/socket/pom.xml
index 7f75b2c..41a8565 100644
--- a/appender/socket/pom.xml
+++ b/appender/socket/pom.xml
@@ -38,11 +38,34 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.socket,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
index 1016438..fb5d0f7 100644
--- a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
+++ b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
@@ -17,6 +17,7 @@
package org.apache.karaf.decanter.appender.socket;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.*;
import org.osgi.service.event.Event;
@@ -37,44 +38,49 @@ import java.util.Dictionary;
)
public class SocketAppender implements EventHandler {
+ public static String HOST_PROPERTY = "host";
+ public static String PORT_PROPERTY = "port";
+
+ public static String HOST_DEFAULT = "localhost";
+ public static String PORT_DEFAULT = "34343";
+
@Reference
public Marshaller marshaller;
private final static Logger LOGGER = LoggerFactory.getLogger(SocketAppender.class);
- private String host;
- private int port;
-
+ private Dictionary<String, Object> config;
@Activate
public void activate(ComponentContext componentContext) throws Exception {
- Dictionary<String, Object> config = componentContext.getProperties();
- host = getValue(config, "host", "localhost");
- String portSt = getValue(config, "port", "34343");
- port = Integer.parseInt(portSt);
+ this.config = componentContext.getProperties();
}
@Override
public void handleEvent(Event event) {
- Socket socket = null;
- PrintWriter writer = null;
- try {
- socket = new Socket(host, port);
- String data = marshaller.marshal(event);
- writer = new PrintWriter(socket.getOutputStream(), true);
- writer.println(data);
- } catch (Exception e) {
- LOGGER.warn("Error sending data on the socket", e);
- } finally {
- if (writer != null) {
- writer.flush();
- writer.close();
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (Exception e) {
- // nothing to do
+ if (EventFilter.match(event, config)) {
+ Socket socket = null;
+ PrintWriter writer = null;
+ try {
+ socket = new Socket(
+ getValue(config, HOST_PROPERTY, HOST_DEFAULT),
+ Integer.parseInt(getValue(config, PORT_PROPERTY, PORT_DEFAULT)));
+ String data = marshaller.marshal(event);
+ writer = new PrintWriter(socket.getOutputStream(), true);
+ writer.println(data);
+ } catch (Exception e) {
+ LOGGER.warn("Error sending data on the socket", e);
+ } finally {
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (Exception e) {
+ // nothing to do
+ }
}
}
}
diff --git a/appender/timescaledb/pom.xml b/appender/timescaledb/pom.xml
index 7738326..1fd90a9 100644
--- a/appender/timescaledb/pom.xml
+++ b/appender/timescaledb/pom.xml
@@ -42,11 +42,34 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.appender.timescaledb,
+ org.apache.karaf.decanter.appender.utils
+ </Private-Package>
+ <_dsannotations>*</_dsannotations>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
diff --git a/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java b/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
index ef95c25..c9ef9f3 100644
--- a/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
+++ b/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
@@ -23,6 +23,7 @@ import java.sql.Statement;
import java.util.Dictionary;
import javax.sql.DataSource;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -40,6 +41,10 @@ import org.slf4j.LoggerFactory;
)
public class TimescaleDbAppender implements EventHandler {
+ public static String TABLE_NAME_PROPERTY = "table.name";
+
+ public static String TABLE_NAME_DEFAULT = "decanter";
+
@Reference
public Marshaller marshaller;
@@ -58,7 +63,7 @@ public class TimescaleDbAppender implements EventHandler {
private final static String insertQueryTemplate =
"INSERT INTO TABLENAME(timestamp, content) VALUES(?,?)";
- String tableName;
+ private Dictionary<String, Object> config;
@SuppressWarnings("unchecked")
@Activate
@@ -67,7 +72,8 @@ public class TimescaleDbAppender implements EventHandler {
}
public void open(Dictionary<String, Object> config) {
- this.tableName = getValue(config, "table.name", "decanter");
+ this.config = config;
+ String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
try (Connection connection = dataSource.getConnection()) {
createStructure(connection);
} catch (Exception e) {
@@ -82,25 +88,29 @@ public class TimescaleDbAppender implements EventHandler {
@Override
public void handleEvent(Event event) {
- try (Connection connection = dataSource.getConnection()) {
- String jsonSt = marshaller.marshal(event);
- String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
- Long timestamp = (Long)event.getProperty(EventConstants.TIMESTAMP);
- if (timestamp == null) {
- timestamp = System.currentTimeMillis();
+ if (EventFilter.match(event, config)) {
+ try (Connection connection = dataSource.getConnection()) {
+ String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
+ String jsonSt = marshaller.marshal(event);
+ String insertQuery = insertQueryTemplate.replaceAll("TABLENAME", tableName);
+ Long timestamp = (Long) event.getProperty(EventConstants.TIMESTAMP);
+ if (timestamp == null) {
+ timestamp = System.currentTimeMillis();
+ }
+ try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
+ insertStatement.setLong(1, timestamp);
+ insertStatement.setString(2, jsonSt);
+ insertStatement.executeUpdate();
+ LOGGER.trace("Data inserted into {} table", tableName);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Can't store in the database", e);
}
- try (PreparedStatement insertStatement = connection.prepareStatement(insertQuery)) {
- insertStatement.setLong(1, timestamp);
- insertStatement.setString(2, jsonSt);
- insertStatement.executeUpdate();
- LOGGER.trace("Data inserted into {} table", tableName);
- }
- } catch (Exception e) {
- LOGGER.error("Can't store in the database", e);
}
}
private void createStructure(Connection connection) {
+ String tableName = getValue(config, TABLE_NAME_PROPERTY, TABLE_NAME_DEFAULT);
String createTemplate = createTableQueryTemplate;
String createTableQuery = createTemplate.replaceAll("TABLENAME", tableName);
diff --git a/appender/pom.xml b/appender/utils/pom.xml
similarity index 55%
copy from appender/pom.xml
copy to appender/utils/pom.xml
index 3c9605b..854fbdd 100644
--- a/appender/pom.xml
+++ b/appender/utils/pom.xml
@@ -22,38 +22,23 @@
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.apache.karaf</groupId>
- <artifactId>decanter</artifactId>
+ <groupId>org.apache.karaf.decanter</groupId>
+ <artifactId>appender</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <groupId>org.apache.karaf.decanter</groupId>
- <artifactId>appender</artifactId>
- <packaging>pom</packaging>
- <name>Apache Karaf :: Decanter :: Appender</name>
-
- <modules>
- <module>camel</module>
- <module>cassandra</module>
- <module>dropwizard</module>
- <module>elasticsearch-native-1.x</module>
- <module>elasticsearch-native-2.x</module>
- <module>elasticsearch-jest</module>
- <module>elasticsearch-rest</module>
- <module>file</module>
- <module>jdbc</module>
- <module>jms</module>
- <module>kafka</module>
- <module>log</module>
- <module>orientdb</module>
- <module>mongodb</module>
- <module>mqtt</module>
- <module>redis</module>
- <module>rest</module>
- <module>socket</module>
- <module>timescaledb</module>
- <module>websocket-servlet</module>
- </modules>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Karaf :: Decanter :: Appender :: Utils</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.cmpn</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java b/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
new file mode 100644
index 0000000..8f5efa0
--- /dev/null
+++ b/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.utils;
+
+import org.osgi.service.event.Event;
+
+import java.util.Dictionary;
+
+public class EventFilter {
+
+ public static String PROPERTY_NAME_EXCLUDE_CONFIG = "event.property.name.exclude";
+ public static String PROPERTY_NAME_INCLUDE_CONFIG = "event.property.name.include";
+ public static String PROPERTY_VALUE_EXCLUDE_CONFIG = "event.property.value.exclude";
+ public static String PROPERTY_VALUE_INCLUDE_CONFIG = "event.property.value.include";
+
+ public static boolean match(Event event, Dictionary<String, Object> config) {
+ if (config == null) {
+ return true;
+ }
+
+ String nameExcludeRegex = (config.get(PROPERTY_NAME_EXCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_NAME_EXCLUDE_CONFIG) : null;
+ String nameIncludeRegex = (config.get(PROPERTY_NAME_INCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_NAME_INCLUDE_CONFIG) : null;
+ String valueExcludeRegex = (config.get(PROPERTY_VALUE_EXCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_VALUE_EXCLUDE_CONFIG) : null;
+ String valueIncludeRegex = (config.get(PROPERTY_VALUE_INCLUDE_CONFIG) != null) ? (String) config.get(PROPERTY_VALUE_INCLUDE_CONFIG) : null;
+
+ for (String name : event.getPropertyNames()) {
+ if (nameExcludeRegex != null && name.matches(nameExcludeRegex)) {
+ return false;
+ }
+
+ if (nameIncludeRegex != null && name.matches(nameIncludeRegex)) {
+ return true;
+ }
+
+ if (event.getProperty(name) != null && event.getProperty(name) instanceof String) {
+ if (valueExcludeRegex != null && ((String) event.getProperty(name)).matches(valueExcludeRegex)) {
+ return false;
+ }
+ if (valueIncludeRegex != null && ((String) event.getProperty(name)).matches(valueIncludeRegex)) {
+ return true;
+ }
+ }
+
+ }
+ return true;
+ }
+
+}
diff --git a/appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java b/appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java
new file mode 100644
index 0000000..f564cee
--- /dev/null
+++ b/appender/utils/src/test/java/org/apache/karaf/decanter/appender/utils/EventFilterTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+public class EventFilterTest {
+
+ @Test
+ public void noFilter() {
+ // no config
+ Assert.assertTrue(EventFilter.match(prepareTestEvent(), null));
+ // no filter in the config
+ Dictionary<String, Object> config = new Hashtable<>();
+ Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+ }
+
+ @Test
+ public void propertyNameFilter() {
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, "key.*");
+ // exclude
+ Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+ // exclude first
+ config.put(EventFilter.PROPERTY_NAME_INCLUDE_CONFIG, "other");
+ Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+ // include
+ config.remove(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG);
+ Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+ }
+
+ @Test
+ public void propertyValueFilter() {
+ Dictionary<String, Object> config = new Hashtable<>();
+ config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, "value.*");
+ // exclude
+ Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+ // exclude first
+ config.put(EventFilter.PROPERTY_VALUE_INCLUDE_CONFIG, "other");
+ Assert.assertFalse(EventFilter.match(prepareTestEvent(), config));
+ // include
+ config.remove(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG);
+ Assert.assertTrue(EventFilter.match(prepareTestEvent(), config));
+ }
+
+ private Event prepareTestEvent() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+ map.put("other", "other");
+ return new Event("test", map);
+ }
+
+}
diff --git a/appender/websocket-servlet/pom.xml b/appender/websocket-servlet/pom.xml
index cf05680..10af852 100644
--- a/appender/websocket-servlet/pom.xml
+++ b/appender/websocket-servlet/pom.xml
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
@@ -44,6 +45,10 @@
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -51,38 +56,45 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
<configuration>
+ <obrRepository>NONE</obrRepository>
<instructions>
+ <Export-Package>!*</Export-Package>
<Import-Package>
org.osgi.service.component,
org.osgi.service.event,
*
</Import-Package>
<Private-Package>
- org.apache.karaf.decanter.appender.websocket
+ org.apache.karaf.decanter.appender.websocket,
+ org.apache.karaf.decanter.appender.utils
</Private-Package>
+ <_dsannotations>*</_dsannotations>
</instructions>
</configuration>
- </plugin> <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>attach-artifact</goal>
- </goals>
- <configuration>
- <artifacts>
- <artifact>
- <file>src/main/cfg/org.apache.karaf.decanter.appender.websocket.servlet.cfg</file>
- <type>cfg</type>
- </artifact>
- </artifacts>
- </configuration>
- </execution>
- </executions>
- </plugin>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>src/main/cfg/org.apache.karaf.decanter.appender.websocket.servlet.cfg</file>
+ <type>cfg</type>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java b/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
index 195ed83..79cdd4a 100644
--- a/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
+++ b/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
@@ -17,6 +17,7 @@
package org.apache.karaf.decanter.appender.websocket;
import org.apache.karaf.decanter.api.marshaller.Marshaller;
+import org.apache.karaf.decanter.appender.utils.EventFilter;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@@ -34,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.HashSet;
import java.util.Set;
@@ -46,10 +48,13 @@ import java.util.Set;
@WebSocket
public class DecanterWebSocketAppender implements EventHandler {
+ public static String ALIAS_PROPERTY = "servlet.alias";
+
+ public static String ALIAS_DEFAULT = "/decanter-websocket";
+
private static final Logger LOG = LoggerFactory.getLogger(DecanterWebSocketAppender.class);
private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());
- private String alias;
@Reference
private Marshaller marshaller;
@@ -57,6 +62,8 @@ public class DecanterWebSocketAppender implements EventHandler {
@Reference
private HttpService httpService;
+ private Dictionary<String, Object> config;
+
@OnWebSocketConnect
public void onOpen(Session session) {
session.setIdleTimeout(-1);
@@ -70,27 +77,34 @@ public class DecanterWebSocketAppender implements EventHandler {
@Activate
public void activate(ComponentContext componentContext) throws Exception {
- alias = (String) componentContext.getProperties().get("servlet.alias");
+ this.config = componentContext.getProperties();
+ String alias = (String) config.get(ALIAS_PROPERTY);
if (alias == null) {
- alias = "/decanter-websocket";
+ alias = ALIAS_DEFAULT;
}
httpService.registerServlet(alias, new DecanterWebSocketServlet(), null, null);
}
@Deactivate
public void deactivate() throws Exception {
+ String alias = (String) config.get(ALIAS_PROPERTY);
+ if (alias == null) {
+ alias = ALIAS_DEFAULT;
+ }
httpService.unregister(alias);
}
@Override
public void handleEvent(Event event) {
- String message = marshaller.marshal(event);
- synchronized (sessions) {
- for (Session session : sessions) {
- try {
- session.getRemote().sendString(message);
- } catch (Exception e) {
- LOG.warn("Can't publish to remote websocket endpoint", e);
+ if (EventFilter.match(event, config)) {
+ String message = marshaller.marshal(event);
+ synchronized (sessions) {
+ for (Session session : sessions) {
+ try {
+ session.getRemote().sendString(message);
+ } catch (Exception e) {
+ LOG.warn("Can't publish to remote websocket endpoint", e);
+ }
}
}
}
diff --git a/pom.xml b/pom.xml
index 63e4edb..ac76a82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
<elasticsearch6.bundle.version>6.2.4_1</elasticsearch6.bundle.version>
<glassfish-json.version>1.0.4</glassfish-json.version>
<json-api.version>1.0</json-api.version>
- <kafka.version>1.1.1</kafka.version>
+ <kafka.version>2.1.0</kafka.version>
<karaf.version>4.1.4</karaf.version>
<kibana.version>3.1.1</kibana.version>
<kibana4.version>4.1.2</kibana4.version>
@@ -295,6 +295,11 @@
<artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.karaf.decanter.appender</groupId>
+ <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- OSGi -->
<dependency>