You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2020/04/23 12:47:23 UTC

[karaf-decanter] branch master updated: [KARAF-6646] Add elasticsearch collector

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a57a4  [KARAF-6646] Add elasticsearch collector
     new 6a38d35  Merge pull request #160 from jbonofre/KARAF-6646
15a57a4 is described below

commit 15a57a4f30fb9cebb2d0655f2b1e8892616cd82e
Author: jbonofre <jb...@apache.org>
AuthorDate: Thu Apr 23 06:25:36 2020 +0200

    [KARAF-6646] Add elasticsearch collector
---
 assembly/src/main/feature/feature.xml              |  11 ++
 collector/elasticsearch/pom.xml                    | 154 ++++++++++++++++
 ...ache.karaf.decanter.collector.elasticsearch.cfg |  44 +++++
 .../elasticsearch/ElasticsearchCollector.java      | 194 +++++++++++++++++++++
 .../elasticsearch/ElasticsearchCollectorTest.java  | 148 ++++++++++++++++
 collector/pom.xml                                  |   1 +
 .../src/main/asciidoc/user-guide/collectors.adoc   |  44 +++++
 7 files changed, 596 insertions(+)

diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 549a7d3..5f0a206 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -234,6 +234,17 @@ org.apache.felix.eventadmin.IgnoreTimeout=org.apache.karaf.decanter.
         <feature>decanter-collector-redis-core</feature>
     </feature>
 
+    <feature name="decanter-collector-elasticsearch-core" version="${project.version}" description="Karaf Decanter Elasticsearch Collector core">
+        <feature>decanter-common</feature>
+        <feature>scheduler</feature>
+        <bundle>mvn:org.apache.karaf.decanter.collector/org.apache.karaf.decanter.collector.elasticsearch/${project.version}</bundle>
+    </feature>
+
+    <feature name="decanter-collector-elasticsearch" version="${project.version}" description="Karaf Decanter Elasticsearch Collector">
+        <configfile finalname="/etc/org.apache.karaf.decanter.collector.elasticsearch.cfg">mvn:org.apache.karaf.decanter.collector/org.apache.karaf.decanter.collector.elasticsearch/${project.version}/cfg</configfile>
+        <feature>decanter-collector-elasticsearch-core</feature>
+    </feature>
+
     <feature name="decanter-appender-log" version="${project.version}" description="Karaf Decanter Log Appender">
         <feature>decanter-common</feature>
         <configfile finalname="/etc/org.apache.karaf.decanter.appender.log.cfg">mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.log/${project.version}/cfg</configfile>
