You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2017/10/16 16:21:10 UTC

metron git commit: METRON-1226 Searching Can Errantly Query the Wrong Indices (nickwallen) closes apache/metron#793

Repository: metron
Updated Branches:
  refs/heads/master 4ee55702e -> 67cba8130


METRON-1226 Searching Can Errantly Query the Wrong Indices (nickwallen) closes apache/metron#793


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/67cba813
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/67cba813
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/67cba813

Branch: refs/heads/master
Commit: 67cba8130af3a804e1aa5ccf6ea64c9e9e1a07bf
Parents: 4ee5570
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Oct 16 12:20:01 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Oct 16 12:20:01 2017 -0400

----------------------------------------------------------------------
 .../CURRENT/package/scripts/indexing_master.py  |  2 +-
 .../elasticsearch/dao/ElasticsearchDao.java     | 76 +++++++++++++-------
 .../elasticsearch/utils/ElasticsearchUtils.java | 49 ++++++++++++-
 .../writer/ElasticsearchWriter.java             | 47 ++++--------
 .../elasticsearch/dao/ElasticsearchDaoTest.java | 38 +++++++---
 .../ElasticsearchSearchIntegrationTest.java     |  2 +-
 .../matcher/SearchRequestMatcher.java           | 43 +++++++++--
 .../metron/indexing/dao/MetaAlertDao.java       |  2 +-
 .../indexing/dao/InMemoryMetaAlertDao.java      |  2 +-
 .../indexing/dao/SearchIntegrationTest.java     |  2 +-
 10 files changed, 180 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index 371cab0..8f156d6 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -145,7 +145,7 @@ class Indexing(Script):
             'curl -s -XPOST http://{es_http_url}/_template/error_index -d @{error_index_path}')
         Execute(error_cmd, logoutput=True)
         error_cmd = ambari_format(
-            'curl -s -XPOST http://{es_http_url}/metaalerts -d @{meta_index_path}')
+            'curl -s -XPOST http://{es_http_url}/metaalert_index -d @{meta_index_path}')
         Execute(error_cmd, logoutput=True)
 
     def elasticsearch_template_delete(self, env):

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index aa56ed0..f2f1b38 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -19,19 +19,6 @@ package org.apache.metron.elasticsearch.dao;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -76,6 +63,23 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
+
 public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -131,9 +135,9 @@ public class ElasticsearchDao implements IndexDao {
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(queryBuilder)
-
             .trackScores(true);
-    searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder())));Optional<List<String>> fields = searchRequest.getFields();
+    searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder())));
+    Optional<List<String>> fields = searchRequest.getFields();
     if (fields.isPresent()) {
       searchSourceBuilder.fields(fields.get());
     } else {
@@ -143,7 +147,7 @@ public class ElasticsearchDao implements IndexDao {
     if (facetFields.isPresent()) {
       facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacentAggregationName(field)).field(field)));
     }
-    String[] wildcardIndices = searchRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[searchRequest.getIndices().size()]);
+    String[] wildcardIndices = wildcardIndices(searchRequest.getIndices());
     org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
     try {
       elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
@@ -179,11 +183,13 @@ public class ElasticsearchDao implements IndexDao {
     final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
     searchSourceBuilder.query(new QueryStringQueryBuilder(groupRequest.getQuery()));
     searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0));
-    String[] wildcardIndices = groupRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[groupRequest.getIndices().size()]);
-    org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
+    String[] wildcardIndices = wildcardIndices(groupRequest.getIndices());
+    org.elasticsearch.action.search.SearchRequest request;
+    org.elasticsearch.action.search.SearchResponse response;
+
     try {
-      elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
-          .source(searchSourceBuilder)).actionGet();
+      request = new org.elasticsearch.action.search.SearchRequest(wildcardIndices).source(searchSourceBuilder);
+      response = client.search(request).actionGet();
     } catch (SearchPhaseExecutionException e) {
       throw new InvalidSearchException("Could not execute search", e);
     }
@@ -195,10 +201,17 @@ public class ElasticsearchDao implements IndexDao {
     }
     GroupResponse groupResponse = new GroupResponse();
     groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
-    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, elasticsearchResponse.getAggregations(), commonColumnMetadata));
+    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
     return groupResponse;
   }
 
