You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by jb...@apache.org on 2023/02/17 17:13:44 UTC

[karaf-decanter] branch main updated: [KARAF-7649] Upgrade to elasticsearch 8.6.1

This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d5199b6 [KARAF-7649] Upgrade to elasticsearch 8.6.1
     new 3b2f69bb Merge pull request #325 from jbonofre/KARAF-7649
5d5199b6 is described below

commit 5d5199b6258e7f87f75eef07087d171fcd4c5808
Author: Jean-Baptiste Onofré <jb...@apache.org>
AuthorDate: Thu Feb 9 11:09:16 2023 +0100

    [KARAF-7649] Upgrade to elasticsearch 8.6.1
---
 appender/elasticsearch/pom.xml                     |  46 +------
 .../elasticsearch/TestElasticsearchAppender.java   | 110 ---------------
 collector/elasticsearch/pom.xml                    |  41 ++----
 .../elasticsearch/ElasticsearchCollector.java      |  75 ++++++-----
 .../elasticsearch/ElasticsearchCollectorTest.java  | 148 ---------------------
 5 files changed, 52 insertions(+), 368 deletions(-)

diff --git a/appender/elasticsearch/pom.xml b/appender/elasticsearch/pom.xml
index dfb75070..36462293 100644
--- a/appender/elasticsearch/pom.xml
+++ b/appender/elasticsearch/pom.xml
@@ -58,49 +58,9 @@
             <artifactId>org.apache.karaf.decanter.appender.utils</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client</artifactId>
-            <version>7.10.2</version>
-        </dependency>
-
-        <!-- test -->
-        <dependency>
-            <groupId>org.elasticsearch.test</groupId>
-            <artifactId>framework</artifactId>
-            <version>7.10.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.plugin</groupId>
-            <artifactId>transport-netty4-client</artifactId>
-            <version>7.10.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>2.12.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.karaf.decanter.marshaller</groupId>
-            <artifactId>org.apache.karaf.decanter.marshaller.json</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.johnzon</groupId>
-            <artifactId>johnzon-mapper</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.johnzon</groupId>
-            <artifactId>johnzon-core</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-            <scope>test</scope>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>8.6.1</version>
         </dependency>
     </dependencies>
 
