You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/11 01:32:28 UTC

[12/50] [abbrv] metron git commit: METRON-1421 Create a SolrMetaAlertDao (justinleet) closes apache/metron#970

METRON-1421 Create a SolrMetaAlertDao (justinleet) closes apache/metron#970


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/49f851e0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/49f851e0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/49f851e0

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 49f851e0b8c2ffa1cdd7c8f169bed3dfa07cf35c
Parents: eb33666
Author: justinleet <ju...@gmail.com>
Authored: Wed May 23 10:32:34 2018 -0400
Committer: leet <le...@apache.org>
Committed: Wed May 23 10:32:34 2018 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler/.gitignore     |    1 +
 .../apache/metron/rest/config/IndexConfig.java  |   12 +-
 .../rest/service/impl/MetaAlertServiceImpl.java |    5 +-
 .../rest/service/impl/SearchServiceImpl.java    |    2 +-
 .../rest/service/impl/UpdateServiceImpl.java    |    2 +-
 .../MetaAlertControllerIntegrationTest.java     |    6 +-
 .../UpdateControllerIntegrationTest.java        |    4 +-
 .../elasticsearch/dao/ElasticsearchDao.java     |   65 +-
 .../dao/ElasticsearchMetaAlertDao.java          |  641 ++---------
 ...ElasticsearchMetaAlertRetrieveLatestDao.java |   44 +
 .../dao/ElasticsearchMetaAlertSearchDao.java    |  110 ++
 .../dao/ElasticsearchMetaAlertUpdateDao.java    |  219 ++++
 .../dao/ElasticsearchRetrieveLatestDao.java     |  151 +++
 .../dao/ElasticsearchSearchDao.java             |  102 --
 .../dao/ElasticsearchUpdateDao.java             |   10 +-
 .../elasticsearch/utils/ElasticsearchUtils.java |   64 ++
 .../elasticsearch/dao/ElasticsearchDaoTest.java |   70 +-
 .../dao/ElasticsearchMetaAlertDaoTest.java      |  164 +--
 .../ElasticsearchMetaAlertIntegrationTest.java  |  986 ++---------------
 .../ElasticsearchSearchIntegrationTest.java     |   64 +-
 .../ElasticsearchUpdateIntegrationTest.java     |   84 +-
 .../components/ElasticSearchComponent.java      |   26 +-
 metron-platform/metron-indexing/README.md       |    2 +-
 metron-platform/metron-indexing/pom.xml         |    8 +-
 .../metron/indexing/dao/AccessConfig.java       |   10 +
 .../apache/metron/indexing/dao/IndexDao.java    |  141 +--
 .../metron/indexing/dao/MetaAlertDao.java       |  154 ---
 .../metron/indexing/dao/RetrieveLatestDao.java  |   67 ++
 .../metaalert/DeferredMetaAlertIndexDao.java    |   42 +
 .../metaalert/MetaAlertAddRemoveRequest.java    |    1 -
 .../indexing/dao/metaalert/MetaAlertConfig.java |   74 ++
 .../dao/metaalert/MetaAlertConstants.java       |   30 +
 .../indexing/dao/metaalert/MetaAlertDao.java    |   77 ++
 .../metaalert/MetaAlertRetrieveLatestDao.java   |   25 +
 .../dao/metaalert/MetaAlertSearchDao.java       |   35 +
 .../dao/metaalert/MetaAlertUpdateDao.java       |  146 +++
 .../indexing/dao/metaalert/MetaScores.java      |   52 +-
 .../AbstractLuceneMetaAlertUpdateDao.java       |  334 ++++++
 .../metron/indexing/dao/search/SearchDao.java   |   22 +-
 .../indexing/dao/search/SearchResponse.java     |   10 +-
 .../metron/indexing/dao/update/PatchUtil.java   |   50 +
 .../metron/indexing/dao/update/UpdateDao.java   |   47 +
 .../metron/indexing/util/IndexingCacheUtil.java |   35 +
 .../indexing/dao/InMemoryMetaAlertDao.java      |   69 +-
 .../indexing/dao/SearchIntegrationTest.java     |   60 +-
 .../indexing/dao/UpdateIntegrationTest.java     |   87 +-
 .../dao/metaalert/MetaAlertIntegrationTest.java | 1012 ++++++++++++++++++
 .../indexing/dao/metaalert/MetaScoresTest.java  |   75 ++
 .../AbstractLuceneMetaAlertUpdateDaoTest.java   |  854 +++++++++++++++
 .../integration/IndexingIntegrationTest.java    |    4 +-
 metron-platform/metron-pcap-backend/.gitignore  |    1 +
 metron-platform/metron-solr/pom.xml             |    4 +-
 .../src/main/config/schema/bro/schema.xml       |    3 +
 .../src/main/config/schema/metaalert/schema.xml |   39 +-
 .../src/main/config/schema/snort/schema.xml     |    3 +
 .../src/main/config/schema/yaf/schema.xml       |    3 +
 .../org/apache/metron/solr/dao/SolrDao.java     |   37 +-
 .../metron/solr/dao/SolrMetaAlertDao.java       |  285 +++--
 .../dao/SolrMetaAlertRetrieveLatestDao.java     |   77 ++
 .../metron/solr/dao/SolrMetaAlertSearchDao.java |  211 ++++
 .../metron/solr/dao/SolrMetaAlertUpdateDao.java |  216 ++++
 .../metron/solr/dao/SolrRetrieveLatestDao.java  |   81 ++
 .../apache/metron/solr/dao/SolrSearchDao.java   |  127 +--
 .../apache/metron/solr/dao/SolrUpdateDao.java   |   51 +-
 .../apache/metron/solr/dao/SolrUtilities.java   |   92 ++
 .../org/apache/metron/solr/dao/SolrDaoTest.java |   61 +-
 .../metron/solr/dao/SolrMetaAlertDaoTest.java   |  137 +++
 .../metron/solr/dao/SolrSearchDaoTest.java      |  176 ++-
 .../metron/solr/dao/SolrUpdateDaoTest.java      |   19 +-
 .../metron/solr/dao/SolrUtilitiesTest.java      |   48 +
 .../SolrIndexingIntegrationTest.java            |    5 +-
 .../SolrMetaAlertIntegrationTest.java           |  397 +++++++
 .../integration/SolrSearchIntegrationTest.java  |   59 +-
 .../integration/SolrUpdateIntegrationTest.java  |   87 +-
 .../integration/components/SolrComponent.java   |   94 +-
 .../schema/SchemaValidationIntegrationTest.java |    9 +-
 .../metron/solr/writer/SolrWriterTest.java      |   23 +-
 .../resources/config/test/conf/managed-schema   |   84 +-
 78 files changed, 6051 insertions(+), 2733 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-analytics/metron-profiler/.gitignore
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/.gitignore b/metron-analytics/metron-profiler/.gitignore
new file mode 100644
index 0000000..df1a13b
--- /dev/null
+++ b/metron-analytics/metron-profiler/.gitignore
@@ -0,0 +1 @@
+/logs
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 635d1de..c432c6c 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -20,12 +20,14 @@ package org.apache.metron.rest.config;
 import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
 
 import java.util.Optional;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.IndexDaoFactory;
