You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/03 13:49:12 UTC

[metron] branch master updated: METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new bf673a2  METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451
bf673a2 is described below

commit bf673a2e9c00a59716e70feea55d581af91145f1
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Wed Jul 3 09:48:53 2019 -0400

    METRON-2168 Elasticsearch Updates Not Tested in Integration Test (nickwallen) closes apache/metron#1451
---
 .../dao/ElasticsearchRetrieveLatestDao.java        |   8 +
 .../ElasticsearchUpdateIntegrationTest.java        |  57 ++++++-
 .../apache/metron/indexing/dao/MultiIndexDao.java  |   3 +-
 .../metron/indexing/dao/MultiIndexDaoTest.java     | 173 +++++++++++++++++++--
 .../metron/indexing/dao/UpdateIntegrationTest.java |  56 +++++--
 5 files changed, 262 insertions(+), 35 deletions(-)

diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index 95d27db..8044d62 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -43,6 +43,14 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
 import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
 
+/**
+ * A {@link RetrieveLatestDao} that retrieves documents from Elasticsearch.
+ *
+ * The index being searched must have a field "guid" of type "keyword",
+ * otherwise no documents can be found by "guid".
+ *
+ * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html
+ */
 public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
   private ElasticsearchClient client;
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 6489206..16760d2 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -19,9 +19,15 @@ package org.apache.metron.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
@@ -29,17 +35,21 @@ import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.HBaseDao;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MultiIndexDao;
 import org.apache.metron.indexing.dao.UpdateIntegrationTest;
 import org.apache.metron.integration.UnableToStartException;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.Response;
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -55,11 +65,27 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
   private static final String TABLE_NAME = "modifications";
   private static final String CF = "p";
   private static MockHTable table;
-  private static IndexDao hbaseDao;
   private static IndexDao elasticsearchDao;
   private static AccessConfig accessConfig;
   private static Map<String, Object> globalConfig;
 
+  /**
+   * {
+   *   "template": "test*",
+   *   "mappings": {
+   *     "test_doc": {
+   *       "properties": {
+   *         "guid": {
+   *           "type": "keyword"
+   *         }
+   *       }
+   *     }
+   *   }
+   * }
+   */
+  @Multiline
+  private static String indexTemplate;
+
   @Override
   protected String getIndexName() {
     return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
@@ -90,15 +116,16 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
             .withAccessConfig(accessConfig)
             .build();
     es.start();
+
+    installIndexTemplate();
   }
 
   @Before
   public void setup() {
-    hbaseDao = new HBaseDao();
-    elasticsearchDao = new ElasticsearchDao();
-    MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao);
-    dao.init(accessConfig);
-    setDao(dao);
+    elasticsearchDao = new ElasticsearchDao()
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+    elasticsearchDao.init(accessConfig);
+    setDao(elasticsearchDao);
   }
 
   @After
@@ -137,4 +164,20 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
   protected MockHTable getMockHTable() {
     return table;
   }
+
+  /**
+   * Registers the test index template, which maps "guid" as a "keyword" field. Without
+   * that mapping, {@link org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao}
+   * is unable to find documents by "guid".
+   *
+   * See https://www.elastic.co/guide/en/elasticsearch/reference/5.6/query-dsl-term-query.html
+   */
+  private static void installIndexTemplate() throws IOException {
+    // the request body is the JSON held by the @Multiline 'indexTemplate' field
+    HttpEntity templateEntity = new NStringEntity(indexTemplate, ContentType.APPLICATION_JSON);
+    ElasticsearchClient client = ElasticsearchClientFactory.create(globalConfig);
+    Response response = client.getLowLevelClient()
+            .performRequest("PUT", "/_template/test_template", Collections.emptyMap(), templateEntity);
+    Assert.assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200));
+  }
 }
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index 09f7df9..a240c56 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -282,8 +282,7 @@ public class MultiIndexDao implements IndexDao {
   }
 
   @Override