diff --git a/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java b/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
deleted file mode 100644
index d1355271..00000000
--- a/appender/elasticsearch/src/test/java/org/apache/karaf/decanter/appender/elasticsearch/TestElasticsearchAppender.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.appender.elasticsearch;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
-import org.apache.http.HttpHost;
-import org.apache.http.util.EntityUtils;
-import org.apache.karaf.decanter.api.marshaller.Marshaller;
-import org.apache.karaf.decanter.appender.utils.EventFilter;
-import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
-import org.elasticsearch.client.Request;
-import org.elasticsearch.client.Response;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.common.collect.MapBuilder;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.transport.Netty4Plugin;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.osgi.service.event.Event;
-
-@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
-@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
-public class TestElasticsearchAppender {
-
-    private static final String CLUSTER_NAME = "elasticsearch-test";
-    private static final String HOST = "127.0.0.1";
-    private static final int HTTP_PORT = 9201;
-    private static final int TRANSPORT_PORT = 9301;
-
-    private Node node;
-
-    @Before
-    public void setup() throws Exception {
-        Collection plugins = Arrays.asList(Netty4Plugin.class);
-        Settings settings = Settings.builder()
-                .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), CLUSTER_NAME)
-                .put(Node.NODE_NAME_SETTING.getKey(), "test")
-                .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
-                .put(Environment.PATH_HOME_SETTING.getKey(), "target/data")
-                .put(Environment.PATH_DATA_SETTING.getKey(), "target/data")
-                .put("network.host", HOST)
-                .put("http.port", HTTP_PORT)
-                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
-                .put("transport.port", TRANSPORT_PORT)
-                .build();
-        node = new MockNode(settings, plugins);
-        node.start();
-    }
-
-    @After
-    public void teardown() throws Exception {
-        node.close();
-    }
-
-    @Test(timeout = 60000L)
-    public void test() throws Exception {
-        Marshaller marshaller = new JsonMarshaller();
-        ElasticsearchAppender appender = new ElasticsearchAppender();
-        appender.marshaller = marshaller;
-        Dictionary<String, Object> config = new Hashtable<>();
-        config.put(ElasticsearchAppender.ADDRESSES_PROPERTY, "http://" + HOST + ":" + HTTP_PORT);
-        config.put(EventFilter.PROPERTY_NAME_EXCLUDE_CONFIG, ".*refused.*");
-        config.put(EventFilter.PROPERTY_VALUE_EXCLUDE_CONFIG, ".*refused.*");
-        appender.open(config);
-        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "b").put("c", "d").map()));
-        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("refused", "b").put("c", "d").map()));
-        appender.handleEvent(new Event("testTopic", MapBuilder.<String, String>newMapBuilder().put("a", "refused").put("c", "d").map()));
-        appender.close();
-
-        HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
-        RestClient client = RestClient.builder(new HttpHost[]{ host }).build();
-
-        String responseString = "";
-        while (!responseString.contains("\"count\":3")) {
-            Thread.sleep(200);
-            Request request = new Request("GET", "/_count");
-            Response response = client.performRequest(request);
-            responseString = EntityUtils.toString(response.getEntity());
-        }
-    }
-
-}
diff --git a/collector/elasticsearch/pom.xml b/collector/elasticsearch/pom.xml
index a311dbb1..3ec97cb9 100644
--- a/collector/elasticsearch/pom.xml
+++ b/collector/elasticsearch/pom.xml
@@ -50,39 +50,14 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-client</artifactId>
-            <version>7.10.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>7.10.2</version>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>8.6.1</version>
         </dependency>
         <dependency>
             <groupId>org.apache.karaf.decanter.collector</groupId>
             <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
         </dependency>
-
-        <!-- test -->
-        <dependency>
-            <groupId>org.elasticsearch.test</groupId>
-            <artifactId>framework</artifactId>
-            <version>7.10.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.plugin</groupId>
-            <artifactId>transport-netty4-client</artifactId>
-            <version>7.10.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>2.12.1</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -132,6 +107,10 @@
                             !org.locationtech*,
                             !org.tartarus.snowball*,
                             !org.yaml.snakeyaml*,
+                            !co.elastic.clients*,
+                            !jakarta.json*,
+                            !javax.annotation*,
+                            !org.glassfish.json*,
                             org.apache.log;resolution:=optional,
                             javax.servlet;resolution:=optional,
                             jdk.net;resolution:=optional,
@@ -158,7 +137,11 @@
                             org.locationtech*;-split-package:=merge-first,
                             org.tartarus.snowball*;-split-package:=merge-first,
                             org.yaml.snakeyaml*;-split-package:=merge-first,
-                            org.elasticsearch*;-split-package:=merge-first
+                            org.elasticsearch*;-split-package:=merge-first,
+                            co.elastic.clients*;-split-package:=merge-first,
+                            jakarta.json*;-split-package:=merge-first,
+                            javax.annotation*;-split-package:=merge-first,
+                            org.glassfish.json*;-split-package:=merge-first
                         </Private-Package>
                     </instructions>
                 </configuration>
diff --git a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
index 8ef9176d..707099a7 100644
--- a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
+++ b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
@@ -16,6 +16,19 @@
  */
 package org.apache.karaf.decanter.collector.elasticsearch;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.query_dsl.Query;
+import co.elastic.clients.elasticsearch._types.query_dsl.QueryBase;
+import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
+import co.elastic.clients.elasticsearch._types.query_dsl.SimpleQueryStringQuery;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
@@ -23,19 +36,9 @@ import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -67,7 +70,7 @@ public class ElasticsearchCollector implements Runnable {
     private EventAdmin dispatcher;
 
     private Dictionary<String, Object> config;
-    private RestHighLevelClient restClient;
+    private RestClient restClient;
 
     @Activate
     public void activate(ComponentContext componentContext) {
@@ -111,7 +114,7 @@ public class ElasticsearchCollector implements Runnable {
             );
         }
 
-        restClient = new RestHighLevelClient(restClientBuilder);
+        restClient = restClientBuilder.build();
     }
 
     @Deactivate