-import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
+import org.apache.metron.indexing.util.IndexingCacheUtil;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
@@ -34,10 +36,6 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.env.Environment;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
 @Configuration
 public class IndexConfig {
 
@@ -45,6 +43,9 @@ public class IndexConfig {
   private GlobalConfigService globalConfigService;
 
   @Autowired
+  private ConfigurationsCache cache;
+
+  @Autowired
   private Environment environment;
 
   @Autowired
@@ -72,6 +73,7 @@ public class IndexConfig {
           throw new IllegalStateException("Unable to retrieve the global config.", e);
         }
       });
+      config.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache));
       config.setTableProvider(TableProvider.create(hbaseProviderImpl, () -> new HTableProvider()));
       config.setKerberosEnabled(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false));
       if (indexDaoImpl == null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index aafab24..3f9b3e4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -19,16 +19,14 @@
 package org.apache.metron.rest.service.impl;
 
 import java.io.IOException;
-import java.util.Collection;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.MetaAlertService;
@@ -48,7 +46,6 @@ public class MetaAlertServiceImpl implements MetaAlertService {
     this.environment = environment;
   }
 
-
   @Override
   public MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException {
     try {

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
index 21d158f..82b9c11 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
@@ -18,7 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import static org.apache.metron.common.Constants.ERROR_TYPE;
-import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE;
+import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE;
 import static org.apache.metron.rest.MetronRestConstants.INDEX_WRITER_NAME;
 import static org.apache.metron.rest.MetronRestConstants.SEARCH_FACET_FIELDS_SPRING_PROPERTY;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
index 76ac75d..6a42248 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
@@ -44,7 +44,7 @@ public class UpdateServiceImpl implements UpdateService {
   @Override
   public void patch(PatchRequest request) throws RestException, OriginalNotFoundException {
     try {
-      dao.patch(request, Optional.of(System.currentTimeMillis()));
+      dao.patch(dao, request, Optional.of(System.currentTimeMillis()));
     } catch (Exception e) {
 
       throw new RestException(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
index 3e69e37..9200fd1 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
@@ -30,13 +30,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.InMemoryMetaAlertDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
@@ -75,6 +72,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
   private String metaalertUrl = "/api/v1/metaalert";
   private String user = "user";
   private String password = "password";
+  private String metaAlertIndex = "metaalert_index";
 
   /**
    {
@@ -111,7 +109,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
     ImmutableMap<String, String> testData = ImmutableMap.of(
         "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
         "snort_index_2017.01.01.01", SearchIntegrationTest.snortData,
-        MetaAlertDao.METAALERTS_INDEX, metaAlertData
+        metaAlertIndex, metaAlertData
     );
     loadTestData(testData);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index e8d00d3..e437325 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.indexing.dao.HBaseDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.rest.service.UpdateService;
 import org.junit.Assert;
@@ -72,6 +71,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
   private String searchUrl = "/api/v1/search";
   private String user = "user";
   private String password = "password";
+  private String metaAlertIndex = "metaalert_index";
 
   /**
    {
@@ -121,7 +121,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
     ImmutableMap<String, String> testData = ImmutableMap.of(
         "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
         "snort_index_2017.01.01.01", SearchIntegrationTest.snortData,
-        MetaAlertDao.METAALERTS_INDEX, MetaAlertControllerIntegrationTest.metaAlertData
+        metaAlertIndex, MetaAlertControllerIntegrationTest.metaAlertData
     );
     loadTestData(testData);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index a09086a..246de6a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -24,8 +24,8 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.ColumnMetadataDao;
 import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
@@ -34,6 +34,9 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ public class ElasticsearchDao implements IndexDao {
   private transient TransportClient client;
   private ElasticsearchSearchDao searchDao;
   private ElasticsearchUpdateDao updateDao;
+  private ElasticsearchRetrieveLatestDao retrieveLatestDao;
 
   /**
    * Retrieves column metadata about search indices.
@@ -63,12 +67,14 @@ public class ElasticsearchDao implements IndexDao {
       AccessConfig config,
       ElasticsearchSearchDao searchDao,
       ElasticsearchUpdateDao updateDao,
+      ElasticsearchRetrieveLatestDao retrieveLatestDao,
       ElasticsearchColumnMetadataDao columnMetadataDao,
       ElasticsearchRequestSubmitter requestSubmitter
-                             ) {
+  ) {
     this.client = client;
     this.searchDao = searchDao;
     this.updateDao = updateDao;
+    this.retrieveLatestDao = retrieveLatestDao;
     this.columnMetadataDao = columnMetadataDao;
     this.requestSubmitter = requestSubmitter;
     this.accessConfig = config;
@@ -78,32 +84,25 @@ public class ElasticsearchDao implements IndexDao {
     //uninitialized.
   }
 
-  public ElasticsearchDao columnMetadataDao(ElasticsearchColumnMetadataDao columnMetadataDao) {
-    this.columnMetadataDao = columnMetadataDao;
-    return this;
-  }
-
-  public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
-    this.accessConfig = accessConfig;
-    return this;
-  }
-
   @Override
   public synchronized void init(AccessConfig config) {
-    if(this.client == null) {
-      this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get());
+    if (this.client == null) {
+      this.client = ElasticsearchUtils
+          .getClient(config.getGlobalConfigSupplier().get());
       this.accessConfig = config;
       this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
-      this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter);
-      this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, searchDao);
+      this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao,
+          requestSubmitter);
+      this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client);
+      this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao);
     }
 
-    if(columnMetadataDao == null) {
+    if (columnMetadataDao == null) {
       throw new IllegalArgumentException("No ColumnMetadataDao available");
     }
 
-    if(requestSubmitter == null) {
+    if (requestSubmitter == null) {
       throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available");
     }
   }
@@ -119,14 +118,14 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
-  public Document getLatest(final String guid, final String sensorType) throws IOException {
-    return searchDao.getLatest(guid, sensorType);
+  public Document getLatest(final String guid, final String sensorType) {
+    return retrieveLatestDao.getLatest(guid, sensorType);
   }
 
   @Override
   public Iterable<Document> getAllLatest(
-      final List<GetRequest> getRequests) throws IOException {
-    return searchDao.getAllLatest(getRequests);
+      final List<GetRequest> getRequests) {
+    return retrieveLatestDao.getAllLatest(getRequests);
   }
 
   @Override
@@ -140,19 +139,37 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
+  public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, Optional<Long> timestamp)
+      throws OriginalNotFoundException, IOException {
+    updateDao.patch(retrieveLatestDao, request, timestamp);
+  }
+
+  @Override
+  public void replace(ReplaceRequest request, Optional<Long> timestamp) throws IOException {
+    updateDao.replace(request, timestamp);
+  }
+
+  @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
     return this.columnMetadataDao.getColumnMetadata(indices);
   }
 
+  @Override
+  public Optional<Map<String, Object>> getLatestResult(GetRequest request) throws IOException {
+    return retrieveLatestDao.getLatestResult(request);
+  }
+
   protected Optional<String> getIndexName(String guid, String sensorType) {
     return updateDao.getIndexName(guid, sensorType);
   }
 
-  protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException {
+  protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
+      throws InvalidSearchException {
     return searchDao.search(request, queryBuilder);
   }
 
-  protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) throws InvalidSearchException {
+  protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder)
+      throws InvalidSearchException {
     return searchDao.group(groupRequest, queryBuilder);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index 2311a2b..faec939 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,29 +18,21 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.common.Constants.GUID;
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
-import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
-import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
-import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
-import org.apache.lucene.search.join.ScoreMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.MultiIndexDao;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
-import org.apache.metron.indexing.dao.metaalert.MetaScores;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
@@ -49,59 +41,36 @@ import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
-import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.InnerHitBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
-  public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
-  private static final String STATUS_PATH = "/status";
-  private static final String ALERT_PATH = "/alert";
+  public static final String THREAT_TRIAGE_FIELD = MetaAlertConstants.THREAT_FIELD_DEFAULT
+      .replace('.', ':');
+  public static final String METAALERTS_INDEX = "metaalert_index";
+
+  public static final String SOURCE_TYPE_FIELD = Constants.SENSOR_TYPE.replace('.', ':');
+  protected String metaAlertsIndex = METAALERTS_INDEX;
+  protected String threatTriageField = THREAT_TRIAGE_FIELD;
+  protected String threatSort = MetaAlertConstants.THREAT_SORT_DEFAULT;
 
-  private IndexDao indexDao;
   private ElasticsearchDao elasticsearchDao;
-  private String index = METAALERTS_INDEX;
-  private String threatTriageField = THREAT_FIELD_DEFAULT;
+  private IndexDao indexDao;
+  private ElasticsearchMetaAlertSearchDao metaAlertSearchDao;
+  private ElasticsearchMetaAlertUpdateDao metaAlertUpdateDao;
+  private ElasticsearchMetaAlertRetrieveLatestDao metaAlertRetrieveLatestDao;
 
-  /**
-   * Defines which summary aggregation is used to represent the overall threat triage score for
-   * the metaalert. The summary aggregation is applied to the threat triage score of all child alerts.
-   *
-   * This overall score is primarily used for sorting; hence it is called the 'threatSort'.  This
-   * can be either max, min, average, count, median, or sum.
-   */
-  private String threatSort = THREAT_SORT_DEFAULT;
-  private int pageSize = 500;
+  protected int pageSize = 500;
 
   /**
    * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
    * @param indexDao The Dao to wrap
    */
   public ElasticsearchMetaAlertDao(IndexDao indexDao) {
-    this(indexDao, METAALERTS_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT);
+    this(indexDao, METAALERTS_INDEX, MetaAlertConstants.THREAT_FIELD_DEFAULT,
+        MetaAlertConstants.THREAT_SORT_DEFAULT);
   }
 
   /**
@@ -112,10 +81,13 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
    *                   as the overall threat triage score for the metaalert. This
    *                   can be either max, min, average, count, median, or sum.
    */
-  public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, String threatSort) {
+  public ElasticsearchMetaAlertDao(IndexDao indexDao, String metaAlertsIndex,
+      String triageLevelField,
+      String threatSort) {
     init(indexDao, Optional.of(threatSort));
-    this.index = index;
     this.threatTriageField = triageLevelField;
+    this.threatSort = threatSort;
+    this.metaAlertsIndex = metaAlertsIndex;
   }
 
   public ElasticsearchMetaAlertDao() {
@@ -123,8 +95,10 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   /**
-   * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao.
-   * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example).
+   * Initializes this implementation by setting the supplied IndexDao and also setting a separate
+   *     ElasticsearchDao.
+   * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for
+   *     example).
    * @param indexDao The DAO to wrap for our queries
    * @param threatSort The summary aggregation of the child threat triage scores used
    *                   as the overall threat triage score for the metaalert. This
@@ -152,6 +126,24 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     if (threatSort.isPresent()) {
       this.threatSort = threatSort.get();
     }
+
+    MetaAlertConfig config = new MetaAlertConfig(
+        metaAlertsIndex,
+        threatTriageField,
+        this.threatSort,
+        ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD
+    );
+
+    this.metaAlertSearchDao = new ElasticsearchMetaAlertSearchDao(
+        elasticsearchDao,
+        config,
+        pageSize);
+    this.metaAlertRetrieveLatestDao = new ElasticsearchMetaAlertRetrieveLatestDao(indexDao);
+    this.metaAlertUpdateDao = new ElasticsearchMetaAlertUpdateDao(
+        elasticsearchDao,
+        metaAlertRetrieveLatestDao,
+        config,
+        pageSize);
   }
 
   @Override
@@ -160,551 +152,74 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
-  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
-    if (guid == null || guid.trim().isEmpty()) {
-      throw new InvalidSearchException("Guid cannot be empty");
-    }
-    // Searches for all alerts containing the meta alert guid in it's "metalerts" array
-    QueryBuilder qb = boolQuery()
-        .must(
-            nestedQuery(
-                ALERT_FIELD,
-                boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + GUID, guid)),
-                    ScoreMode.None
-            ).innerHit(new InnerHitBuilder())
-        )
-        .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
-    return queryAllResults(qb);
+  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
+    return indexDao.getColumnMetadata(indices);
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
-      throws InvalidCreateException, IOException {
-    List<GetRequest> alertRequests = request.getAlerts();
-    if (request.getAlerts().isEmpty()) {
-      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
-    }
-    if (request.getGroups().isEmpty()) {
-      throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
-    }
-
-    // Retrieve the documents going into the meta alert and build it
-    Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
-
-    Document metaAlert = buildCreateDocument(alerts, request.getGroups());
-    calculateMetaScores(metaAlert);
-    // Add source type to be consistent with other sources and allow filtering
-    metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);
-
-    // Start a list of updates / inserts we need to run
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-
-    try {
-      // We need to update the associated alerts with the new meta alerts, making sure existing
-      // links are maintained.
-      Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
-          GetRequest::getGuid, GetRequest::getIndex));
-      Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
-          GetRequest::getGuid, GetRequest::getSensorType));
-      for (Document alert: alerts) {
-        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
-          // Use the index in the request if it exists
-          Optional<String> index = guidToIndices.get(alert.getGuid());
-          if (!index.isPresent()) {
-            // Look up the index from Elasticsearch if one is not supplied in the request
-            index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
-            if (!index.isPresent()) {
-              throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
-            }
-          }
-          updates.put(alert, index);
-        }
-      }
-
-      // Kick off any updates.
-      indexDaoUpdate(updates);
-
-      MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
-      createResponse.setCreated(true);
-      createResponse.setGuid(metaAlert.getGuid());
-      return createResponse;
-    } catch (IOException ioe) {
-      throw new InvalidCreateException("Unable to create meta alert", ioe);
-    }
+  public Document getLatest(String guid, String sensorType) throws IOException {
+    return indexDao.getLatest(guid, sensorType);
   }
 
   @Override
-  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
-      throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
-    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
-      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
-      boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts);
-      if (metaAlertUpdated) {
-        calculateMetaScores(metaAlert);
-        updates.put(metaAlert, Optional.of(index));
-        for(Document alert: alerts) {
-          if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
-            updates.put(alert, Optional.empty());
-          }
-        }
-        indexDaoUpdate(updates);
-      }
-      return metaAlertUpdated;
-    } else {
-      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
-    }
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+    return indexDao.getAllLatest(getRequests);
   }
 
-  protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
-    boolean alertAdded = false;
-    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
-    Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert ->
-        (String) currentAlert.get(GUID)).collect(Collectors.toSet());
-    for (Document alert: alerts) {
-      String alertGuid = alert.getGuid();
-      // Only add an alert if it isn't already in the meta alert
-      if (!currentAlertGuids.contains(alertGuid)) {
-        currentAlerts.add(alert.getDocument());
-        alertAdded = true;
-      }
-    }
-    return alertAdded;
+  @Override
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+    return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
   }
 
-  protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
-    List<String> metaAlertField = new ArrayList<>();
-    List<String> alertField = (List<String>) alert.getDocument()
-        .get(MetaAlertDao.METAALERT_FIELD);
-    if (alertField != null) {
-      metaAlertField.addAll(alertField);
-    }
-    boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid);
-    if (metaAlertAdded) {
-      metaAlertField.add(metaAlertGuid);
-      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    }
-    return metaAlertAdded;
+  @Override
+  public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
+      throws InvalidCreateException, IOException {
+    return metaAlertUpdateDao.createMetaAlert(request);
   }
 
   @Override
-  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
       throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
-    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
-      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
-      Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(
-          Collectors.toList());
-      boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids);
-      if (metaAlertUpdated) {
-        calculateMetaScores(metaAlert);
-        updates.put(metaAlert, Optional.of(index));
-        for(Document alert: alerts) {
-          if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
-            updates.put(alert, Optional.empty());
-          }
-        }
-        indexDaoUpdate(updates);
-      }
-      return metaAlertUpdated;
-    } else {
-      throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
-    }
-
+    return metaAlertUpdateDao.addAlertsToMetaAlert(metaAlertGuid, alertRequests);
   }
 
-  protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
-    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
-    int previousSize = currentAlerts.size();
-    // Only remove an alert if it is in the meta alert
-    currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID)));
-    return currentAlerts.size() != previousSize;
-  }
-
-  protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
-    List<String> metaAlertField = new ArrayList<>();
-    List<String> alertField = (List<String>) alert.getDocument()
-        .get(MetaAlertDao.METAALERT_FIELD);
-    if (alertField != null) {
-      metaAlertField.addAll(alertField);
-    }
-    boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid);
-    if (metaAlertRemoved) {
-      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    }
-    return metaAlertRemoved;
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    return metaAlertUpdateDao.removeAlertsFromMetaAlert(metaAlertGuid, alertRequests);
   }
 
   @Override
   public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
       throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
-    String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
-    boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus);
-    if (metaAlertUpdated) {
-      metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
-      updates.put(metaAlert, Optional.of(index));
-      List<GetRequest> getRequests = new ArrayList<>();
-      List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
-          .get(MetaAlertDao.ALERT_FIELD);
-      currentAlerts.stream().forEach(currentAlert -> {
-        getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE)));
-      });
-      Iterable<Document> alerts = indexDao.getAllLatest(getRequests);
-      for (Document alert : alerts) {
-        boolean metaAlertAdded = false;
-        boolean metaAlertRemoved = false;
-        // If we're making it active add add the meta alert guid for every alert.
-        if (MetaAlertStatus.ACTIVE.equals(status)) {
-          metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert);
-        }
-        // If we're making it inactive, remove the meta alert guid from every alert.
-        if (MetaAlertStatus.INACTIVE.equals(status)) {
-          metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
-        }
-        if (metaAlertAdded || metaAlertRemoved) {
-          updates.put(alert, Optional.empty());
-        }
-      }
-    }
-    if (metaAlertUpdated) {
-      indexDaoUpdate(updates);
-    }
-    return metaAlertUpdated;
+    return metaAlertUpdateDao.updateMetaAlertStatus(metaAlertGuid, status);
   }
 
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
-    // Wrap the query to also get any meta-alerts.
-    QueryBuilder qb = constantScoreQuery(boolQuery()
-        .must(boolQuery()
-            .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
-            .should(nestedQuery(
-                ALERT_FIELD,
-                new QueryStringQueryBuilder(searchRequest.getQuery()),
-                ScoreMode.None
-                )
-            )
-        )
-        // Ensures that it's a meta alert with active status or that it's an alert (signified by
-        // having no status field)
-        .must(boolQuery()
-            .should(termQuery(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()))
-            .should(boolQuery().mustNot(existsQuery(MetaAlertDao.STATUS_FIELD)))
-        )
-        .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD))
-    );
-    return elasticsearchDao.search(searchRequest, qb);
+    return metaAlertSearchDao.search(searchRequest);
   }
 
   @Override
-  public Document getLatest(String guid, String sensorType) throws IOException {
-    return indexDao.getLatest(guid, sensorType);
-  }
-
-  @Override
-  public Iterable<Document> getAllLatest(
-      List<GetRequest> getRequests) throws IOException {
-    return indexDao.getAllLatest(getRequests);
+  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+    return metaAlertSearchDao.group(groupRequest);
   }
 
   @Override
   public void update(Document update, Optional<String> index) throws IOException {
-    if (METAALERT_TYPE.equals(update.getSensorType())) {
-      // We've been passed an update to the meta alert.
-      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
-    } else {
-      Map<Document, Optional<String>> updates = new HashMap<>();
-      updates.put(update, index);
-      // We need to update an alert itself.  Only that portion of the update can be delegated.
-      // We still need to get meta alerts potentially associated with it and update.
-      Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
-          .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L))
-          .collect(Collectors.toList());
-      // Each meta alert needs to be updated with the new alert
-      for (Document metaAlert : metaAlerts) {
-        replaceAlertInMetaAlert(metaAlert, update);
-        updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
-      }
-
-      // Run the alert's update
-      indexDao.batchUpdate(updates);
-    }
-  }
-
-  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
-    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()));
-    if (metaAlertUpdated) {
-      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
-    }
-    return metaAlertUpdated;
+    metaAlertUpdateDao.update(update, index);
   }
 
   @Override
-  public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
-    throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
+  public void batchUpdate(Map<Document, Optional<String>> updates) {
+    metaAlertUpdateDao.batchUpdate(updates);
   }
 
-  /**
-   * Does not allow patches on the "alerts" or "status" fields.  These fields must be updated with their
-   * dedicated methods.
-   *
-   * @param request The patch request
-   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
-   * @throws OriginalNotFoundException
-   * @throws IOException
-   */
   @Override
-  public void patch(PatchRequest request, Optional<Long> timestamp)
+  public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
+      Optional<Long> timestamp)
       throws OriginalNotFoundException, IOException {
-    if (isPatchAllowed(request)) {
-      Document d = getPatchedDocument(request, timestamp);
-      indexDao.update(d, Optional.ofNullable(request.getIndex()));
-    } else {
-      throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths.  "
-          + "Please use the add/remove alert or update status functions instead.");
-    }
-  }
-
-  protected boolean isPatchAllowed(PatchRequest request) {
-    if(request.getPatch() != null && !request.getPatch().isEmpty()) {
-      for(Map<String, Object> patch : request.getPatch()) {
-        Object pathObj = patch.get("path");
-        if(pathObj != null && pathObj instanceof String) {
-          String path = (String)pathObj;
-          if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) {
-            return false;
-          }
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Given an alert GUID, retrieve all associated meta alerts.
-   * @param alertGuid The GUID of the child alert
-   * @return The Elasticsearch response containing the meta alerts
-   */
-  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
-    QueryBuilder qb = boolQuery()
-        .must(
-            nestedQuery(
-                ALERT_FIELD,
-                boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + Constants.GUID, alertGuid)),
-                ScoreMode.None
-            ).innerHit(new InnerHitBuilder())
-        )
-        .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
-    return queryAllResults(qb);
-  }
-
-  /**
-   * Elasticsearch queries default to 10 records returned.  Some internal queries require that all
-   * results are returned.  Rather than setting an arbitrarily high size, this method pages through results
-   * and returns them all in a single SearchResponse.
-   * @param qb
-   * @return
-   */
-  protected SearchResponse queryAllResults(QueryBuilder qb) {
-    SearchRequestBuilder searchRequestBuilder = elasticsearchDao
-        .getClient()
-        .prepareSearch(index)
-        .addStoredField("*")
-        .setFetchSource(true)
-        .setQuery(qb)
-        .setSize(pageSize);
-    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
-        .execute()
-        .actionGet();
-    List<SearchResult> allResults = getSearchResults(esResponse);
-    long total = esResponse.getHits().getTotalHits();
-    if (total > pageSize) {
-      int pages = (int) (total / pageSize) + 1;
-      for (int i = 1; i < pages; i++) {
-        int from = i * pageSize;
-        searchRequestBuilder.setFrom(from);
-        esResponse = searchRequestBuilder
-            .execute()
-            .actionGet();
-        allResults.addAll(getSearchResults(esResponse));
-      }
-    }
-    SearchResponse searchResponse = new SearchResponse();
-    searchResponse.setTotal(total);
-    searchResponse.setResults(allResults);
-    return searchResponse;
-  }
-
-  /**
-   * Transforms a list of Elasticsearch SearchHits to a list of SearchResults
-   * @param searchResponse
-   * @return
-   */
-  protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
-    return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
-          SearchResult searchResult = new SearchResult();
-          searchResult.setId(searchHit.getId());
-          searchResult.setSource(searchHit.getSource());
-          searchResult.setScore(searchHit.getScore());
-          searchResult.setIndex(searchHit.getIndex());
-          return searchResult;
-        }
-    ).collect(Collectors.toList());
-  }
-
-  /**
-   * Build the Document representing a meta alert to be created.
-   * @param alerts The Elasticsearch results for the meta alerts child documents
-   * @param groups The groups used to create this meta alert
-   * @return A Document representing the new meta alert
-   */
-  protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
-    // Need to create a Document from the multiget. Scores will be calculated later
-    Map<String, Object> metaSource = new HashMap<>();
-    List<Map<String, Object>> alertList = new ArrayList<>();
-    for (Document alert: alerts) {
-      alertList.add(alert.getDocument());
-    }
-    metaSource.put(ALERT_FIELD, alertList);
-
-    // Add any meta fields
-    String guid = UUID.randomUUID().toString();
-    metaSource.put(GUID, guid);
-    metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
-    metaSource.put(GROUPS_FIELD, groups);
-    metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-
-    return new Document(metaSource, guid, METAALERT_TYPE, System.currentTimeMillis());
-  }
-
-  /**
-   * Calls the single update variant if there's only one update, otherwise calls batch.
-   * @param updates The list of updates to run
-   * @throws IOException If there's an update error
-   */
-  protected void indexDaoUpdate(Map<Document, Optional<String>> updates) throws IOException {
-    if (updates.size() == 1) {
-      Entry<Document, Optional<String>> singleUpdate = updates.entrySet().iterator().next();
-      indexDao.update(singleUpdate.getKey(), singleUpdate.getValue());
-    } else if (updates.size() > 1) {
-      indexDao.batchUpdate(updates);
-    } // else we have no updates, so don't do anything
-  }
-
-
-
-  @SuppressWarnings("unchecked")
-  protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
-    Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
-    if (latest == null) {
-      return new ArrayList<>();
-    }
-    List<String> guids = new ArrayList<>();
-    List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
-        .get(MetaAlertDao.ALERT_FIELD);
-    for (Map<String, Object> alert : latestAlerts) {
-      guids.add((String) alert.get(Constants.GUID));
-    }
-
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    QueryBuilder query = QueryBuilders.idsQuery().addIds(guids.toArray(new String[0]));
-    SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
-        .setQuery(query);
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    for (SearchHit hit : response.getHits().getHits()) {
-      alerts.add(hit.sourceAsMap());
-    }
-    return alerts;
-  }
-
-  /**
-   * Builds an update Document for updating the meta alerts list.
-   * @param alertGuid The GUID of the alert to update
-   * @param sensorType The sensor type to update
-   * @param metaAlertField The new metaAlertList to use
-   * @return The update Document
-   */
-  protected Document buildAlertUpdate(String alertGuid, String sensorType,
-      List<String> metaAlertField, Long timestamp) {
-    Document alertUpdate;
-    Map<String, Object> document = new HashMap<>();
-    document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    alertUpdate = new Document(
-        document,
-        alertGuid,
-        sensorType,
-        timestamp
-    );
-    return alertUpdate;
-  }
-
-
-  @Override
-  public Map<String, FieldType> getColumnMetadata(List<String> indices)
-      throws IOException {
-    return indexDao.getColumnMetadata(indices);
-  }
-
-  @Override
-  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
-    // Wrap the query to hide any alerts already contained in meta alerts
-    QueryBuilder qb = QueryBuilders.boolQuery()
-        .must(new QueryStringQueryBuilder(groupRequest.getQuery()))
-        .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD));
-    return elasticsearchDao.group(groupRequest, qb);
-  }
-
-  /**
-   * Calculate the meta alert scores for a Document.
-   * @param metaAlert The Document containing scores
-   * @return Set of score statistics
-   */
-  @SuppressWarnings("unchecked")
-  protected void calculateMetaScores(Document metaAlert) {
-    MetaScores metaScores = new MetaScores(new ArrayList<>());
-    List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD));
-    if (alertsRaw != null && !alertsRaw.isEmpty()) {
-      ArrayList<Double> scores = new ArrayList<>();
-      for (Object alertRaw : alertsRaw) {
-        Map<String, Object> alert = (Map<String, Object>) alertRaw;
-        Double scoreNum = parseThreatField(alert.get(threatTriageField));
-        if (scoreNum != null) {
-          scores.add(scoreNum);
-        }
-      }
-      metaScores = new MetaScores(scores);
-    }
-
-    // add a summary (max, min, avg, ...) of all the threat scores from the child alerts
-    metaAlert.getDocument().putAll(metaScores.getMetaScores());
-
-    // add the overall threat score for the metaalert; one of the summary aggregations as defined by `threatSort`
-    Object threatScore = metaScores.getMetaScores().get(threatSort);
-
-    // add the threat score as a float; type needs to match the threat score field from each of the sensor indices
-    metaAlert.getDocument().put(threatTriageField, ConversionUtils.convert(threatScore, Float.class));
-  }
-
-  private Double parseThreatField(Object threatRaw) {
-    Double threat = null;
-    if (threatRaw instanceof Number) {
-      threat = ((Number) threatRaw).doubleValue();
-    } else if (threatRaw instanceof String) {
-      threat = Double.parseDouble((String) threatRaw);
-    }
-    return threat;
-  }
-
-  public int getPageSize() {
-    return pageSize;
+    metaAlertUpdateDao.patch(retrieveLatestDao, request, timestamp);
   }
 
   public void setPageSize(int pageSize) {

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
new file mode 100644
index 0000000..8aa55d6
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+
+public class ElasticsearchMetaAlertRetrieveLatestDao implements MetaAlertRetrieveLatestDao {
+  private RetrieveLatestDao retrieveLatestDao;
+
+  public ElasticsearchMetaAlertRetrieveLatestDao(RetrieveLatestDao retrieveLatestDao) {
+    this.retrieveLatestDao = retrieveLatestDao;
+  }
+
+  @Override
+  public Document getLatest(String guid, String sensorType) throws IOException {
+    return retrieveLatestDao.getLatest(guid, sensorType);
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+    return retrieveLatestDao.getAllLatest(getRequests);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
new file mode 100644
index 0000000..00fc9d0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import static org.apache.metron.common.Constants.GUID;
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.queryAllResults;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
+import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
+import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+import org.apache.lucene.search.join.ScoreMode;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertSearchDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+
+public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao {
+
+  protected ElasticsearchDao elasticsearchDao;
+  private MetaAlertConfig config;
+  private int pageSize;
+
+  public ElasticsearchMetaAlertSearchDao(ElasticsearchDao elasticsearchDao,
+      MetaAlertConfig config, int pageSize) {
+    this.elasticsearchDao = elasticsearchDao;
+    this.config = config;
+    this.pageSize = pageSize;
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
+    // Wrap the query to also get any meta-alerts.
+    QueryBuilder qb = constantScoreQuery(boolQuery()
+        .must(boolQuery()
+            .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
+            .should(nestedQuery(
+                MetaAlertConstants.ALERT_FIELD,
+                new QueryStringQueryBuilder(searchRequest.getQuery()),
+                ScoreMode.None
+                )
+            )
+        )
+        // Ensures that it's a meta alert with active status or that it's an alert (signified by
+        // having no status field)
+        .must(boolQuery()
+            .should(termQuery(MetaAlertConstants.STATUS_FIELD,
+                MetaAlertStatus.ACTIVE.getStatusString()))
+            .should(boolQuery().mustNot(existsQuery(MetaAlertConstants.STATUS_FIELD)))
+        )
+        .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD))
+    );
+    return elasticsearchDao.search(searchRequest, qb);
+  }
+
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+    // Wrap the query to hide any alerts already contained in meta alerts
+    QueryBuilder qb = QueryBuilders.boolQuery()
+        .must(new QueryStringQueryBuilder(groupRequest.getQuery()))
+        .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD));
+    return elasticsearchDao.group(groupRequest, qb);
+  }
+
+  @Override
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+    if (guid == null || guid.trim().isEmpty()) {
+      throw new InvalidSearchException("Guid cannot be empty");
+    }
+    // Searches for all active meta alerts whose nested alert field contains the given guid
+    QueryBuilder qb = boolQuery()
+        .must(
+            nestedQuery(
+                MetaAlertConstants.ALERT_FIELD,
+                boolQuery()
+                    .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + GUID, guid)),
+                ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
+        )
+        .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+    return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(),
+        pageSize);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
new file mode 100644
index 0000000..6c709a6
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.lucene.search.join.ScoreMode;
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaScores;
+import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.InvalidCreateException;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+
+public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
+
+  private ElasticsearchDao elasticsearchDao;
+  private MetaAlertRetrieveLatestDao retrieveLatestDao;
+  private int pageSize;
+
+  /**
+   * Constructs an ElasticsearchMetaAlertUpdateDao.
+   * @param elasticsearchDao The ElasticsearchDao used for index lookups and batch updates.
+   * @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being
+   *     mutated.
+   * @param config The meta alert config to use.
+   * @param pageSize The page size handed to queryAllResults when looking up meta alerts.
+   */
+  public ElasticsearchMetaAlertUpdateDao(
+      ElasticsearchDao elasticsearchDao,
+      MetaAlertRetrieveLatestDao retrieveLatestDao,
+      MetaAlertConfig config,
+      int pageSize) {
+    super(elasticsearchDao, retrieveLatestDao, config);
+    this.pageSize = pageSize;
+    this.retrieveLatestDao = retrieveLatestDao;
+    this.elasticsearchDao = elasticsearchDao;
+  }
+
+  /**
+   * Builds a meta alert from the requested alerts and writes it, together with every child
+   * alert that had to be linked to it, in one call to update.
+   * @param request The alerts and UI groups that make up the new meta alert.
+   * @return A response flagged as created and carrying the GUID of the new meta alert.
+   * @throws InvalidCreateException If the request has no alerts or groups, or the update fails.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
+      throws InvalidCreateException, IOException {
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (alertRequests.isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
+    }
+    if (request.getGroups().isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
+    }
+
+    // Fetch the current state of the child alerts and assemble the meta alert around them
+    Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
+    Document metaAlert =
+        buildCreateDocument(alerts, request.getGroups(), MetaAlertConstants.ALERT_FIELD);
+    MetaScores.calculateMetaScores(
+        metaAlert, getConfig().getThreatTriageField(), getConfig().getThreatSort());
+    // Tag the meta alert with a source type so it can be filtered like any other source
+    metaAlert.getDocument()
+        .put(ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD, MetaAlertConstants.METAALERT_TYPE);
+
+    // Collect every document that has to be written, starting with the meta alert itself
+    Map<Document, Optional<String>> pending = new HashMap<>();
+    pending.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
+    try {
+      // Key the request details by GUID so each retrieved alert can be matched back to them
+      Map<String, Optional<String>> requestedIndices = alertRequests.stream()
+          .collect(Collectors.toMap(GetRequest::getGuid, GetRequest::getIndex));
+      Map<String, String> requestedSensorTypes = alertRequests.stream()
+          .collect(Collectors.toMap(GetRequest::getGuid, GetRequest::getSensorType));
+      for (Document alert : alerts) {
+        if (!addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+          continue;
+        }
+        // Prefer the index named in the request; otherwise look it up in Elasticsearch
+        Optional<String> index = requestedIndices.get(alert.getGuid());
+        if (!index.isPresent()) {
+          index = elasticsearchDao
+              .getIndexName(alert.getGuid(), requestedSensorTypes.get(alert.getGuid()));
+        }
+        if (!index.isPresent()) {
+          throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
+        }
+        pending.put(alert, index);
+      }
+      update(pending);
+      MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
+      createResponse.setGuid(metaAlert.getGuid());
+      createResponse.setCreated(true);
+      return createResponse;
+    } catch (IOException ioe) {
+      throw new InvalidCreateException("Unable to create meta alert", ioe);
+    }
+  }
+
+  /**
+   * Adds alerts to a metaalert, based on a list of GetRequests provided for retrieval.
+   * @param metaAlertGuid The GUID of the metaalert to be given new children.
+   * @param alertRequests GetRequests for the appropriate alerts to add.
+   * @return True if metaalert is modified, false otherwise.
+   * @throws IllegalStateException If the metaalert's status is not ACTIVE.
+   */
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    // NOTE(review): no null check here; confirm getLatest cannot return null for an unknown GUID
+    Document metaAlert = retrieveLatestDao
+        .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
+    Object status = metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD);
+    if (!MetaAlertStatus.ACTIVE.getStatusString().equals(status)) {
+      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
+    }
+    Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
+    Map<Document, Optional<String>> updates = buildAddAlertToMetaAlertUpdates(metaAlert, alerts);
+    update(updates);
+    return !updates.isEmpty();
+  }
+
+  /**
+   * Updates an alert and, in the same batch, every active meta alert that contains it.
+   * @param update The alert update. A document of the meta alert sensor type is rejected.
+   * @param index The optional index that accompanies the alert's update.
+   */
+  @Override
+  public void update(Document update, Optional<String> index) throws IOException {
+    if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) {
+      // Direct updates to a meta alert are refused
+      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
+    }
+    Map<Document, Optional<String>> batch = new HashMap<>();
+    batch.put(update, index);
+    // Turn each active meta alert holding this alert into a Document that can be modified
+    List<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
+        .map(hit -> new Document(
+            hit.getSource(), hit.getId(), MetaAlertConstants.METAALERT_TYPE, 0L))
+        .collect(Collectors.toList());
+    for (Document metaAlert : metaAlerts) {
+      // A meta alert joins the batch only when swapping in the new alert changed it
+      if (replaceAlertInMetaAlert(metaAlert, update)) {
+        batch.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
+      }
+    }
+    elasticsearchDao.batchUpdate(batch);
+  }
+
+  /**
+   * Looks up the active meta alerts that hold the given alert as one of their children.
+   * @param alertGuid The GUID of the child alert
+   * @return The search response containing the matching meta alerts
+   */
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
+    // Child alerts are nested inside the meta alert, so their GUID needs a nested query
+    String childGuidField = MetaAlertConstants.ALERT_FIELD + "." + Constants.GUID;
+    QueryBuilder containsAlert = nestedQuery(
+        MetaAlertConstants.ALERT_FIELD,
+        boolQuery().must(termQuery(childGuidField, alertGuid)),
+        ScoreMode.None
+    ).innerHit(new InnerHitBuilder());
+    QueryBuilder isActive =
+        termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
+    QueryBuilder qb = boolQuery().must(containsAlert).must(isActive);
+
+    return ElasticsearchUtils
+        .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(),
+            pageSize);
+  }
+
+
+  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
+    // The new version of the alert is added back only when the removal reports a change
+    if (!removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()))) {
+      return false;
+    }
+    addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
new file mode 100644
index 0000000..f6bfeda
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+
+public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
+
+  private TransportClient transportClient;
+
+  public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
+    this.transportClient = transportClient;
+  }
+
+  @Override
+  public Document getLatest(String guid, String sensorType) {
+    // A null return signals that no document matched the GUID
+    return searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)).orElse(null);
+  }
+
+  /**
+   * Fetches the documents for all of the given requests with a single search. GUIDs and
+   * sensor types are pooled into two independent sets, so the search matches any requested
+   * GUID under any requested sensor type rather than exact GUID / sensor type pairs.
+   * @return The documents built from the search hits.
+   */
+  @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) {
+    Collection<String> guids = new HashSet<>();
+    Collection<String> sensorTypes = new HashSet<>();
+    getRequests.forEach(getRequest -> {
+      guids.add(getRequest.getGuid());
+      sensorTypes.add(getRequest.getSensorType());
+    });
+    return searchByGuids(guids, sensorTypes, hit -> {
+      // Each document takes its GUID from the id of the hit it was built from
+      Long ts = 0L;
+      String sourceType = toSourceType(hit.getType());
+      try {
+        return Optional.of(new Document(hit.getSourceAsString(), hit.getId(), sourceType, ts));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /** Searches for a single GUID and returns the first transformed hit, if there is one. */
+  <T> Optional<T> searchByGuid(String guid, String sensorType,
+      Function<SearchHit, Optional<T>> callback) {
+    // A null sensor type leaves the search unrestricted by document type
+    Collection<String> sensorTypes = null;
+    if (sensorType != null) {
+      sensorTypes = Collections.singleton(sensorType);
+    }
+    return searchByGuids(Collections.singleton(guid), sensorTypes, callback).stream().findFirst();
+  }
+
+  /**
+   * Searches for a set of GUIDs, optionally restricted to a set of sensor types, and hands
+   * each hit to the callback. Every value the callback produces is collected; hits for which
+   * it returns an empty Optional are left out.
+   * @param guids The ids to search for. Null or empty yields an empty list without searching.
+   * @param sensorTypes The sensor types to restrict the search to, or null for no restriction.
+   * @param callback Transforms a hit into a type T.
+   * @return The transformed hits; the search asks for at most guids.size() of them.
+   */
+  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
+      Function<SearchHit, Optional<T>> callback) {
+    if (guids == null || guids.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // The document types searched are the sensor types with "_doc" appended
+    IdsQueryBuilder idsQuery = sensorTypes == null
+        ? QueryBuilders.idsQuery()
+        : QueryBuilders.idsQuery(
+            sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new));
+    for (String guid : guids) {
+      idsQuery.addIds(guid);
+    }
+
+    // One search covers all ids; the result size is capped at the number of ids requested
+    org.elasticsearch.action.search.SearchResponse response = transportClient.prepareSearch()
+        .setQuery(idsQuery)
+        .setSize(guids.size())
+        .get();
+
+    SearchHits hits = response.getHits();
+    List<T> results = new ArrayList<>();
+    for (SearchHit hit : hits) {
+      Optional<T> result = callback.apply(hit);
+      result.ifPresent(results::add);
+    }
+    return results;
+  }
+
+  /** Wraps a hit's source in a Document carrying the supplied GUID and a timestamp of 0. */
+  private Optional<Document> toDocument(final String guid, SearchHit hit) {
+    Long timestamp = 0L;
+    String sourceType = toSourceType(hit.getType());
+    try {
+      return Optional.of(new Document(hit.getSourceAsString(), guid, sourceType, timestamp));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Returns the source type of a doc type: the text that precedes its first "_doc".
+   * @param docType The document type, e.g. a sensor type with "_doc" appended.
+   * @return The source type, which is the whole doc type when it contains no "_doc".
+   */
+  private String toSourceType(String docType) {
+    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 3971237..5725534 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -116,49 +116,6 @@ public class ElasticsearchSearchDao implements SearchDao {
     return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery()));
   }
 
-  @Override
-  public Document getLatest(String guid, String sensorType) throws IOException {
-    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
-    return doc.orElse(null);
-  }
-
-  <T> Optional<T> searchByGuid(String guid, String sensorType,
-      Function<SearchHit, Optional<T>> callback) {
-    Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
-    List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
-    if (results.size() > 0) {
-      return Optional.of(results.get(0));
-    } else {
-      return Optional.empty();
-    }
-  }
-
-  @Override
-  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
-    Collection<String> guids = new HashSet<>();
-    Collection<String> sensorTypes = new HashSet<>();
-    for (GetRequest getRequest: getRequests) {
-      guids.add(getRequest.getGuid());
-      sensorTypes.add(getRequest.getSensorType());
-    }
-    List<Document> documents = searchByGuids(
-        guids
-        , sensorTypes
-        , hit -> {
-          Long ts = 0L;
-          String doc = hit.getSourceAsString();
-          String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-          try {
-            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-          }
-        }
-
-    );
-    return documents;
-  }
-
   /**
    * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query.
    * @param request The request defining the parameters of the search
@@ -505,63 +462,4 @@ public class ElasticsearchSearchDao implements SearchDao {
     }
     return searchResultGroups;
   }
-
-  /**
-   * Return the search hit based on the UUID and sensor type.
-   * A callback can be specified to transform the hit into a type T.
-   * If more than one hit happens, the first one will be returned.
-   */
-  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
-      Function<SearchHit, Optional<T>> callback) {
-    if(guids == null || guids.isEmpty()) {
-      return Collections.EMPTY_LIST;
-    }
-    QueryBuilder query = null;
-    IdsQueryBuilder idsQuery = null;
-    if (sensorTypes != null) {
-      String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
-      idsQuery = QueryBuilders.idsQuery(types);
-    } else {
-      idsQuery = QueryBuilders.idsQuery();
-    }
-
-    for(String guid : guids) {
-      query = idsQuery.addIds(guid);
-    }
-
-    SearchRequestBuilder request = client.prepareSearch()
-        .setQuery(query)
-        .setSize(guids.size())
-        ;
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    SearchHits hits = response.getHits();
-    List<T> results = new ArrayList<>();
-    for (SearchHit hit : hits) {
-      Optional<T> result = callback.apply(hit);
-      if (result.isPresent()) {
-        results.add(result.get());
-      }
-    }
-    return results;
-  }
-
-  private Optional<Document> toDocument(final String guid, SearchHit hit) {
-    Long ts = 0L;
-    String doc = hit.getSourceAsString();
-    String sourceType = toSourceType(hit.getType());
-    try {
-      return Optional.of(new Document(doc, guid, sourceType, ts));
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Returns the source type based on a given doc type.
-   * @param docType The document type.
-   * @return The source type.
-   */
-  private String toSourceType(String docType) {
-    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index a7c3a71..c4d7412 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -42,14 +42,14 @@ public class ElasticsearchUpdateDao implements UpdateDao {
 
   private transient TransportClient client;
   private AccessConfig accessConfig;
-  private ElasticsearchSearchDao searchDao;
+  private ElasticsearchRetrieveLatestDao retrieveLatestDao;
 
   public ElasticsearchUpdateDao(TransportClient client,
       AccessConfig accessConfig,
-      ElasticsearchSearchDao searchDao) {
+      ElasticsearchRetrieveLatestDao searchDao) {
     this.client = client;
     this.accessConfig = accessConfig;
-    this.searchDao = searchDao;
+    this.retrieveLatestDao = searchDao;
   }
 
   @Override
@@ -110,7 +110,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
   }
 
   protected Optional<String> getIndexName(String guid, String sensorType) {
-    return searchDao.searchByGuid(guid,
+    return retrieveLatestDao.searchByGuid(guid,
         sensorType,
         hit -> Optional.ofNullable(hit.getIndex())
     );
@@ -121,7 +121,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
     Object ts = update.getTimestamp();
     IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
         .source(update.getDocument());
-    if(ts != null) {
+    if (ts != null) {
       indexRequest = indexRequest.timestamp(ts.toString());
     }