+  private String[] wildcardIndices(List<String> indices) {
+    return indices
+            .stream()
+            .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
+            .toArray(value -> new String[indices.size()]);
+  }
+
   @Override
   public synchronized void init(AccessConfig config) {
     if(this.client == null) {
@@ -301,11 +314,18 @@ public class ElasticsearchDao implements IndexDao {
   @Override
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException {
     Map<String, Map<String, FieldType>> allColumnMetadata = new HashMap<>();
-    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings =
-            client.admin().indices().getMappings(new GetMappingsRequest().indices(getLatestIndices(indices))).actionGet().getMappings();
-    for(Object index: mappings.keys().toArray()) {
+    String[] latestIndices = getLatestIndices(indices);
+    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = client
+            .admin()
+            .indices()
+            .getMappings(new GetMappingsRequest().indices(latestIndices))
+            .actionGet()
+            .getMappings();
+    for(Object key: mappings.keys().toArray()) {
+      String indexName = key.toString();
+
       Map<String, FieldType> indexColumnMetadata = new HashMap<>();
-      ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(index.toString());
+      ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
       Iterator<String> mappingIterator = mapping.keysIt();
       while(mappingIterator.hasNext()) {
         MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
@@ -314,7 +334,9 @@ public class ElasticsearchDao implements IndexDao {
           indexColumnMetadata.put(field, elasticsearchSearchTypeMap.getOrDefault(map.get(field).get("type"), FieldType.OTHER));
         }
       }
-      allColumnMetadata.put(index.toString().split("_index_")[0], indexColumnMetadata);
+
+      String baseIndexName = ElasticsearchUtils.getBaseIndexName(indexName);
+      allColumnMetadata.put(baseIndexName, indexColumnMetadata);
     }
     return allColumnMetadata;
   }
@@ -348,7 +370,7 @@ public class ElasticsearchDao implements IndexDao {
     String[] indices = client.admin().indices().prepareGetIndex().setFeatures().get().getIndices();
     for (String index : indices) {
       if (!ignoredIndices.contains(index)) {
-        int prefixEnd = index.indexOf("_index_");
+        int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
         if (prefixEnd != -1) {
           String prefix = index.substring(0, prefixEnd);
           if (includeIndices.contains(prefix)) {

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index c7c4d90..4c9933b 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -20,8 +20,8 @@ package org.apache.metron.elasticsearch.utils;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.elasticsearch.writer.ElasticsearchWriter;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -29,13 +29,30 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
 
 public class ElasticsearchUtils {
 
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
 
+  /**
+   * A delimiter that is appended to the user-defined index name to separate
+   * it from the index's date postfix.
+   *
+   * For example, if the user-defined index name is 'bro', the delimiter is
+   * '_index', and the index's date postfix is '2017.10.03.19', then the actual
+   * index name should be 'bro_index_2017.10.03.19'.
+   */
+  public static final String INDEX_NAME_DELIMITER = "_index";
+
   public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations) {
     return getIndexFormat(configurations.getGlobalConfig());
   }
@@ -45,15 +62,41 @@ public class ElasticsearchUtils {
     return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new);
   }
 
+  /**
+   * Builds the name of an Elasticsearch index.
+   * @param sensorType The sensor type; bro, yaf, snort, ...
+   * @param indexPostfix The index postfix; most often a formatted date.
+   * @param configurations User-defined configuration for the writers.
+   */
   public static String getIndexName(String sensorType, String indexPostfix, WriterConfiguration configurations) {
     String indexName = sensorType;
     if (configurations != null) {
       indexName = configurations.getIndex(sensorType);
     }
-    indexName = indexName + "_index_" + indexPostfix;
+    indexName = indexName + INDEX_NAME_DELIMITER + "_" + indexPostfix;
     return indexName;
   }
 
+  /**
+   * Extracts the base index name from a full index name.
+   *
+   * For example, given an index named 'bro_index_2017.01.01.01', the base
+   * index name is 'bro'.
+   *
+   * @param indexName The full index name including delimiter and date postfix.
+   * @return The base index name.
+   */
+  public static String getBaseIndexName(String indexName) {
+
+    String[] parts = indexName.split(INDEX_NAME_DELIMITER);
+    if(parts.length < 1 || StringUtils.isEmpty(parts[0])) {
+      String msg = format("Unexpected index name; index=%s, delimiter=%s", indexName, INDEX_NAME_DELIMITER);
+      throw new IllegalStateException(msg);
+    }
+
+    return parts[0];
+  }
+
   public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) {
     Settings.Builder settingsBuilder = Settings.settingsBuilder();
     settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index dd32532..bc9eccc 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -18,18 +18,6 @@
 package org.apache.metron.elasticsearch.writer;
 
 import org.apache.metron.common.Constants;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.FieldNameConverter;
 import org.apache.metron.common.writer.BulkMessageWriter;
@@ -46,13 +34,19 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
   private Map<String, String> optionalSettings;
   private transient TransportClient client;
   private SimpleDateFormat dateFormat;
-  private static final Logger LOG = LoggerFactory
-          .getLogger(ElasticsearchWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
   private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
 
   public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
@@ -64,34 +58,24 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     client = ElasticsearchUtils.getClient(globalConfiguration, optionalSettings);
-    dateFormat = new SimpleDateFormat((String) globalConfiguration.get("es.date.format"));
+    dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
   }
 
+
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
-    String indexPostfix = dateFormat.format(new Date());
+    final String indexPostfix = dateFormat.format(new Date());
     BulkRequestBuilder bulkRequest = client.prepareBulk();
 
     for(JSONObject message: messages) {
 
-      String indexName = sensorType;
-
-      if (configurations != null) {
-        indexName = configurations.getIndex(sensorType);
-      }
-
-      indexName = indexName + "_index_" + indexPostfix;
-
       JSONObject esDoc = new JSONObject();
       for(Object k : message.keySet()){
-
-        deDot(k.toString(),message,esDoc);
-
+        deDot(k.toString(), message, esDoc);
       }
 
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName,
-              sensorType + "_doc");
-
+      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc");
       indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
       String guid = (String)esDoc.get(Constants.GUID);
       if(guid != null) {
@@ -102,12 +86,11 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       if(ts != null) {
         indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
       }
-      bulkRequest.add(indexRequestBuilder);
 
+      bulkRequest.add(indexRequestBuilder);
     }
 
     BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-
     return buildWriteReponse(tuples, bulkResponse);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index c28ffc7..7c33018 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -27,9 +27,7 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.mockito.Mock;
 
 import java.util.Arrays;
@@ -54,42 +52,53 @@ public class ElasticsearchDaoTest {
     AccessConfig config = mock(AccessConfig.class);
     when(config.getMaxSearchResults()).thenReturn(50);
     searchService = new ElasticsearchDao(client, config);
-
   }
 
   @Test
   public void searchShouldProperlyBuildSearchRequest() throws Exception {
+
+    // setup the mock client
     SearchHit searchHit1 = mock(SearchHit.class);
     when(searchHit1.getId()).thenReturn("id1");
     when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }});
     when(searchHit1.getScore()).thenReturn(0.1f);
+
     SearchHit searchHit2 = mock(SearchHit.class);
     when(searchHit2.getId()).thenReturn("id2");
     when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }});
     when(searchHit2.getScore()).thenReturn(0.2f);
+
     SearchHits searchHits = mock(SearchHits.class);
     when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, searchHit2});
     when(searchHits.getTotalHits()).thenReturn(2L);
+
     org.elasticsearch.action.search.SearchResponse elasticsearchResponse = mock(org.elasticsearch.action.search.SearchResponse.class);
     when(elasticsearchResponse.getHits()).thenReturn(searchHits);
+
     ActionFuture actionFuture = mock(ActionFuture.class);
     when(actionFuture.actionGet()).thenReturn(elasticsearchResponse);
     when(client.search(any())).thenReturn(actionFuture);
 
+    // "sort by" fields for the search request
+    SortField[] sortFields = {
+            sortBy("sortField1", SortOrder.DESC),
+            sortBy("sortField2", SortOrder.ASC)
+    };
+
+    // create a search request
     SearchRequest searchRequest = new SearchRequest();
     searchRequest.setSize(2);
     searchRequest.setIndices(Arrays.asList("bro", "snort"));
     searchRequest.setFrom(5);
-    SortField sortField1 = new SortField();
-    sortField1.setField("sortField1");
-    sortField1.setSortOrder(SortOrder.DESC.toString());
-    SortField sortField2 = new SortField();
-    sortField2.setField("sortField2");
-    sortField2.setSortOrder(SortOrder.ASC.toString());
-    searchRequest.setSort(Arrays.asList(sortField1, sortField2));
+    searchRequest.setSort(Arrays.asList(sortFields));
     searchRequest.setQuery("some query");
+
+    // submit the search request
     SearchResponse searchResponse = searchService.search(searchRequest);
-    verify(client, times(1)).search(argThat(new SearchRequestMatcher(new String[]{"bro*", "snort*"}, "some query", 2, 5, new SortField[]{sortField1, sortField2})));
+
+    // validate
+    String[] expectedIndices = {"bro_index*", "snort_index*"};
+    verify(client).search(argThat(new SearchRequestMatcher(expectedIndices, "some query", 2, 5, sortFields)));
     assertEquals(2, searchResponse.getTotal());
     List<SearchResult> actualSearchResults = searchResponse.getResults();
     assertEquals(2, actualSearchResults.size());
@@ -102,6 +111,13 @@ public class ElasticsearchDaoTest {
     verifyNoMoreInteractions(client);
   }
 
+  private SortField sortBy(String field, SortOrder order) {
+    SortField sortField = new SortField();
+    sortField.setField(field);
+    sortField.setSortOrder(order.toString());
+    return sortField;
+  }
+
   @Test
   public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception {
     SearchRequest searchRequest = new SearchRequest();

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index e21bb13..e7b609e 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -155,7 +155,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
     JSONArray metaAlertArray = (JSONArray) new JSONParser().parse(metaAlertData);
     for(Object o: metaAlertArray) {
       JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("metaalerts", "metaalert_doc");
+      IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("metaalert_index", "metaalert_doc");
       indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
       bulkRequest.add(indexRequestBuilder);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
index 9d69471..417e48b 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
@@ -25,17 +25,24 @@ import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortOrder;
+import org.hamcrest.Description;
 import org.mockito.ArgumentMatcher;
 
 import java.util.Arrays;
 
 public class SearchRequestMatcher extends ArgumentMatcher<SearchRequest> {
 
-  private String[] expectedIndicies;
+  private String[] expectedIndices;
+  private String[] actualIndices;
+
   private BytesReference expectedSource;
+  private BytesReference actualSource;
+
+  private boolean indicesMatch;
+  private boolean sourcesMatch;
 
   public SearchRequestMatcher(String[] indices, String query, int size, int from, SortField[] sortFields) {
-    expectedIndicies = indices;
+    expectedIndices = indices;
     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
             .size(size)
             .from(from)
@@ -53,8 +60,34 @@ public class SearchRequestMatcher extends ArgumentMatcher<SearchRequest> {
   @Override
   public boolean matches(Object o) {
     SearchRequest searchRequest = (SearchRequest) o;
-    boolean indiciesMatch = Arrays.equals(expectedIndicies, searchRequest.indices());
-    boolean sourcesMatch = searchRequest.source().equals(expectedSource);
-    return indiciesMatch && sourcesMatch;
+
+    actualIndices = searchRequest.indices();
+    actualSource = searchRequest.source();
+
+    indicesMatch = Arrays.equals(expectedIndices, actualIndices);
+    sourcesMatch = expectedSource.equals(actualSource);
+
+    return indicesMatch && sourcesMatch;
+  }
+
+  @Override
+  public void describeTo(Description description) {
+    if(!indicesMatch) {
+      description.appendText("Bad search request indices: ");
+      description.appendText(" expected=");
+      description.appendValue(expectedIndices);
+      description.appendText(", got=");
+      description.appendValue(actualIndices);
+      description.appendText("  ");
+    }
+
+    if(!sourcesMatch) {
+      description.appendText("Bad search request sources: ");
+      description.appendText(" expected=");
+      description.appendValue(expectedSource);
+      description.appendText(", got=");
+      description.appendValue(actualSource);
+      description.appendText("  ");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
index 4e0851b..05746c4 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
@@ -27,7 +27,7 @@ import org.apache.metron.indexing.dao.search.SearchResponse;
 
 public interface MetaAlertDao extends IndexDao {
 
-  String METAALERTS_INDEX = "metaalerts";
+  String METAALERTS_INDEX = "metaalert";
   String METAALERT_TYPE = "metaalert";
   String METAALERT_DOC = METAALERT_TYPE + "_doc";
   String THREAT_FIELD_DEFAULT = "threat:triage:score";

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
index 8807bbc..39c0001 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
@@ -52,7 +52,7 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
 
   /**
    * {
-   * "indices": ["metaalerts"],
+   * "indices": ["metaalert"],
    * "query": "alert|guid:${GUID}",
    * "from": 0,
    * "size": 10,

http://git-wip-us.apache.org/repos/asf/metron/blob/67cba813/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index e2a37f1..2961d96 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -279,7 +279,7 @@ public abstract class SearchIntegrationTest {
   /**
    * {
    * "fields": ["guid"],
-   * "indices": ["metaalerts"],
+   * "indices": ["metaalert"],
    * "query": "*",
    * "from": 0,
    * "size": 10,