diff --git a/collector/elasticsearch/pom.xml b/collector/elasticsearch/pom.xml
new file mode 100644
index 0000000..d519397
--- /dev/null
+++ b/collector/elasticsearch/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+           http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>collector</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.karaf.decanter.collector</groupId>
+    <artifactId>org.apache.karaf.decanter.collector.elasticsearch</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Karaf :: Decanter :: Collector :: Elasticsearch</name>
+
+    <properties>
+        <!-- Single place to bump the Elasticsearch client and test artifacts, which must share one version. -->
+        <collector.elasticsearch.version>7.6.1</collector.elasticsearch.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>${collector.elasticsearch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${collector.elasticsearch.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.karaf.decanter.collector</groupId>
+            <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.elasticsearch.test</groupId>
+            <artifactId>framework</artifactId>
+            <version>${collector.elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${collector.elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.12.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>src/main/cfg/org.apache.karaf.decanter.collector.elasticsearch.cfg</file>
+                                    <type>cfg</type>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>
+                            !com.carrotsearch*,
+                            !com.fasterxml.jackson*,
+                            !com.github.mustachejava*,
+                            !com.sun.jna*,
+                            !com.sun.management*,
+                            !com.tdunning.math.stats,
+                            !joptsimple*,
+                            !org.apache.lucene*,
+                            !com.google.common*,
+                            !org.joda.time*,
+                            !org.joda.convert,
+                            !org.locationtech*,
+                            !org.tartarus.snowball*,
+                            !org.yaml.snakeyaml*,
+                            org.apache.log;resolution:=optional,
+                            javax.servlet;resolution:=optional,
+                            jdk.net;resolution:=optional,
+                            org.apache.logging*;resolution:=optional,
+                            *
+                        </Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.collector.elasticsearch,
+                            org.apache.karaf.decanter.collector.utils,
+                            com.fasterxml.jackson*;-split-package:=merge-first,
+                            com.carrotsearch*;-split-package:=merge-first,
+                            org.apache.http*;-split-package:=merge-first,
+                            org.apache.commons*;-split-package:=merge-first,
+                            com.github.mustachejava*;-split-package:=merge-first,
+                            com.sun.jna*;-split-package:=merge-first,
+                            com.sun.management*;-split-package:=merge-first,
+                            com.tdunning.math.stats;-split-package:=merge-first,
+                            joptsimple*;-split-package:=merge-first,
+                            org.HdrHistogram;-split-package:=merge-first,
+                            org.apache.lucene*;-split-package:=merge-first,
+                            com.google.common*;-split-package:=merge-first,
+                            org.joda.time*;-split-package:=merge-first,
+                            org.joda.convert;-split-package:=merge-first,
+                            org.locationtech*;-split-package:=merge-first,
+                            org.tartarus.snowball*;-split-package:=merge-first,
+                            org.yaml.snakeyaml*;-split-package:=merge-first,
+                            org.elasticsearch*;-split-package:=merge-first
+                        </Private-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/collector/elasticsearch/src/main/cfg/org.apache.karaf.decanter.collector.elasticsearch.cfg b/collector/elasticsearch/src/main/cfg/org.apache.karaf.decanter.collector.elasticsearch.cfg
new file mode 100644
index 0000000..82e8e29
--- /dev/null
+++ b/collector/elasticsearch/src/main/cfg/org.apache.karaf.decanter.collector.elasticsearch.cfg
@@ -0,0 +1,44 @@
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+################################################
+# Decanter Elasticsearch Collector Configuration
+################################################
+
+# HTTP address of the elasticsearch nodes (separated with comma)
+addresses=http://localhost:9200
+
+# Basic username and password authentication (no authentication by default)
+#username=user
+#password=password
+
+# Name of the index to request (decanter by default)
+#index=decanter
+
+# Query to request document (match all by default)
+#query=
+
+# Starting point for the document query (no from by default)
+#from=
+
+# Max number of documents retrieved (Elasticsearch default page size when not set)
+#size=
+
+# Search timeout, in seconds (no timeout by default)
+#timeout=
\ No newline at end of file
diff --git a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
new file mode 100644
index 0000000..03cad36
--- /dev/null
+++ b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.elasticsearch;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Decanter collector that searches an Elasticsearch index and posts the retrieved documents
+ * as one event on the {@code decanter/collect/elasticsearch} topic.
+ *
+ * The component is registered as a {@link Runnable} with {@code scheduler.*} service properties,
+ * so each scheduler trigger results in one call to {@link #run()}.
+ */
+@Component(
+        name = "org.apache.karaf.decanter.collector.elasticsearch",
+        immediate = true,
+        property = { "decanter.collector.name=elasticsearch",
+                "scheduler.period:Long=60",
+                "scheduler.concurrent:Boolean=false",
+                "scheduler.name=decanter-collector-elasticsearch"}
+)
+public class ElasticsearchCollector implements Runnable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchCollector.class);
+
+    @Reference
+    private EventAdmin dispatcher;
+
+    private Dictionary<String, Object> configuration;
+    private RestHighLevelClient restClient;
+
+    /**
+     * Declarative Services entry point: activates the collector with the component properties.
+     *
+     * @param componentContext the component context providing the configuration.
+     */
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        activate(componentContext.getProperties());
+    }
+
+    /**
+     * Creates the Elasticsearch client from the given configuration.
+     *
+     * Recognized keys: {@code addresses} (comma separated URLs, {@code http://localhost:9200} by default),
+     * {@code username} and {@code password} (basic authentication, disabled when no username is set).
+     * Malformed addresses are logged and skipped; blank entries are ignored.
+     *
+     * @param configuration the collector configuration.
+     * @throws IllegalArgumentException if none of the configured addresses is a valid URL.
+     */
+    public void activate(Dictionary<String, Object> configuration) {
+        this.configuration = configuration;
+        String addressesString = getString("addresses", "http://localhost:9200");
+        String username = getString("username", null);
+        String password = getString("password", null);
+
+        // de-duplicate while keeping the configured order; tolerate spaces around the commas
+        Set<String> addresses = new LinkedHashSet<>();
+        for (String address : addressesString.split(",")) {
+            String trimmed = address.trim();
+            if (!trimmed.isEmpty()) {
+                addresses.add(trimmed);
+            }
+        }
+
+        // collect only the hosts that could be parsed, so a bad address never leaves a null entry
+        List<HttpHost> hosts = new ArrayList<>();
+        for (String address : addresses) {
+            try {
+                URL url = new URL(address);
+                hosts.add(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()));
+            } catch (Exception e) {
+                LOGGER.warn("Bad elasticsearch address {}", address, e);
+            }
+        }
+        if (hosts.isEmpty()) {
+            throw new IllegalArgumentException("No valid elasticsearch address found in '" + addressesString + "'");
+        }
+        RestClientBuilder restClientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]));
+
+        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(1000)
+                .setSocketTimeout(10000));
+
+        if (username != null) {
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+            restClientBuilder.setHttpClientConfigCallback(
+                    (HttpAsyncClientBuilder httpAsyncClientBuilder) ->
+                            httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        }
+
+        restClient = new RestHighLevelClient(restClientBuilder);
+    }
+
+    /**
+     * Closes the Elasticsearch client, if one was created.
+     */
+    @Deactivate
+    public void deactivate() {
+        if (restClient == null) {
+            return;
+        }
+        try {
+            restClient.close();
+        } catch (Exception e) {
+            LOGGER.warn("Warning when closing elasticsearch client", e);
+        }
+    }
+
+    /**
+     * Executes the configured search and posts one event containing the fields of the hits.
+     *
+     * Recognized keys: {@code index} ({@code decanter} by default), {@code query} (query string,
+     * match all when not set), {@code from}, {@code size} (the {@code max} key used by the shipped
+     * configuration file is accepted as an alias) and {@code timeout} (seconds).
+     * Blank, non numeric or negative numeric values are ignored with a warning.
+     */
+    @Override
+    public void run() {
+        SearchRequest searchRequest = new SearchRequest();
+        searchRequest.indices(getString("index", "decanter"));
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+        String query = getString("query", null);
+        QueryBuilder queryBuilder;
+        if (query == null) {
+            queryBuilder = QueryBuilders.matchAllQuery();
+        } else {
+            queryBuilder = QueryBuilders.queryStringQuery(query);
+        }
+        searchSourceBuilder.query(queryBuilder);
+
+        Integer from = getNonNegativeInteger("from");
+        if (from != null) {
+            searchSourceBuilder.from(from.intValue());
+        }
+        Integer size = getNonNegativeInteger("size");
+        if (size == null) {
+            // the configuration file and the manual name this property "max"
+            size = getNonNegativeInteger("max");
+        }
+        if (size != null) {
+            searchSourceBuilder.size(size.intValue());
+        }
+        Integer timeout = getNonNegativeInteger("timeout");
+        if (timeout != null) {
+            searchSourceBuilder.timeout(new TimeValue(timeout.longValue(), TimeUnit.SECONDS));
+        }
+        searchRequest.source(searchSourceBuilder);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("type", "elasticsearch");
+        try {
+            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
+            // all hits are merged into the same map: a field present in several hits keeps the last value
+            for (SearchHit hit : searchResponse.getHits()) {
+                data.putAll(hit.getSourceAsMap());
+            }
+        } catch (Exception e) {
+            LOGGER.error("Can't query elasticsearch", e);
+        }
+        try {
+            PropertiesPreparator.prepare(data, configuration);
+        } catch (Exception e) {
+            LOGGER.warn("Can't prepare event", e);
+        }
+
+        // the event is posted even when the search failed (it then only carries the "type" property
+        // and whatever PropertiesPreparator added)
+        dispatcher.postEvent(new Event("decanter/collect/elasticsearch", data));
+    }
+
+    /**
+     * Visible for testing
+     */
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Reads a configuration value as a string, treating a missing or blank value as not set.
+     */
+    private String getString(String key, String defaultValue) {
+        Object value = (configuration != null) ? configuration.get(key) : null;
+        if (value == null) {
+            return defaultValue;
+        }
+        String text = value.toString();
+        return text.trim().isEmpty() ? defaultValue : text;
+    }
+
+    /**
+     * Reads a configuration value as a non negative integer, or returns null when it is not set or not usable.
+     */
+    private Integer getNonNegativeInteger(String key) {
+        String text = getString(key, null);
+        if (text == null) {
+            return null;
+        }
+        try {
+            int number = Integer.parseInt(text.trim());
+            if (number < 0) {
+                LOGGER.warn("Ignoring negative value {} for property {}", text, key);
+                return null;
+            }
+            return number;
+        } catch (NumberFormatException e) {
+            LOGGER.warn("Ignoring non numeric value {} for property {}", text, key);
+            return null;
+        }
+    }
+
+}
diff --git a/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java b/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java
new file mode 100644
index 0000000..010cecf
--- /dev/null
+++ b/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.elasticsearch;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.node.MockNode;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.*;
+
+/**
+ * Tests the Elasticsearch collector against an embedded single node cluster started for each test.
+ */
+@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
+@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
+public class ElasticsearchCollectorTest {
+
+    private static final String CLUSTER_NAME = "elasticsearch-test";
+    private static final String HOST = "127.0.0.1";
+    private static final int HTTP_PORT = 9201;
+    private static final int TRANSPORT_PORT = 9301;
+
+    private Node node;
+
+    @Before
+    public void setup() throws Exception {
+        Collection plugins = Arrays.asList(Netty4Plugin.class);
+        Settings settings = Settings.builder()
+                .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), CLUSTER_NAME)
+                .put(Node.NODE_NAME_SETTING.getKey(), "test")
+                .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
+                .put(Environment.PATH_HOME_SETTING.getKey(), "target/data")
+                .put(Environment.PATH_DATA_SETTING.getKey(), "target/data")
+                .put("network.host", HOST)
+                .put("http.port", HTTP_PORT)
+                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
+                .put("transport.port", TRANSPORT_PORT)
+                .build();
+        node = new MockNode(settings, plugins);
+        node.start();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        // setup() may have failed before the node was created
+        if (node != null) {
+            node.close();
+        }
+    }
+
+    @Test(timeout = 60000L)
+    public void testAll() throws Exception {
+        indexDocument("{\"foo\":\"bar\"}");
+
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("addresses", "http://" + HOST + ":" + HTTP_PORT);
+        configuration.put("index", "test");
+
+        Event event = collect(configuration);
+
+        Assert.assertEquals("elasticsearch", event.getProperty("type"));
+        Assert.assertEquals("decanter/collect/elasticsearch", event.getTopic());
+    }
+
+    @Test(timeout = 60000L)
+    public void testQuery() throws Exception {
+        indexDocument("{\"foo\":\"bar\"}");
+        indexDocument("{\"other\":\"test\"}");
+
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("addresses", "http://" + HOST + ":" + HTTP_PORT);
+        configuration.put("index", "test");
+        configuration.put("query", "foo:b*");
+
+        Event event = collect(configuration);
+
+        Assert.assertEquals("elasticsearch", event.getProperty("type"));
+        Assert.assertEquals("decanter/collect/elasticsearch", event.getTopic());
+        // only the document matching the query must have been harvested
+        Assert.assertEquals("bar", event.getProperty("foo"));
+        Assert.assertNull(event.getProperty("other"));
+    }
+
+    /**
+     * Indexes one JSON document in the "test" index and waits for it to be searchable.
+     */
+    private void indexDocument(String json) throws Exception {
+        HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
+        try (RestClient client = RestClient.builder(new HttpHost[]{ host }).build()) {
+            HttpEntity entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
+            Request request = new Request("POST", "/test/_doc/");
+            // make the document visible to the search executed right after
+            request.addParameter("refresh", "true");
+            request.setEntity(entity);
+            client.performRequest(request);
+        }
+    }
+
+    /**
+     * Runs a collector once with the given configuration and returns the single event it posted.
+     */
+    private Event collect(Hashtable<String, Object> configuration) {
+        MockDispatcher dispatcher = new MockDispatcher();
+        ElasticsearchCollector collector = new ElasticsearchCollector();
+        collector.setDispatcher(dispatcher);
+        collector.activate(configuration);
+        try {
+            collector.run();
+        } finally {
+            collector.deactivate();
+        }
+
+        Assert.assertEquals(1, dispatcher.postedEvents.size());
+        return dispatcher.postedEvents.get(0);
+    }
+
+    class MockDispatcher implements EventAdmin {
+
+        public List<Event> postedEvents = new ArrayList<>();
+        public List<Event> sentEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+
+    }
+
+}
diff --git a/collector/pom.xml b/collector/pom.xml
index 3cbbec4..2255a0a 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -38,6 +38,7 @@
         <module>camel</module>
         <module>configadmin</module>
         <module>dropwizard</module>