@@ -125,54 +128,50 @@ public class ElasticsearchCollector implements Runnable {
 
     @Override
     public void run() {
-        SearchRequest searchRequest = new SearchRequest();
+        SearchRequest.Builder searchRequestBuilder = new SearchRequest.Builder();
 
         String index = (config.get("index") != null) ? config.get("index").toString() : "decanter";
-        searchRequest.indices(index);
-
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchRequestBuilder.index(index);
 
         String query = (config.get("query") != null) ? config.get("query").toString() : null;
-        QueryBuilder queryBuilder;
+        QueryBase queryBase;
+
         if (query == null) {
-            queryBuilder = QueryBuilders.matchAllQuery();
+            searchRequestBuilder.q(QueryBuilders.matchAll().build().toString());
         } else {
-            queryBuilder = QueryBuilders.queryStringQuery(query);
+            searchRequestBuilder.q(query);
         }
-        searchSourceBuilder.query(queryBuilder);
+
         String fromString = (config.get("from") != null) ? config.get("from").toString() : null;
         if (fromString != null) {
             int from = Integer.parseInt(fromString);
-            searchSourceBuilder.from(from);
+            searchRequestBuilder.from(from);
         }
         String sizeString = (config.get("size") != null) ? config.get("size").toString() : null;
         if (sizeString != null) {
             int size = Integer.parseInt(sizeString);
-            searchSourceBuilder.size(size);
+            searchRequestBuilder.size(size);
         }
         String timeoutString = (config.get("timeout") != null) ? config.get("timeout").toString() : null;
         if (timeoutString != null) {
-            int timeout = Integer.parseInt(timeoutString);
-            searchSourceBuilder.timeout(new TimeValue(timeout, TimeUnit.SECONDS));
+            searchRequestBuilder.timeout(timeoutString);
         }
-        searchRequest.source(searchSourceBuilder);
 
         Map<String, Object> data = new HashMap<>();
         data.put("type", "elasticsearch");
         try {
-            SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
-            RestStatus status = searchResponse.status();
-            TimeValue took = searchResponse.getTook();
-            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
-            boolean timedOut = searchResponse.isTimedOut();
-            int totalShards = searchResponse.getTotalShards();
-            int successfulShards = searchResponse.getSuccessfulShards();
-            int failedShards = searchResponse.getFailedShards();
-            SearchHits hits = searchResponse.getHits();
-            for (SearchHit hit : hits) {
-                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
-                data.putAll(sourceAsMap);
-            }
+            ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+            new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), Map.class)
+                    .thenApply((response) -> {
+                        data.put("timeout", response.timedOut());
+                        data.put("totalShards", response.shards().total().intValue());
+                        data.put("successfulShards", response.shards().successful().intValue());
+                        data.put("failedShards", response.shards().failed().intValue());
+                        for (Hit hit : response.hits().hits()) {
+                            data.putAll(hit.fields());
+                        }
+                        return null;
+                    });
         } catch (Exception e) {
             LOGGER.error("Can't query elasticsearch", e);
         }