-  public Iterable<Document> getAllLatest(
-      List<GetRequest> getRequests) throws IOException {
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
     Iterable<Document> ret = null;
     List<DocumentIterableContainer> output =
         indices.parallelStream().map(dao -> {
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
index dad6a52..0c0d1f1 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/MultiIndexDaoTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.metron.indexing.dao;
 
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.junit.Assert;
@@ -27,9 +32,16 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class MultiIndexDaoTest {
@@ -40,24 +52,72 @@ public class MultiIndexDaoTest {
   private IndexDao dao1;
   private IndexDao dao2;
 
+  private Document document1;
+  private Document document2;
+
   @Before
   public void setup() {
     dao1 = mock(IndexDao.class);
     dao2 = mock(IndexDao.class);
     multiIndexDao = new MultiIndexDao(dao1, dao2);
+
+    document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
+    document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
   }
 
   @Test
-  public void getLatestShouldReturnLatestAlert() throws Exception {
-    Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
-    Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
+  public void shouldUpdateAll() throws IOException {
+    Document actual = multiIndexDao.update(document1, Optional.of("bro"));
+    Assert.assertEquals(document1, actual);
+
+    // both 'backing' daos should have received the update
+    verify(dao1).update(eq(document1), eq(Optional.of("bro")));
+    verify(dao2).update(eq(document1), eq(Optional.of("bro")));
+  }
 
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnUpdate() throws IOException {
+    // only dao2 is stubbed to fail, which produces the 'partial failure'
+    Optional<String> index = Optional.of("bro");
+    when(dao2.update(any(), any())).thenThrow(new IllegalStateException());
+
+    multiIndexDao.update(document1, index);
+  }
+
+  @Test
+  public void shouldBatchUpdateAll() throws IOException {
+    // the batch that is submitted to the multi-index dao
+    Map<Document, Optional<String>> batch = new HashMap<>();
+    batch.put(document1, Optional.of("bro"));
+    batch.put(document2, Optional.of("bro"));
+
+    // the result should equal the batch that was submitted
+    Assert.assertEquals(batch, multiIndexDao.batchUpdate(batch));
+
+    // each 'backing' dao should have been given the same batch
+    verify(dao1).batchUpdate(eq(batch));
+    verify(dao2).batchUpdate(eq(batch));
+  }
+
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnBatchUpdate() throws IOException {
+    // only dao2 is stubbed to fail, which produces the 'partial failure'
+    when(dao2.batchUpdate(any())).thenThrow(new IllegalStateException());
+
+    // the batch that is submitted to the multi-index dao
+    Map<Document, Optional<String>> batch = new HashMap<>();
+    batch.put(document1, Optional.of("bro"));
+    batch.put(document2, Optional.of("bro"));
+
+    multiIndexDao.batchUpdate(batch);
+  }
+
+  @Test
+  public void getLatestShouldReturnLatestAlert() throws Exception {
     CommentAddRemoveRequest request = new CommentAddRemoveRequest();
     request.setGuid("guid");
     when(dao1.getLatest("guid", "bro")).thenReturn(document1);
     when(dao2.getLatest("guid", "bro")).thenReturn(document2);
 
-
     Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
     Assert.assertEquals(expected, multiIndexDao.getLatest("guid", "bro"));
   }
@@ -65,32 +125,125 @@ public class MultiIndexDaoTest {
   @Test
   public void addCommentShouldAddCommentToAlert() throws Exception {
     Document latest = mock(Document.class);
-    Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
-    Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
 
     CommentAddRemoveRequest request = new CommentAddRemoveRequest();
     request.setGuid("guid");
     when(dao1.addCommentToAlert(request, latest)).thenReturn(document1);
     when(dao2.addCommentToAlert(request, latest)).thenReturn(document2);
 
-
     Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
     Assert.assertEquals(expected, multiIndexDao.addCommentToAlert(request, latest));
   }
 
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnAddComment() throws Exception {
+    CommentAddRemoveRequest addRequest = new CommentAddRemoveRequest();
+    addRequest.setGuid("guid");
+    Document current = mock(Document.class);
+
+    // dao1 is stubbed to succeed while dao2 is stubbed to throw
+    when(dao1.addCommentToAlert(addRequest, current)).thenReturn(document1);
+    when(dao2.addCommentToAlert(addRequest, current)).thenThrow(new IllegalStateException());
+
+    multiIndexDao.addCommentToAlert(addRequest, current);
+  }
+
   @Test
   public void removeCommentShouldRemoveCommentFromAlert() throws Exception {
     Document latest = mock(Document.class);
-    Document document1 = new Document(new HashMap<>(), "guid", "bro", 1L);
-    Document document2 = new Document(new HashMap<>(), "guid", "bro", 2L);
 
     CommentAddRemoveRequest request = new CommentAddRemoveRequest();
     request.setGuid("guid");
     when(dao1.removeCommentFromAlert(request, latest)).thenReturn(document1);
     when(dao2.removeCommentFromAlert(request, latest)).thenReturn(document2);
 
-
     Document expected = new Document(new HashMap<>(), "guid", "bro", 2L);
     Assert.assertEquals(expected, multiIndexDao.removeCommentFromAlert(request, latest));
   }
+
+  @Test(expected = IOException.class)
+  public void shouldThrowExceptionWithPartialFailureOnRemoveComment() throws Exception {
+    CommentAddRemoveRequest removeRequest = new CommentAddRemoveRequest();
+    removeRequest.setGuid("guid");
+    Document current = mock(Document.class);
+
+    // dao1 is stubbed to succeed while dao2 is stubbed to throw
+    when(dao1.removeCommentFromAlert(removeRequest, current)).thenReturn(document1);
+    when(dao2.removeCommentFromAlert(removeRequest, current)).thenThrow(new IllegalStateException());
+
+    multiIndexDao.removeCommentFromAlert(removeRequest, current);
+  }
+
+  @Test
+  public void shouldGetColumnMetadata() throws Exception {
+    List<String> indexNames = Arrays.asList("bro");
+    Map<String, FieldType> metadata = new HashMap<>();
+    metadata.put("bro", FieldType.TEXT);
+
+    // dao1 responds with null; only dao2 responds with column metadata
+    when(dao1.getColumnMetadata(eq(indexNames))).thenReturn(null);
+    when(dao2.getColumnMetadata(eq(indexNames))).thenReturn(metadata);
+
+    // the non-null response is the one expected back
+    Map<String, FieldType> actual = multiIndexDao.getColumnMetadata(indexNames);
+    Assert.assertEquals(metadata, actual);
+  }
+
+  @Test
+  public void shouldGetColumnMetadataWithNulls() throws Exception {
+    List<String> indexNames = Arrays.asList("bro");
+
+    // neither 'backing' DAO responds with any column metadata
+    when(dao1.getColumnMetadata(eq(indexNames))).thenReturn(null);
+    when(dao2.getColumnMetadata(eq(indexNames))).thenReturn(null);
+
+    // with no non-null response available, null is expected back
+    Assert.assertNull(multiIndexDao.getColumnMetadata(indexNames));
+  }
+
+  @Test
+  public void shouldSearch() throws Exception {
+    SearchRequest searchRequest = new SearchRequest();
+    SearchResponse dao2Response = new SearchResponse();
+
+    // dao1 responds with null, so the response from dao2 is the one expected back
+    when(dao1.search(eq(searchRequest))).thenReturn(null);
+    when(dao2.search(eq(searchRequest))).thenReturn(dao2Response);
+
+    Assert.assertEquals(dao2Response, multiIndexDao.search(searchRequest));
+  }
+
+  @Test
+  public void shouldSearchWithNulls() throws Exception {
+    SearchRequest searchRequest = new SearchRequest();
+
+    // both 'backing' DAOs respond with null, so null is expected back
+    when(dao1.search(eq(searchRequest))).thenReturn(null);
+    when(dao2.search(eq(searchRequest))).thenReturn(null);
+
+    Assert.assertNull(multiIndexDao.search(searchRequest));
+  }
+
+  @Test
+  public void shouldGroup() throws Exception {
+    GroupRequest groupRequest = new GroupRequest();
+    GroupResponse dao2Response = new GroupResponse();
+
+    // dao1 responds with null, so the response from dao2 is the one expected back
+    when(dao1.group(eq(groupRequest))).thenReturn(null);
+    when(dao2.group(eq(groupRequest))).thenReturn(dao2Response);
+
+    Assert.assertEquals(dao2Response, multiIndexDao.group(groupRequest));
+  }
+
+  @Test
+  public void shouldGroupWithNulls() throws Exception {
+    GroupRequest groupRequest = new GroupRequest();
+
+    // both 'backing' DAOs respond with null, so null is expected back
+    when(dao1.group(eq(groupRequest))).thenReturn(null);
+    when(dao2.group(eq(groupRequest))).thenReturn(null);
+
+    Assert.assertNull(multiIndexDao.group(groupRequest));
+  }
 }
diff --git a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
index b15895a..6333d32 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
+++ b/metron-platform/metron-indexing/metron-indexing-common/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
@@ -65,7 +65,7 @@ public abstract class UpdateIntegrationTest {
   protected static final String SENSOR_NAME = "test";
   private static final String CF = "p";
 
-  private MultiIndexDao dao;
+  private IndexDao dao;
 
   @Test
   public void testUpdate() throws Exception {
@@ -73,16 +73,13 @@ public abstract class UpdateIntegrationTest {
     final String guid = UUID.randomUUID().toString();
     final Long timestamp = 1526306463050L;
     Document toUpdate = createDocument(guid, timestamp);
-    {
-      // update the document and validate
-      Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME));
-      Assert.assertEquals(toUpdate, updated);
-    }
-    {
-      // ensure the document is updated in the index
-      Document indexed = findUpdatedDoc(toUpdate.getDocument(), guid, SENSOR_NAME);
-      Assert.assertEquals(toUpdate, indexed);
-    }
+
+    // update the document and validate
+    Document updated = getDao().update(toUpdate, Optional.of(SENSOR_NAME));
+    Assert.assertEquals(toUpdate, updated);
+
+    // ensure the document was updated in the index
+    assertDocumentIndexed(toUpdate);
   }
 
   @Test
@@ -114,9 +111,9 @@ public abstract class UpdateIntegrationTest {
     Assert.assertThat(updated.keySet(), hasItem(document3));
 
     // ensure the documents were written to the index
-    Assert.assertEquals(document1, findUpdatedDoc(document1.getDocument(), guid1, SENSOR_NAME));
-    Assert.assertEquals(document2, findUpdatedDoc(document2.getDocument(), guid2, SENSOR_NAME));
-    Assert.assertEquals(document3, findUpdatedDoc(document3.getDocument(), guid3, SENSOR_NAME));
+    assertDocumentIndexed(document1);
+    assertDocumentIndexed(document2);
+    assertDocumentIndexed(document3);
   }
 
   @Test
@@ -226,6 +223,33 @@ public abstract class UpdateIntegrationTest {
     return request;
   }
 
+  /**
+   * Ensures that a document was correctly indexed.
+   * @param expected The document that should have been indexed.
+   * @return The document that was retrieved from the index.
+   */
+  private Document assertDocumentIndexed(Document expected) throws Exception {
+    // search the index for the document
+    Document actual = findUpdatedDoc(expected.getDocument(), expected.getGuid(), expected.getSensorType());
+
+    // most fields should match exactly, except the documentID
+    Assert.assertEquals(expected.getGuid(), actual.getGuid());
+    Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
+    Assert.assertEquals(expected.getSensorType(), actual.getSensorType());
+    Assert.assertEquals(expected.getDocument(), actual.getDocument());
+
+    if(expected.getDocumentID().isPresent()) {
+      // the documentID was already defined in 'expected', this ID should have been used when the document was indexed;
+      // compare wrapper to wrapper, because an unwrapped ID can never equal the wrapped value from getDocumentID()
+      Assert.assertEquals(expected.getDocumentID(), actual.getDocumentID());
+    } else {
+      // if the documentID was not defined, the indexer should have created one; so inspect the retrieved document
+      Assert.assertTrue(actual.getDocumentID().isPresent());
+    }
+
+    return actual;
+  }
+
   private Document createAndIndexDocument(String guid) throws Exception {
     // create the document
     Long timestamp = 1526306463050L;
@@ -236,7 +260,7 @@ public abstract class UpdateIntegrationTest {
     Assert.assertEquals(toCreate, created);
 
     // ensure the document is indexed
-    return findUpdatedDoc(toCreate.getDocument(), guid, SENSOR_NAME);
+    return assertDocumentIndexed(created);
   }
 
   protected Document createDocument(String guid, Long timestamp) {
@@ -280,7 +304,7 @@ public abstract class UpdateIntegrationTest {
     return dao;
   }
 
-  protected void setDao(MultiIndexDao dao) {
+  protected void setDao(IndexDao dao) {
     this.dao = dao;
   }