+        <module>elasticsearch</module>
         <module>eventadmin</module>
         <module>file</module>
         <module>jms</module>
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index de2c07d..882e2e1 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -999,6 +999,50 @@ map=Decanter
 
 You can configure the Redis connection (depending of the topology) and the key pattern in this configuration file.
 
+==== Elasticsearch
+
+The Decanter Elasticsearch collector retrieves documents from Elasticsearch periodically (scheduled collector).
+By default, it harvests all documents in the given index, but you can also specify a query.
+
+The `decanter-collector-elasticsearch` feature installs the Elasticsearch collector:
+
+----
+karaf@root()> feature:install decanter-collector-elasticsearch
+----
+
+The feature also installs the `etc/org.apache.karaf.decanter.collector.elasticsearch.cfg` configuration file containing:
+
+----
+# HTTP address of the elasticsearch nodes (separated with comma)
+addresses=http://localhost:9200
+
+# Basic username and password authentication (no authentication by default)
+#username=user
+#password=password
+
+# Name of the index to request (decanter by default)
+#index=decanter
+
+# Query to request document (match all by default)
+#query=
+
+# Starting point for the document query (no from by default)
+#from=
+
+# Max number of documents retrieved (Elasticsearch default page size when not set)
+#size=
+
+# Search timeout, in seconds (no timeout by default)
+#timeout=
+----
+
+* `addresses` property is the location of the Elasticsearch instances. Default is `http://localhost:9200`.
+* `username` and `password` properties are used for authentication. They are `null` (no authentication) by default.
+* `index` property is the Elasticsearch index where to get documents. It's `decanter` by default.
+* `query` property is a search query to use. Default is `null` meaning all documents in the index are harvested.
+* `from` and `size` properties select a window of the matching documents: `from` is the offset of the first document and `size` is the maximum number of documents retrieved. They are not set by default, so the Elasticsearch defaults apply.
+* `timeout` property limits the query execution. There's no timeout by default.
+
 ==== Customizing properties in collectors
 
 You can add, rename or remove properties collected by the collectors before sending it to the dispatcher.