diff --git a/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java b/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java
deleted file mode 100644
index 010cecff..00000000
--- a/collector/elasticsearch/src/test/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollectorTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.collector.elasticsearch;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.nio.entity.NStringEntity;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.client.Request;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.MockNode;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.transport.Netty4Plugin;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
-
-import java.util.*;
-
-@RunWith(com.carrotsearch.randomizedtesting.RandomizedRunner.class)
-@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE)
-public class ElasticsearchCollectorTest {
-
-    private static final String CLUSTER_NAME = "elasticsearch-test";
-    private static final String HOST = "127.0.0.1";
-    private static final int HTTP_PORT = 9201;
-    private static final int TRANSPORT_PORT = 9301;
-
-    private Node node;
-
-    @Before
-    public void setup() throws Exception {
-        Collection plugins = Arrays.asList(Netty4Plugin.class);
-        Settings settings = Settings.builder()
-                .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), CLUSTER_NAME)
-                .put(Node.NODE_NAME_SETTING.getKey(), "test")
-                .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
-                .put(Environment.PATH_HOME_SETTING.getKey(), "target/data")
-                .put(Environment.PATH_DATA_SETTING.getKey(), "target/data")
-                .put("network.host", HOST)
-                .put("http.port", HTTP_PORT)
-                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
-                .put("transport.port", TRANSPORT_PORT)
-                .build();
-        node = new MockNode(settings, plugins);
-        node.start();
-    }
-
-    @After
-    public void teardown() throws Exception {
-        node.close();
-    }
-
-    @Test(timeout = 60000L)
-    public void testAll() throws Exception {
-        HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
-        RestClient client = RestClient.builder(new HttpHost[]{ host }).build();
-        HttpEntity entity = new NStringEntity("{\"foo\":\"bar\"}", ContentType.APPLICATION_JSON);
-        Request request = new Request("POST", "/test/_doc/");
-        request.setEntity(entity);
-        client.performRequest(request);
-
-        MockDispatcher dispatcher = new MockDispatcher();
-        ElasticsearchCollector collector = new ElasticsearchCollector();
-        collector.setDispatcher(dispatcher);
-        Hashtable<String, Object> configuration = new Hashtable<>();
-        configuration.put("addresses", "http://localhost:" + HTTP_PORT);
-        configuration.put("index", "test");
-        collector.activate(configuration);
-
-        collector.run();
-
-        Assert.assertEquals(1, dispatcher.postedEvents.size());
-        Assert.assertEquals("elasticsearch", dispatcher.postedEvents.get(0).getProperty("type"));
-        Assert.assertEquals("decanter/collect/elasticsearch", dispatcher.postedEvents.get(0).getTopic());
-    }
-
-    @Test(timeout = 60000L)
-    public void testQuery() throws Exception {
-        HttpHost host = new HttpHost(HOST, HTTP_PORT, "http");
-        RestClient client = RestClient.builder(new HttpHost[]{ host }).build();
-        HttpEntity entity = new NStringEntity("{\"foo\":\"bar\"}", ContentType.APPLICATION_JSON);
-        Request request = new Request("POST", "/test/_doc/");
-        request.setEntity(entity);
-        client.performRequest(request);
-        entity = new NStringEntity("{\"other\":\"test\"}", ContentType.APPLICATION_JSON);
-        request = new Request("POST", "/test/_doc/");
-        request.setEntity(entity);
-        client.performRequest(request);
-
-        MockDispatcher dispatcher = new MockDispatcher();
-        ElasticsearchCollector collector = new ElasticsearchCollector();
-        collector.setDispatcher(dispatcher);
-        Hashtable<String, Object> configuration = new Hashtable<>();
-        configuration.put("addresses", "http://localhost:" + HTTP_PORT);
-        configuration.put("index", "test");
-        configuration.put("query", "foo:b*");
-        collector.activate(configuration);
-
-        collector.run();
-
-        Assert.assertEquals(1, dispatcher.postedEvents.size());
-        Assert.assertEquals("elasticsearch", dispatcher.postedEvents.get(0).getProperty("type"));
-        Assert.assertEquals("decanter/collect/elasticsearch", dispatcher.postedEvents.get(0).getTopic());
-    }
-
-    class MockDispatcher implements EventAdmin {
-
-        public List<Event> postedEvents = new ArrayList<>();
-        public List<Event> sentEvents = new ArrayList<>();
-
-        @Override
-        public void postEvent(Event event) {
-            postedEvents.add(event);
-        }
-
-        @Override
-        public void sendEvent(Event event) {
-            sentEvents.add(event);
-        }
-
-    }
-
-}