You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2015/06/29 10:32:06 UTC
svn commit: r1688125 - in /james/mailbox/trunk/elasticsearch/src:
main/java/org/apache/james/mailbox/elasticsearch/
main/resources/META-INF/spring/
test/java/org/apache/james/mailbox/elasticsearch/
Author: btellier
Date: Mon Jun 29 08:32:05 2015
New Revision: 1688125
URL: http://svn.apache.org/r1688125
Log:
MAILBOX-242 Implement an ElasticSearch indexer for basic index, update & delete methods - patch contributed by Antoine Duprat
Added:
james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java
james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java
Modified:
james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml
Added: james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java?rev=1688125&view=auto
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java (added)
+++ james/mailbox/trunk/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexer.java Mon Jun 29 08:32:05 2015
@@ -0,0 +1,79 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.mailbox.elasticsearch;
+
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+
+import com.google.common.base.Preconditions;
+
+public class ElasticSearchIndexer {
+
+ public static final String MAILBOX_INDEX = "mailbox";
+ public static final String MESSAGE_TYPE = "message";
+
+ private final Node node;
+
+ public ElasticSearchIndexer(Node node) {
+ this.node = node;
+ }
+
+ public IndexResponse indexMessage(String id, String content) {
+ checkArgument(content);
+ try (Client client = node.client()) {
+ return client.prepareIndex(MAILBOX_INDEX, MESSAGE_TYPE, id)
+ .setSource(content)
+ .get();
+ }
+ }
+
+ public UpdateResponse updateMessage(String id, String content) {
+ checkArgument(content);
+ try (Client client = node.client()) {
+ return client.prepareUpdate(MAILBOX_INDEX, MESSAGE_TYPE, id)
+ .setDoc(content)
+ .get();
+ }
+ }
+
+ public DeleteResponse deleteMessage(String id) {
+ try (Client client = node.client()) {
+ return client.prepareDelete(MAILBOX_INDEX, MESSAGE_TYPE, id)
+ .get();
+ }
+ }
+
+ public DeleteByQueryResponse deleteAllWithIdStarting(String idStart) {
+ try (Client client = node.client()) {
+ return client.prepareDeleteByQuery(MAILBOX_INDEX)
+ .setTypes(MESSAGE_TYPE)
+ .setQuery(QueryBuilders.prefixQuery("_id", idStart))
+ .get();
+ }
+ }
+
+ private void checkArgument(String content) {
+ Preconditions.checkArgument(content != null, "content should be provided");
+ }
+}
Modified: james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml?rev=1688125&r1=1688124&r2=1688125&view=diff
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml (original)
+++ james/mailbox/trunk/elasticsearch/src/main/resources/META-INF/spring/mailbox-elasticsearch.xml Mon Jun 29 08:32:05 2015
@@ -23,6 +23,10 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+ <!-- ElasticSearchIndexer bean; its single constructor argument is the "elasticsearch-node" bean. -->
+ <bean id="elasticsearch-indexer" class="org.apache.james.mailbox.elasticsearch.ElasticSearchIndexer">
+ <constructor-arg index="0" ref="elasticsearch-node"/>
+ </bean>
+
<bean id="elasticsearch-node" class="org.apache.james.mailbox.cassandra.elasticsearch.NodeProvider" factory-method="createNodeForClusterName">
<constructor-arg index="0" ref="${elasticsearch.clusterName}"/>
<constructor-arg index="1" ref="${elasticsearch.masterHost}"/>
Added: james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java?rev=1688125&view=auto
==============================================================================
--- james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java (added)
+++ james/mailbox/trunk/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIndexerTest.java Mon Jun 29 08:32:05 2015
@@ -0,0 +1,201 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.mailbox.elasticsearch;
+
+import static com.jayway.awaitility.Awaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+import org.elasticsearch.action.admin.indices.flush.FlushRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.node.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.jayway.awaitility.Duration;
+
+
+/**
+ * Tests for {@link ElasticSearchIndexer} run against a local ElasticSearch node
+ * that is started before and closed after each test, with its data stored in a
+ * JUnit temporary folder.
+ */
+public class ElasticSearchIndexerTest {
+
+    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private Node node;
+    private ElasticSearchIndexer testee;
+
+    /**
+     * Starts a local node writing to a fresh temporary folder and builds the
+     * indexer under test on top of it.
+     */
+    @Before
+    public void setup() throws Exception {
+        node = nodeBuilder().local(true)
+            .settings(ImmutableSettings.builder()
+                .put("path.data", temporaryFolder.newFolder().getAbsolutePath())
+                .build())
+            .node();
+        node.start();
+        awaitForElasticSearch();
+
+        testee = new ElasticSearchIndexer(node);
+    }
+
+    @After
+    public void tearDown() {
+        node.close();
+    }
+
+    /** An indexed message must be found by a match query on its content. */
+    @Test
+    public void indexMessageShouldWork() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setQuery(QueryBuilders.matchQuery("message", "trying"))
+                .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void indexMessageShouldThrowWhenJsonIsNull() throws InterruptedException {
+        testee.indexMessage("1", null);
+    }
+
+    /** After an update, the message must be found by its new content. */
+    @Test
+    public void updateMessage() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+
+        String updatedContent = "{\"message\": \"mastering out Elasticsearch\"}";
+        testee.updateMessage(messageId, updatedContent);
+        awaitForElasticSearch();
+
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setQuery(QueryBuilders.matchQuery("message", "mastering"))
+                .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+
+    @Test(expected=IllegalArgumentException.class)
+    public void updateMessageShouldThrowWhenJsonIsNull() throws InterruptedException {
+        testee.updateMessage("1", null);
+    }
+
+    /** Deleting by id prefix must remove the only message carrying that prefix. */
+    @Test
+    public void deleteAllWithIdStarting() throws Exception {
+        String messageId = "1:2";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+
+        testee.deleteAllWithIdStarting("1:");
+        awaitForElasticSearch();
+
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setQuery(QueryBuilders.matchAllQuery())
+                .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+        }
+    }
+
+    /**
+     * Deleting by id prefix must remove every message carrying that prefix and
+     * leave the others untouched: two messages start with "1:", one with "2:".
+     */
+    @Test
+    public void deleteAllWithIdStartingWhenMultipleMessages() throws Exception {
+        // Must differ from messageId2, otherwise the second index call overwrites
+        // this document and only one "1:" message ever exists.
+        String messageId = "1:1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+
+        String messageId2 = "1:2";
+        String content2 = "{\"message\": \"trying out Elasticsearch 2\"}";
+
+        testee.indexMessage(messageId2, content2);
+        awaitForElasticSearch();
+
+        String messageId3 = "2:3";
+        String content3 = "{\"message\": \"trying out Elasticsearch 3\"}";
+
+        testee.indexMessage(messageId3, content3);
+        awaitForElasticSearch();
+
+        testee.deleteAllWithIdStarting("1:");
+        awaitForElasticSearch();
+
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setQuery(QueryBuilders.matchAllQuery())
+                .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+        }
+    }
+
+    /** Deleting a message by its id must leave the index empty. */
+    @Test
+    public void deleteMessage() throws Exception {
+        String messageId = "1";
+        String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+        testee.indexMessage(messageId, content);
+        awaitForElasticSearch();
+
+        testee.deleteMessage(messageId);
+        awaitForElasticSearch();
+
+        try (Client client = node.client()) {
+            SearchResponse searchResponse = client.prepareSearch(ElasticSearchIndexer.MAILBOX_INDEX)
+                .setTypes(ElasticSearchIndexer.MESSAGE_TYPE)
+                .setQuery(QueryBuilders.matchAllQuery())
+                .get();
+            assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+        }
+    }
+
+    /**
+     * Sometimes, tests are too fast.
+     * This method ensures that the ElasticSearch service is up and that indices are
+     * updated, by retrying a forced flush for at most one second.
+     */
+    private void awaitForElasticSearch() {
+        await().atMost(Duration.ONE_SECOND).until(() -> flush());
+    }
+
+    /**
+     * Issues a forced flush on the node.
+     *
+     * @return {@code true} when the flush request completed, {@code false} when it threw
+     */
+    private boolean flush() {
+        try (Client client = node.client()) {
+            new FlushRequestBuilder(client.admin().indices()).setForce(true).get();
+            return true;
+        } catch (Exception e) {
+            // Deliberately swallowed: a failure only means "not ready yet", and
+            // awaitForElasticSearch() polls this method until it succeeds or times out.
+            return false;
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org