You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/03 13:49:12 UTC
[metron] branch master updated: METRON-2168 Elasticsearch Updates
Not Tested in Integration Test (nickwallen) closes apache/metron#1451
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new bf673a2 METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451
bf673a2 is described below
commit bf673a2e9c00a59716e70feea55d581af91145f1
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Wed Jul 3 09:48:53 2019 -0400
METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451
---
.../dao/ElasticsearchRetrieveLatestDao.java | 8 +
.../ElasticsearchUpdateIntegrationTest.java | 57 ++++++-
.../apache/metron/indexing/dao/MultiIndexDao.java | 3 +-
.../metron/indexing/dao/MultiIndexDaoTest.java | 173 +++++++++++++++++++--
.../metron/indexing/dao/UpdateIntegrationTest.java | 56 +++++--
5 files changed, 262 insertions(+), 35 deletions(-)
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index 95d27db..8044d62 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -43,6 +43,14 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
+/**
+ * A {@link RetrieveLatestDao} that retrieves documents from Elasticsearch.
+ *
+ * The index being searched must have a field "guid" of type "keyword",
+ * otherwise no documents can be found by "guid".
+ *
+ * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html
+ */
public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
private ElasticsearchClient client;
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 6489206..16760d2 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -19,9 +19,15 @@ package org.apache.metron.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
@@ -29,17 +35,21 @@ import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.UpdateIntegrationTest;
import org.apache.metron.integration.UnableToStartException;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.Response;
+import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -55,11 +65,27 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
private static final String TABLE_NAME = "modifications";
private static final String CF = "p";
private static MockHTable table;
- private static IndexDao hbaseDao;
private static IndexDao elasticsearchDao;
private static AccessConfig accessConfig;
private static Map<String, Object> globalConfig;
+ /**
+ * {
+ * "template": "test*",
+ * "mappings": {
+ * "test_doc": {
+ * "properties": {
+ * "guid": {
+ * "type": "keyword"
+ * }
+ * }
+ * }
+ * }
+ * }
+ */
+ @Multiline
+ private static String indexTemplate;
+
@Override
protected String getIndexName() {
return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
@@ -90,15 +116,16 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
.withAccessConfig(accessConfig)
.build();
es.start();
+
+ installIndexTemplate();
}
@Before
public void setup() {
- hbaseDao = new HBaseDao();
- elasticsearchDao = new ElasticsearchDao();
- MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao);
- dao.init(accessConfig);
- setDao(dao);
+ elasticsearchDao = new ElasticsearchDao()
+ .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+ elasticsearchDao.init(accessConfig);
+ setDao(elasticsearchDao);
}
@After
@@ -137,4 +164,20 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
protected MockHTable getMockHTable() {
return table;
}
+
+  /**
+   * Installs the index template so that "guid" is mapped as a "keyword". Without that
+   * mapping the {@link org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao}
+   * is unable to find documents by guid.
+   *
+   * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html
+   */
+  private static void installIndexTemplate() throws IOException {
+    HttpEntity templateBody = new NStringEntity(indexTemplate, ContentType.APPLICATION_JSON);
+    ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig);
+    Response putResponse = esClient.getLowLevelClient()
+        .performRequest("PUT", "/_template/test_template", Collections.emptyMap(), templateBody);
+    int status = putResponse.getStatusLine().getStatusCode();
+    Assert.assertThat(status, CoreMatchers.equalTo(200));
+  }
}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index 09f7df9..a240c56 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -282,8 +282,7 @@ public class MultiIndexDao implements IndexDao {
}
@Override
- public Iterable<Document> getAllLatest(
- List<GetRequest> getRequests) throws IOException {
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
Iterable<Document> ret = null;
List<DocumentIterableContainer> output =
indices.parallelStream().map(dao -> {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
index dad6a52..0c0d1f1 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
@@ -18,6 +18,11 @@
package org.apache.metron.indexing.dao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.junit.Assert;
@@ -27,9 +32,16 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class MultiIndexDaoTest {
@@ -40,24 +52,72 @@ public class MultiIndexDaoTest {
private IndexDao dao1;
private IndexDao dao2;
+ private Document document1;
+ private Document document2;
+
@Before
public void setup() {
dao1 = mock(IndexDao.class);
dao2 = mock(IndexDao.class);
multiIndexDao = new MultiIndexDao(dao1, dao2);
+
+ document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
+ document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
}
@Test
- public void getLatestShouldReturnLatestAlert() throws Exception {
- Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
- Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
+ public void shouldUpdateAll() throws IOException {
+ Document actual = multiIndexDao.update(document1, Optional.of("bro"));
+ Assert.assertEquals(document1, actual);
+
+ // both 'backing' daos should have received the update
+ verify(dao1).update(eq(document1), eq(Optional.of("bro")));
+ verify(dao2).update(eq(document1), eq(Optional.of("bro")));
+ }
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnUpdate() throws IOException {
+    // only the second backing DAO is stubbed to fail, producing a 'partial failure'
+    final Optional<String> index = Optional.of("bro");
+    when(dao2.update(any(), any())).thenThrow(new IllegalStateException());
+    multiIndexDao.update(document1, index);
+  }
+
+  @Test
+  public void shouldBatchUpdateAll() throws IOException {
+    // the batch holds both documents, each destined for the "bro" index
+    Map<Document, Optional<String>> batch = new HashMap<>();
+    batch.put(document1, Optional.of("bro"));
+    batch.put(document2, Optional.of("bro"));
+
+    Map<Document, Optional<String>> result = multiIndexDao.batchUpdate(batch);
+    Assert.assertEquals(batch, result);
+
+    // every 'backing' dao must have been asked to apply the same batch
+    verify(dao1).batchUpdate(eq(batch));
+    verify(dao2).batchUpdate(eq(batch));
+  }
+
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnBatchUpdate() throws IOException {
+    // assemble a batch containing both documents
+    Map<Document, Optional<String>> batch = new HashMap<>();
+    batch.put(document1, Optional.of("bro"));
+    batch.put(document2, Optional.of("bro"));
+
+    // only dao2 is stubbed to fail, which makes this a 'partial failure'
+    when(dao2.batchUpdate(any())).thenThrow(new IllegalStateException());
+
+    multiIndexDao.batchUpdate(batch);
+  }
+
+ @Test
+ public void getLatestShouldReturnLatestAlert() throws Exception {
CommentAddRemoveRequest request = new CommentAddRemoveRequest();
request.setGuid("guid");
when(dao1.getLatest("guid", "bro")).thenReturn(document1);
when(dao2.getLatest("guid", "bro")).thenReturn(document2);
-
Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
Assert.assertEquals(expected, multiIndexDao.getLatest("guid", "bro"));
}
@@ -65,32 +125,125 @@ public class MultiIndexDaoTest {
@Test
public void addCommentShouldAddCommentToAlert() throws Exception {
Document latest = mock(Document.class);
- Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
- Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
CommentAddRemoveRequest request = new CommentAddRemoveRequest();
request.setGuid("guid");
when(dao1.addCommentToAlert(request, latest)).thenReturn(document1);
when(dao2.addCommentToAlert(request, latest)).thenReturn(document2);
-
Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
Assert.assertEquals(expected, multiIndexDao.addCommentToAlert(request, latest));
}
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnAddComment() throws Exception {
+    CommentAddRemoveRequest addRequest = new CommentAddRemoveRequest();
+    addRequest.setGuid("guid");
+    Document latestDoc = mock(Document.class);
+
+    // dao1 is stubbed to succeed while dao2 is stubbed to fail
+    when(dao2.addCommentToAlert(addRequest, latestDoc)).thenThrow(new IllegalStateException());
+    when(dao1.addCommentToAlert(addRequest, latestDoc)).thenReturn(document1);
+
+    multiIndexDao.addCommentToAlert(addRequest, latestDoc);
+  }
+
@Test
public void removeCommentShouldRemoveCommentFromAlert() throws Exception {
Document latest = mock(Document.class);
- Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
- Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
CommentAddRemoveRequest request = new CommentAddRemoveRequest();
request.setGuid("guid");
when(dao1.removeCommentFromAlert(request, latest)).thenReturn(document1);
when(dao2.removeCommentFromAlert(request, latest)).thenReturn(document2);
-
Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
Assert.assertEquals(expected, multiIndexDao.removeCommentFromAlert(request, latest));
}
+
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnRemoveComment() throws Exception {
+    CommentAddRemoveRequest removeRequest = new CommentAddRemoveRequest();
+    removeRequest.setGuid("guid");
+    Document latestDoc = mock(Document.class);
+
+    // dao1 is stubbed to succeed while dao2 is stubbed to fail
+    when(dao2.removeCommentFromAlert(removeRequest, latestDoc)).thenThrow(new IllegalStateException());
+    when(dao1.removeCommentFromAlert(removeRequest, latestDoc)).thenReturn(document1);
+
+    multiIndexDao.removeCommentFromAlert(removeRequest, latestDoc);
+  }
+
+  @Test
+  public void shouldGetColumnMetadata() throws Exception {
+    List<String> indexNames = Arrays.asList("bro");
+    Map<String, FieldType> metadata = new HashMap<>();
+    metadata.put("bro", FieldType.TEXT);
+
+    // only the second 'backing' dao has column metadata to offer
+    when(dao1.getColumnMetadata(eq(indexNames))).thenReturn(null);
+    when(dao2.getColumnMetadata(eq(indexNames))).thenReturn(metadata);
+
+    // the non-null answer is the one expected back from the composite
+    Map<String, FieldType> result = multiIndexDao.getColumnMetadata(indexNames);
+    Assert.assertEquals(metadata, result);
+  }
+
+  @Test
+  public void shouldGetColumnMetadataWithNulls() throws Exception {
+    List<String> indexNames = Arrays.asList("bro");
+
+    // neither 'backing' dao has any column metadata to offer
+    when(dao2.getColumnMetadata(eq(indexNames))).thenReturn(null);
+    when(dao1.getColumnMetadata(eq(indexNames))).thenReturn(null);
+
+    // with nothing to choose from, the composite is expected to answer null as well
+    Assert.assertNull(multiIndexDao.getColumnMetadata(indexNames));
+  }
+
+  @Test
+  public void shouldSearch() throws Exception {
+    SearchRequest query = new SearchRequest();
+    SearchResponse fromDao2 = new SearchResponse();
+
+    // dao1 has no answer, so dao2's response is the one expected back
+    when(dao1.search(eq(query))).thenReturn(null);
+    when(dao2.search(eq(query))).thenReturn(fromDao2);
+    SearchResponse result = multiIndexDao.search(query);
+    Assert.assertEquals(fromDao2, result);
+  }
+
+  @Test
+  public void shouldSearchWithNulls() throws Exception {
+    SearchRequest query = new SearchRequest();
+
+    // both 'backing' daos come back empty-handed
+    when(dao1.search(eq(query))).thenReturn(null);
+    when(dao2.search(eq(query))).thenReturn(null);
+
+    Assert.assertNull(multiIndexDao.search(query));
+  }
+
+  @Test
+  public void shouldGroup() throws Exception {
+    GroupRequest grouping = new GroupRequest();
+    GroupResponse fromDao2 = new GroupResponse();
+
+    // dao1 has no answer, so dao2's response is the one expected back
+    when(dao1.group(eq(grouping))).thenReturn(null);
+    when(dao2.group(eq(grouping))).thenReturn(fromDao2);
+    GroupResponse result = multiIndexDao.group(grouping);
+    Assert.assertEquals(fromDao2, result);
+  }
+
+  @Test
+  public void shouldGroupWithNulls() throws Exception {
+    GroupRequest grouping = new GroupRequest();
+
+    // both 'backing' daos come back empty-handed
+    when(dao1.group(eq(grouping))).thenReturn(null);
+    when(dao2.group(eq(grouping))).thenReturn(null);
+
+    Assert.assertNull(multiIndexDao.group(grouping));
+  }
}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
index b15895a..6333d32 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
@@ -65,7 +65,7 @@ public abstract class UpdateIntegrationTest {
protected static final String SENSOR_NAME = "test";
private static final String CF = "p";
- private MultiIndexDao dao;
+ private IndexDao dao;
@Test
public void testUpdate() throws Exception {
@@ -73,16 +73,13 @@ public abstract class UpdateIntegrationTest {
final String guid = UUID.randomUUID().toString();
final Long timestamp = 1526306463050L;
Document toUpdate = createDocument(guid, timestamp);
- {
- // update the document and validate
- Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME));
- Assert.assertEquals(toUpdate, updated);
- }
- {
- // ensure the document is updated in the index
- Document indexed = findUpdatedDoc(toUpdate.getDocument(), guid, SENSOR_NAME);
- Assert.assertEquals(toUpdate, indexed);
- }
+
+ // update the document and validate
+ Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME));
+ Assert.assertEquals(toUpdate, updated);
+
+ // ensure the document was updated in the index
+ assertDocumentIndexed(toUpdate);
}
@Test
@@ -114,9 +111,9 @@ public abstract class UpdateIntegrationTest {
Assert.assertThat(updated.keySet(), hasItem(document3));
// ensure the documents were written to the index
- Assert.assertEquals(document1, findUpdatedDoc(document1.getDocument(), guid1, SENSOR_NAME));
- Assert.assertEquals(document2, findUpdatedDoc(document2.getDocument(), guid2, SENSOR_NAME));
- Assert.assertEquals(document3, findUpdatedDoc(document3.getDocument(), guid3, SENSOR_NAME));
+ assertDocumentIndexed(document1);
+ assertDocumentIndexed(document2);
+ assertDocumentIndexed(document3);
}
@Test
@@ -226,6 +223,33 @@ public abstract class UpdateIntegrationTest {
return request;
}
+  /**
+   * Ensures that a document was correctly indexed by retrieving it and comparing it to the original.
+   * @param expected The document that should have been indexed.
+   * @return The document that was retrieved from the index.
+   */
+  private Document assertDocumentIndexed(Document expected) throws Exception {
+    // search the index for the document
+    Document actual = findUpdatedDoc(expected.getDocument(), expected.getGuid(), expected.getSensorType());
+
+    // most fields should match exactly, except the documentID
+    Assert.assertEquals(expected.getGuid(), actual.getGuid());
+    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
+    Assert.assertEquals(expected.getSensorType(), actual.getSensorType());
+    Assert.assertEquals(expected.getDocument(), actual.getDocument());
+
+    if(expected.getDocumentID().isPresent()) {
+      // the documentID was already defined in 'expected', this ID should have been used when the document was indexed;
+      // compare Optional to Optional, since an unwrapped String can never equal the Optional returned by 'actual'
+      Assert.assertEquals(expected.getDocumentID(), actual.getDocumentID());
+    } else {
+      // the indexer should have created an ID, so inspect the retrieved document. NOTE(review): confirm whether isPresent() is intended
+      Assert.assertNotNull(actual.getDocumentID());
+    }
+
+    return actual;
+  }
+
private Document createAndIndexDocument(String guid) throws Exception {
// create the document
Long timestamp = 1526306463050L;
@@ -236,7 +260,7 @@ public abstract class UpdateIntegrationTest {
Assert.assertEquals(toCreate, created);
// ensure the document is indexed
- return findUpdatedDoc(toCreate.getDocument(), guid, SENSOR_NAME);
+ return assertDocumentIndexed(created);
}
protected Document createDocument(String guid, Long timestamp) {
@@ -280,7 +304,7 @@ public abstract class UpdateIntegrationTest {
return dao;
}
- protected void setDao(MultiIndexDao dao) {
+ protected void setDao(IndexDao dao) {
this.dao = dao;
}