You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2015/06/29 10:32:06 UTC

svn commit: r1688125 - in /james/mailbox/trunk/elasticsearch/src: main/java/org/apache/james/mailbox/elasticsearch/ main/resources/META-INF/spring/ test/java/org/apache/james/mailbox/elasticsearch/

Author: btellier
Date: Mon Jun 29 08:32:05 2015
New Revision: 1688125

URL: http://svn.apache.org/r1688125
Log:
MAILBOX-242 Implement an ElasticSearch indexer for basic index, update & delete methods - patch contributed by Antoine Duprat

Added:
    james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java
    james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java
Modified:
    james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml

Added: james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java?rev=1688125&view=auto
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java (added)
+++ james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java Mon Jun 29 08:32:05 2015
@@ -0,0 +1,79 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.elasticsearch;
+
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+
+import com.google.common.base.Preconditions;
+
+public class ElasticSearchIndexer {
+
+    public static final String MAILBOX_INDEX = "mailbox";
+    public static final String MESSAGE_TYPE = "message";
+    
+    private final Node node;
+
+    public ElasticSearchIndexer(Node node) {
+        this.node = node;
+    }
+    
+    public IndexResponse indexMessage(String id, String content) {
+        checkArgument(content);
+        try (Client client = node.client()) {
+            return client.prepareIndex(MAILBOX_INDEX, MESSAGE_TYPE, id)
+                .setSource(content)
+                .get();
+        }
+    }
+    
+    public UpdateResponse updateMessage(String id, String content) {
+        checkArgument(content);
+        try (Client client = node.client()) {
+            return client.prepareUpdate(MAILBOX_INDEX, MESSAGE_TYPE, id)
+                .setDoc(content)
+                .get();
+        }
+    }
+    
+    public DeleteResponse deleteMessage(String id) {
+        try (Client client = node.client()) {
+            return client.prepareDelete(MAILBOX_INDEX, MESSAGE_TYPE, id)
+                .get();
+        }
+    }
+    
+    public DeleteByQueryResponse deleteAllWithIdStarting(String idStart) {
+        try (Client client = node.client()) {
+            return client.prepareDeleteByQuery(MAILBOX_INDEX)
+                .setTypes(MESSAGE_TYPE)
+                .setQuery(QueryBuilders.prefixQuery("_id", idStart))
+                .get();
+        }
+    }
+
+    private void checkArgument(String content) {
+        Preconditions.checkArgument(content != null, "content should be provided");
+    }
+}

Modified: james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml?rev=1688125&r1=1688124&r2=1688125&view=diff
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml (original)
+++ james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml Mon Jun 29 08:32:05 2015
@@ -23,6 +23,10 @@
        xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 
+    <bean id="elasticsearch-indexer" class="org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer">
+        <constructor-arg index="0" ref="elasticsearch-node"/>
+    </bean>
+
     <bean id="elasticsearch-node" class="org.apache.james.mailbox.cassandra.elasticsearch.NodeProvider" factory-method="createNodeForClusterName">
         <constructor-arg index="0" ref="${elasticsearch.clusterName}"/>
         <constructor-arg index="1" ref="${elasticsearch.masterHost}"/>

Added: james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java?rev=1688125&view=auto
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java (added)
+++ james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java Mon Jun 29 08:32:05 2015
@@ -0,0 +1,201 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.elasticsearch;
+
+import static com.jayway.awaitility.Awaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.jayway.awaitility.Duration;
+
+
+public class ElasticSearchIndexerTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+    
+    private Node node;
+    private ElasticSearchIndexer testee;
+
+    @Before
+    public void setup() throws Exception {
+        node = nodeBuilder().local(true)
+                .settings(ImmutableSettings.builder()
+                        .put("path.data", temporaryFolder.newFolder().getAbsolutePath())
+                        .build())
+                .node();
+        node.start();
+        awaitForElasticSearch();
+        
+        testee = new ElasticSearchIndexer(node);
+    }
+    
+    @After
+    public void tearDown() {
+        node.close();
+    }
+    
+    @Test
+    public void indexMessageShouldWork() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+        
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+        
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                    .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .setQuery(QueryBuilders.matchQuery("message", "trying"))
+                    .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void indexMessageShouldThrowWhenJsonIsNull() throws InterruptedException {
+        testee.indexMessage("1", null);
+    }
+    
+    @Test
+    public void updateMessage() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+        
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+        
+        String updatedContent = "{\"message\": \"mastering out Elasticsearch\"}";
+        testee.updateMessage(messageId, updatedContent);
+        awaitForElasticSearch();
+        
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                    .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .setQuery(QueryBuilders.matchQuery("message", "mastering"))
+                    .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+    
+    @Test(expected=IllegalArgumentException.class)
+    public void updateMessageShouldThrowWhenJsonIsNull() throws InterruptedException {
+        testee.updateMessage("1", null);
+    }
+    
+    @Test
+    public void deleteAllWithIdStarting() throws Exception {
+        String messageId = "1:2";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+        
+        testee.deleteAllWithIdStarting("1:");
+        awaitForElasticSearch();
+        
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                    .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .setQuery(QueryBuilders.matchAllQuery())
+                    .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+        }
+    }
+    
+    @Test
+    public void deleteAllWithIdStartingWhenMultipleMessages() throws Exception {
+        String messageId = "1:2";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+        
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+        
+        String messageId2 = "1:2";
+        String content2 = "{\"message\": \"trying out Elasticsearch 2\"}";
+        
+        testee.indexMessage(messageId2, content2);
+        awaitForElasticSearch();
+        
+        String messageId3 = "2:3";
+        String content3 = "{\"message\": \"trying out Elasticsearch 3\"}";
+        
+        testee.indexMessage(messageId3, content3);
+        awaitForElasticSearch();
+        
+        testee.deleteAllWithIdStarting("1:");
+        awaitForElasticSearch();
+        
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                    .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .setQuery(QueryBuilders.matchAllQuery())
+                    .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+    
+    @Test
+    public void deleteMessage() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+        
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+        
+        testee.deleteMessage(messageId);
+        awaitForElasticSearch();
+        
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                    .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                    .setQuery(QueryBuilders.matchAllQuery())
+                    .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+        }
+    }
+    
+    /**
+     * Sometimes, tests are too fast.
+     * This method ensure that ElasticSearch service is up and indices are updated
+     */
+    private void awaitForElasticSearch() {
+        await().atMost(Duration.ONE_SECOND).until(() -> flush());
+    }
+    
+    private boolean flush() {
+        try (Client client = node.client()) {
+            new FlushRequestBuilder(client.admin().indices()).setForce(true).get();
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org