You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/11/16 01:35:46 UTC

[1/3] metron git commit: METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824

Repository: metron
Updated Branches:
  refs/heads/master c4c930f7c -> fd896fbeb


http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
index 1775018..3103ea7 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -18,13 +18,19 @@
 
 package org.apache.metron.indexing.dao;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+
+import com.google.common.hash.Hasher;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -32,7 +38,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.KeyUtil;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
@@ -46,7 +54,7 @@ import org.apache.metron.indexing.dao.update.Document;
  * * Get document
  *
  * The mechanism here is that updates to documents will be added to a HBase Table as a write-ahead log.
- * The Key for a row supporting a given document will be the GUID, which should be sufficiently distributed.
+ * The Key for a row supporting a given document will be the GUID plus the sensor type, which should be sufficiently distributed.
  * Every new update will have a column added (column qualifier will be the timestamp of the update).
  * Upon retrieval, the most recent column will be returned.
  *
@@ -57,6 +65,72 @@ public class HBaseDao implements IndexDao {
   private HTableInterface tableInterface;
   private byte[] cf;
   private AccessConfig config;
+
+  /**
+   * Implements the HBaseDao row key and exposes convenience methods for serializing/deserializing the row key.
+   * The row key is made of a GUID and sensor type along with a prefix to ensure data is distributed evenly.
+   */
+  public static class Key {
+    private String guid;
+    private String sensorType;
+    public Key(String guid, String sensorType) {
+      this.guid = guid;
+      this.sensorType = sensorType;
+    }
+
+    public String getGuid() {
+      return guid;
+    }
+
+    public String getSensorType() {
+      return sensorType;
+    }
+
+    public static Key fromBytes(byte[] buffer) throws IOException {
+      ByteArrayInputStream baos = new ByteArrayInputStream(buffer);
+      DataInputStream w = new DataInputStream(baos);
+      baos.skip(KeyUtil.HASH_PREFIX_SIZE);
+      return new Key(w.readUTF(), w.readUTF());
+    }
+
+    public byte[] toBytes() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      if(getGuid() == null || getSensorType() == null) {
+        throw new IllegalStateException("Guid and sensor type must not be null: guid = " + getGuid() + ", sensorType = " + getSensorType());
+      }
+      DataOutputStream w = new DataOutputStream(baos);
+      w.writeUTF(getGuid());
+      w.writeUTF(getSensorType());
+      w.flush();
+      byte[] key = baos.toByteArray();
+      byte[] prefix = KeyUtil.INSTANCE.getPrefix(key);
+      return KeyUtil.INSTANCE.merge(prefix, key);
+    }
+
+    public static byte[] toBytes(Key k) throws IOException {
+      return k.toBytes();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Key key = (Key) o;
+
+      if (getGuid() != null ? !getGuid().equals(key.getGuid()) : key.getGuid() != null) return false;
+      return getSensorType() != null ? getSensorType().equals(key.getSensorType()) : key.getSensorType() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = getGuid() != null ? getGuid().hashCode() : 0;
+      result = 31 * result + (getSensorType() != null ? getSensorType().hashCode() : 0);
+      return result;
+    }
+  }
+
   public HBaseDao() {
 
   }
@@ -102,9 +176,32 @@ public class HBaseDao implements IndexDao {
 
   @Override
   public synchronized Document getLatest(String guid, String sensorType) throws IOException {
-    Get get = new Get(guid.getBytes());
+    Key k = new Key(guid, sensorType);
+    Get get = new Get(Key.toBytes(k));
     get.addFamily(cf);
     Result result = getTableInterface().get(get);
+    return getDocumentFromResult(result);
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    List<Get> gets = new ArrayList<>();
+    for (GetRequest getRequest: getRequests) {
+      gets.add(buildGet(getRequest));
+    }
+    Result[] results = getTableInterface().get(gets);
+    List<Document> allLatest = new ArrayList<>();
+    for (Result result: results) {
+      Document d = getDocumentFromResult(result);
+      if (d != null) {
+        allLatest.add(d);
+      }
+    }
+    return allLatest;
+  }
+
+  private Document getDocumentFromResult(Result result) throws IOException {
     NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
     if(columns == null || columns.size() == 0) {
       return null;
@@ -112,8 +209,14 @@ public class HBaseDao implements IndexDao {
     Map.Entry<byte[], byte[]> entry= columns.lastEntry();
     Long ts = Bytes.toLong(entry.getKey());
     if(entry.getValue()!= null) {
-      String json = new String(entry.getValue());
-      return new Document(json, guid, sensorType, ts);
+      Map<String, Object> json = JSONUtils.INSTANCE.load(new String(entry.getValue()),
+          new TypeReference<Map<String, Object>>() {});
+      try {
+        Key k = Key.fromBytes(result.getRow());
+        return new Document(json, k.getGuid(), k.getSensorType(), ts);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to convert row key to a document", e);
+      }
     }
     else {
       return null;
@@ -126,8 +229,6 @@ public class HBaseDao implements IndexDao {
     getTableInterface().put(put);
   }
 
-
-
   @Override
   public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
     List<Put> puts = new ArrayList<>();
@@ -140,8 +241,16 @@ public class HBaseDao implements IndexDao {
     getTableInterface().put(puts);
   }
 
-  protected Put buildPut(Document update) throws JsonProcessingException {
-    Put put = new Put(update.getGuid().getBytes());
+  protected Get buildGet(GetRequest getRequest) throws IOException {
+    Key k = new Key(getRequest.getGuid(), getRequest.getSensorType());
+    Get get = new Get(Key.toBytes(k));
+    get.addFamily(cf);
+    return get;
+  }
+
+  protected Put buildPut(Document update) throws IOException {
+    Key k = new Key(update.getGuid(), update.getSensorType());
+    Put put = new Put(Key.toBytes(k));
     long ts = update.getTimestamp() == null ? System.currentTimeMillis() : update.getTimestamp();
     byte[] columnQualifier = Bytes.toBytes(ts);
     byte[] doc = JSONUtils.INSTANCE.toJSONPretty(update.getDocument());
@@ -149,6 +258,7 @@ public class HBaseDao implements IndexDao {
     return put;
   }
 
+
   @Override
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException {
     return null;

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 4b7829e..8855a14 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -36,6 +36,10 @@ import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
 
+/**
+ * The IndexDao provides a common interface for retrieving and storing data in a variety of persistent stores.
+ * Document reads and writes require a GUID and sensor type with an index being optional.
+ */
 public interface IndexDao {
 
   /**
@@ -66,6 +70,15 @@ public interface IndexDao {
   Document getLatest(String guid, String sensorType) throws IOException;
 
   /**
+   * Return a list of the latest versions of documents given a list of GUIDs and sensor types.
+   *
+   * @param getRequests A list of get requests for documents
+   * @return A list of matching documents, or an empty list if none are available.
+   * @throws IOException
+   */
+  Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException;
+
+  /**
    * Return the latest version of a document given a GetRequest.
    * @param request The GetRequest which indicates the GUID and sensor type.
    * @return Optionally the document (dependent upon existence in the index).
@@ -82,7 +95,9 @@ public interface IndexDao {
   }
 
   /**
-   * Update given a Document and optionally the index where the document exists.
+   * Update a given Document and optionally the index where the document exists.  This is a full update,
+   * meaning the current document will be replaced if it exists or a new document will be created if it does
+   * not exist.  Partial updates are not supported in this method.
    *
    * @param update The document to replace from the index.
    * @param index The index where the document lives.
@@ -91,7 +106,7 @@ public interface IndexDao {
   void update(Document update, Optional<String> index) throws IOException;
 
   /**
-   * Update given a Document and optionally the index where the document exists.
+   * Similar to the update method but accepts multiple documents and performs updates in batch.
    *
    * @param updates A map of the documents to update to the index where they live.
    * @throws IOException
@@ -108,6 +123,13 @@ public interface IndexDao {
   default void patch( PatchRequest request
                     , Optional<Long> timestamp
                     ) throws OriginalNotFoundException, IOException {
+    Document d = getPatchedDocument(request, timestamp);
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
+  default Document getPatchedDocument(PatchRequest request
+      , Optional<Long> timestamp
+      ) throws OriginalNotFoundException, IOException {
     Map<String, Object> latest = request.getSource();
     if(latest == null) {
       Document latestDoc = getLatest(request.getGuid(), request.getSensorType());
@@ -121,13 +143,11 @@ public interface IndexDao {
     JsonNode originalNode = JSONUtils.INSTANCE.convert(latest, JsonNode.class);
     JsonNode patched = JSONUtils.INSTANCE.applyPatch(request.getPatch(), originalNode);
     Map<String, Object> updated = JSONUtils.INSTANCE.getMapper()
-                                           .convertValue(patched, new TypeReference<Map<String, Object>>() {});
-    Document d = new Document( updated
-                             , request.getGuid()
-                             , request.getSensorType()
-                             , timestamp.orElse(System.currentTimeMillis())
-                             );
-    update(d, Optional.ofNullable(request.getIndex()));
+        .convertValue(patched, new TypeReference<Map<String, Object>>() {});
+    return new Document( updated
+        , request.getGuid()
+        , request.getSensorType()
+        , timestamp.orElse(System.currentTimeMillis()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
index de12f22..4530d2a 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
@@ -18,13 +18,50 @@
 
 package org.apache.metron.indexing.dao;
 
+import java.util.List;
+import java.util.Optional;
 import java.io.IOException;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 
+/**
+ * The MetaAlertDao exposes methods for interacting with meta alerts.  Meta alerts are objects that contain
+ * alerts and summary statistics based on the scores of these alerts.  Meta alerts are returned in searches
+ * just as alerts are and match based on the field values of child alerts.  If a child alert matches a search
+ * the meta alert will be returned while the original child alert will not.  A meta alert also contains a
+ * status field that controls its inclusion in search results and a groups field that can be used to track
+ * the groups a meta alert was created from.
+ *
+ * The structure of a meta alert is as follows:
+ * {
+ *   "guid": "meta alert guid",
+ *   "timestamp": timestamp,
+ *   "source:type": "metaalert",
+ *   "alerts": [ array of child alerts ],
+ *   "status": "active or inactive",
+ *   "groups": [ array of group names ],
+ *   "average": 10,
+ *   "max": 10,
+ *   "threat:triage:score": 30,
+ *   "count": 3,
+ *   "sum": 30,
+ *   "min": 10,
+ *   "median": 10
+ * }
+ *
+ * A child alert that has been added to a meta alert will store the meta alert GUID in a "metaalerts" field.
+ * This field is an array of meta alert GUIDs, meaning a child alert can be contained in multiple meta alerts.
+ * Any update to a child alert will trigger an update to the meta alert so that the alert inside a meta alert
+ * and the original alert will be kept in sync.
+ *
+ * Other fields can be added to a meta alert through the patch method on the IndexDao interface.  However, attempts
+ * to directly change the "alerts" or "status" field will result in an exception.
+ */
 public interface MetaAlertDao extends IndexDao {
 
   String METAALERTS_INDEX = "metaalert_index";
@@ -46,21 +83,65 @@ public interface MetaAlertDao extends IndexDao {
   SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException;
 
   /**
-   * Create a meta alert.
-   * @param request The parameters for creating the new meta alert
-   * @return A response indicating success or failure
+   * Creates a meta alert from a list of child alerts.  The most recent version of each child alert is
+   * retrieved using the DAO abstractions.
+   *
+   * @param request A request object containing get requests for alerts to be added and a list of groups
+   * @return A response indicating success or failure along with the GUID of the new meta alert
    * @throws InvalidCreateException If a malformed create request is provided
    * @throws IOException If a problem occurs during communication
    */
   MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
       throws InvalidCreateException, IOException;
 
+
+  /**
+   * Adds a list of alerts to an existing meta alert.  This will add each alert object to the "alerts" array in the meta alert
+   * and also add the meta alert GUID to each child alert's "metaalerts" array.  After alerts have been added the
+   * meta alert scores are recalculated.  Any alerts already in the meta alert are skipped and no updates are
+   * performed if all of the alerts are already in the meta alert.  The most recent version of each child alert is
+   * retrieved using the DAO abstractions.  Alerts cannot be added to an 'inactive' meta alert.
+   *
+   * @param metaAlertGuid The meta alert GUID
+   * @param getRequests Get requests for alerts to be added
+   * @return True or false depending on if any alerts were added
+   * @throws IOException
+   */
+  boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> getRequests) throws IOException;
+
+  /**
+   * Removes a list of alerts from an existing meta alert.  This will remove each alert object from the "alerts" array in the meta alert
+   * and also remove the meta alert GUID from each child alert's "metaalerts" array.  After alerts have been removed the
+   * meta alert scores are recalculated.  Any alerts not contained in the meta alert are skipped and no updates are
+   * performed if no alerts can be found in the meta alert.  Alerts cannot be removed from an 'inactive' meta alert.
+   *
+   * @param metaAlertGuid The meta alert GUID
+   * @param getRequests Get requests for alerts to be removed
+   * @return True or false depending on if any alerts were removed
+   * @throws IOException
+   */
+  boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> getRequests) throws IOException;
+
+  /**
+   * The meta alert status field can be set to either 'active' or 'inactive' and will control whether or not meta alerts
+   * (and child alerts) appear in search results.  An 'active' status will cause meta alerts to appear in search
+   * results instead of its child alerts and an 'inactive' status will suppress the meta alert from search results
+   * with child alerts appearing in search results as normal.  A change to 'inactive' will cause the meta alert GUID to
+   * be removed from the "metaalerts" field of all its child alerts.  A change back to 'active' will have the opposite effect.
+   *
+   * @param metaAlertGuid The GUID of the meta alert
+   * @param status A status value of 'active' or 'inactive'
+   * @return True or false depending on if the status was changed
+   * @throws IOException
+   */
+  boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws IOException;
+
   /**
    * Initializes a Meta Alert DAO with default "sum" meta alert threat sorting.
    * @param indexDao The DAO to wrap for our queries.
    */
   default void init(IndexDao indexDao) {
-    init(indexDao, null);
+    init(indexDao, Optional.empty());
   }
 
   /**
@@ -69,5 +150,5 @@ public interface MetaAlertDao extends IndexDao {
    * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc.
    *     null is "sum"
    */
-  void init(IndexDao indexDao, String threatSort);
+  void init(IndexDao indexDao, Optional<String> threatSort);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index 779e6c6..ed8bc95 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
@@ -127,6 +128,25 @@ public class MultiIndexDao implements IndexDao {
 
   }
 
+  private static class DocumentIterableContainer {
+    private Optional<Iterable<Document>> d = Optional.empty();
+    private Optional<Throwable> t = Optional.empty();
+    public DocumentIterableContainer(Iterable<Document> d) {
+      this.d = Optional.ofNullable(d);
+    }
+    public DocumentIterableContainer(Throwable t) {
+      this.t = Optional.ofNullable(t);
+    }
+
+    public Optional<Iterable<Document>> getDocumentIterable() {
+      return d;
+    }
+    public Optional<Throwable> getException() {
+      return t;
+    }
+
+  }
+
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
     for(IndexDao dao : indices) {
@@ -189,6 +209,40 @@ public class MultiIndexDao implements IndexDao {
     return ret;
   }
 
+  @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    Iterable<Document> ret = null;
+    List<DocumentIterableContainer> output =
+        indices.parallelStream().map(dao -> {
+          try {
+            return new DocumentIterableContainer(dao.getAllLatest(getRequests));
+          } catch (Throwable e) {
+            return new DocumentIterableContainer(e);
+          }
+        }).collect(Collectors.toList());
+
+    List<String> error = new ArrayList<>();
+    for(DocumentIterableContainer dc : output) {
+      if(dc.getException().isPresent()) {
+        Throwable e = dc.getException().get();
+        error.add(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e));
+      }
+      else {
+        if(dc.getDocumentIterable().isPresent()) {
+          Iterable<Document> documents = dc.getDocumentIterable().get();
+          if(ret == null) {
+            ret = documents;
+          }
+        }
+      }
+    }
+    if(error.size() > 0) {
+      throw new IOException(Joiner.on("\n").join(error));
+    }
+    return ret;
+  }
+
   public List<IndexDao> getIndices() {
     return indices;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
new file mode 100644
index 0000000..6183d37
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.metaalert;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.metron.indexing.dao.search.GetRequest;
+
+public class MetaAlertAddRemoveRequest {
+
+  private String metaAlertGuid;
+  private List<GetRequest> alerts;
+
+  public String getMetaAlertGuid() {
+    return metaAlertGuid;
+  }
+
+  public void setMetaAlertGuid(String metaAlertGuid) {
+    this.metaAlertGuid = metaAlertGuid;
+  }
+
+  public List<GetRequest> getAlerts() {
+    return alerts;
+  }
+
+  public void setAlerts(List<GetRequest> alerts) {
+    this.alerts = alerts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
index 388527a..d368b3a 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
@@ -22,23 +22,23 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.metron.indexing.dao.search.GetRequest;
 
 public class MetaAlertCreateRequest {
-  // A map from the alert GUID to the Document index
-  private Map<String, String> guidToIndices;
+  private List<GetRequest> alerts;
   private List<String> groups;
 
   public MetaAlertCreateRequest() {
-    this.guidToIndices = new HashMap<>();
+    this.alerts = new ArrayList<>();
     this.groups = new ArrayList<>();
   }
 
-  public Map<String, String> getGuidToIndices() {
-    return guidToIndices;
+  public List<GetRequest> getAlerts() {
+    return alerts;
   }
 
-  public void setGuidToIndices(Map<String, String> guidToIndices) {
-    this.guidToIndices = guidToIndices;
+  public void setAlerts(List<GetRequest> alerts) {
+    this.alerts = alerts;
   }
 
   public List<String> getGroups() {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
new file mode 100644
index 0000000..c9b0138
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.metaalert;
+
+public enum MetaAlertStatus {
+  ACTIVE("active"),
+  INACTIVE("inactive");
+
+  private String statusString;
+
+  MetaAlertStatus(String statusString) {
+    this.statusString = statusString;
+  }
+
+  public String getStatusString() {
+    return statusString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
index eb255dc..959d4e6 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
@@ -17,9 +17,27 @@
  */
 package org.apache.metron.indexing.dao.search;
 
+import com.fasterxml.jackson.annotation.JsonGetter;
+import java.util.Optional;
+
 public class GetRequest {
-  String guid;
-  String sensorType;
+  private String guid;
+  private String sensorType;
+  private String index;
+
+  public GetRequest() {
+  }
+
+  public GetRequest(String guid, String sensorType) {
+    this.guid = guid;
+    this.sensorType = sensorType;
+  }
+
+  public GetRequest(String guid, String sensorType, String index) {
+    this.guid = guid;
+    this.sensorType = sensorType;
+    this.index = index;
+  }
 
   /**
    * The GUID of the document
@@ -44,4 +62,17 @@ public class GetRequest {
   public void setSensorType(String sensorType) {
     this.sensorType = sensorType;
   }
+
+  public Optional<String> getIndex() {
+    return index != null ? Optional.of(this.index) : Optional.empty();
+  }
+
+  @JsonGetter("index")
+  public String getIndexString() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index f48187e..3bce4d0 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -17,10 +17,13 @@
  */
 package org.apache.metron.indexing.dao;
 
+import static org.apache.metron.common.Constants.SENSOR_TYPE;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
+import java.util.stream.Collectors;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
@@ -32,7 +35,7 @@ import java.util.*;
 public class InMemoryDao implements IndexDao {
   // Map from index to list of documents as JSON strings
   public static Map<String, List<String>> BACKING_STORE = new HashMap<>();
-  public static Map<String, Map<String, FieldType>> COLUMN_METADATA;
+  public static Map<String, Map<String, FieldType>> COLUMN_METADATA = new HashMap<>();
   private AccessConfig config;
 
   @Override
@@ -169,7 +172,7 @@ public class InMemoryDao implements IndexDao {
     return false;
   }
 
-  private static Map<String, Object> parse(String doc) {
+  public static Map<String, Object> parse(String doc) {
     try {
       return JSONUtils.INSTANCE.load(doc, new TypeReference<Map<String, Object>>() {});
     } catch (IOException e) {
@@ -199,6 +202,24 @@ public class InMemoryDao implements IndexDao {
   }
 
   @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+    List<Document> documents = new ArrayList<>();
+    for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) {
+      for(String doc : kv.getValue()) {
+        Map<String, Object> docParsed = parse(doc);
+        String guid = (String) docParsed.getOrDefault(Constants.GUID, "");
+        for (GetRequest getRequest: getRequests) {
+          if(getRequest.getGuid().equals(guid)) {
+            documents.add(new Document(doc, guid, getRequest.getSensorType(), 0L));
+          }
+        }
+
+      }
+    }
+    return documents;
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws IOException {
     for (Map.Entry<String, List<String>> kv : BACKING_STORE.entrySet()) {
       if (kv.getKey().startsWith(update.getSensorType())) {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
index cb7635e..fad0eda 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
@@ -18,17 +18,23 @@
 
 package org.apache.metron.indexing.dao;
 
+import static org.apache.metron.common.Constants.GUID;
+
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.metaalert.MetaScores;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
@@ -48,6 +54,8 @@ import org.json.simple.JSONObject;
 
 public class InMemoryMetaAlertDao implements MetaAlertDao {
 
+  public static Map<String, Collection<String>> METAALERT_STORE = new HashMap<>();
+
   private IndexDao indexDao;
 
   /**
@@ -83,7 +91,7 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
-  public void init(IndexDao indexDao, String threatSort) {
+  public void init(IndexDao indexDao, Optional<String> threatSort) {
     this.indexDao = indexDao;
     // Ignore threatSort for test.
   }
@@ -94,6 +102,11 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+    return indexDao.getAllLatest(getRequests);
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws IOException {
     indexDao.update(update, index);
   }
@@ -146,15 +159,16 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
   @Override
   public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
       throws InvalidCreateException, IOException {
-    if (request.getGuidToIndices().isEmpty()) {
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (alertRequests.isEmpty()) {
       MetaAlertCreateResponse response = new MetaAlertCreateResponse();
       response.setCreated(false);
       return response;
     }
     // Build meta alert json.  Give it a reasonable GUID
     JSONObject metaAlert = new JSONObject();
-    metaAlert.put(Constants.GUID,
-        "meta_" + (InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1));
+    String metaAlertGuid = "meta_" + (InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1);
+    metaAlert.put(GUID, metaAlertGuid);
 
     JSONArray groupsArray = new JSONArray();
     groupsArray.addAll(request.getGroups());
@@ -164,16 +178,17 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
     // For the purpose of testing, we're just using guids for the alerts field and grabbing the scores.
     JSONArray alertArray = new JSONArray();
     List<Double> threatScores = new ArrayList<>();
-    for (Map.Entry<String, String> entry : request.getGuidToIndices().entrySet()) {
+    Collection<String> alertGuids = new ArrayList<>();
+    for (GetRequest alertRequest : alertRequests) {
       SearchRequest searchRequest = new SearchRequest();
-      searchRequest.setIndices(ImmutableList.of(entry.getValue()));
-      searchRequest.setQuery("guid:" + entry.getKey());
+      searchRequest.setIndices(ImmutableList.of(alertRequest.getIndex().get()));
+      searchRequest.setQuery("guid:" + alertRequest.getGuid());
       try {
         SearchResponse searchResponse = search(searchRequest);
         List<SearchResult> searchResults = searchResponse.getResults();
         if (searchResults.size() > 1) {
           throw new InvalidCreateException(
-              "Found more than one result for: " + entry.getKey() + ". Values: " + searchResults
+              "Found more than one result for: " + alertRequest.getGuid() + ". Values: " + searchResults
           );
         }
 
@@ -186,18 +201,79 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
           threatScores.add(threatScore);
         }
       } catch (InvalidSearchException e) {
-        throw new InvalidCreateException("Unable to find guid: " + entry.getKey(), e);
+        throw new InvalidCreateException("Unable to find guid: " + alertRequest.getGuid(), e);
       }
+      alertGuids.add(alertRequest.getGuid());
     }
 
     metaAlert.put(MetaAlertDao.ALERT_FIELD, alertArray);
     metaAlert.putAll(new MetaScores(threatScores).getMetaScores());
+    metaAlert.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
 
     // Add the alert to the store, but make sure not to overwrite existing results
     InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).add(metaAlert.toJSONString());
 
+    METAALERT_STORE.put(metaAlertGuid, new HashSet<>(alertGuids));
+
     MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
+    createResponse.setGuid(metaAlertGuid);
     createResponse.setCreated(true);
     return createResponse;
   }
+
+  @Override
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) throws IOException {
+    Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid);
+    if (currentAlertGuids == null) {
+      return false;
+    }
+    Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet());
+    boolean added = currentAlertGuids.addAll(alertGuids);
+    if (added) {
+      METAALERT_STORE.put(metaAlertGuid, currentAlertGuids);
+    }
+    return added;
+  }
+
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) throws IOException {
+    Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid);
+    if (currentAlertGuids == null) {
+      return false;
+    }
+    Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet());
+    boolean removed = currentAlertGuids.removeAll(alertGuids);
+    if (removed) {
+      METAALERT_STORE.put(metaAlertGuid, currentAlertGuids);
+    }
+    return removed;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws IOException {
+    boolean statusChanged = false;
+    List<String> metaAlerts = InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX);
+    for (String metaAlert: metaAlerts) {
+      JSONObject metaAlertJSON = JSONUtils.INSTANCE.load(metaAlert, JSONObject.class);
+      if (metaAlertGuid.equals(metaAlertJSON.get(GUID))) {
+        statusChanged = !status.getStatusString().equals(metaAlertJSON.get(STATUS_FIELD));
+        if (statusChanged) {
+          metaAlertJSON.put(STATUS_FIELD, status.getStatusString());
+          metaAlerts.remove(metaAlert);
+          metaAlerts.add(metaAlertJSON.toJSONString());
+          InMemoryDao.BACKING_STORE.put(MetaAlertDao.METAALERTS_INDEX, metaAlerts);
+        }
+        break;
+      }
+    }
+    return statusChanged;
+  }
+
+  public static void clear() {
+    InMemoryDao.clear();
+    METAALERT_STORE.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 2961d96..d991d50 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.indexing.dao;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Iterator;
 import java.util.Optional;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
@@ -29,8 +31,8 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.integration.InMemoryComponent;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,6 +105,21 @@ public abstract class SearchIntegrationTest {
   public static String findOneGuidQuery;
 
   /**
+   * [
+   * {
+   * "guid": "bro-1",
+   * "sensorType": "bro"
+   * },
+   * {
+   * "guid": "bro-2",
+   * "sensorType": "bro"
+   * }
+   * ]
+   */
+  @Multiline
+  public static String getAllLatestQuery;
+
+  /**
    * {
    * "indices": ["bro", "snort"],
    * "query": "ip_src_addr:192.168.1.1",
@@ -390,6 +407,19 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals("bro", doc.get("source:type"));
       Assert.assertEquals(3, doc.get("timestamp"));
     }
+    //Get All Latest Guid Testcase
+    {
+      List<GetRequest> request = JSONUtils.INSTANCE.load(getAllLatestQuery, new TypeReference<List<GetRequest>>() {
+      });
+      Iterator<Document> response = dao.getAllLatest(request).iterator();
+      Document bro2 = response.next();
+      Assert.assertEquals("bro_1", bro2.getDocument().get("guid"));
+      Assert.assertEquals("bro", bro2.getDocument().get("source:type"));
+      Document snort2 = response.next();
+      Assert.assertEquals("bro_2", snort2.getDocument().get("guid"));
+      Assert.assertEquals("bro", snort2.getDocument().get("source:type"));
+      Assert.assertFalse(response.hasNext());
+    }
     //Filter test case
     {
       SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, SearchRequest.class);

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
new file mode 100644
index 0000000..aa32aa0
--- /dev/null
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.integration;
+
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_CF;
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_TABLE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.HBaseDao;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class HBaseDaoIntegrationTest {
+
+  private static final String TABLE_NAME = "metron_update";
+  private static final String COLUMN_FAMILY = "cf";
+  private static final String SENSOR_TYPE = "test";
+
+  private static IndexDao hbaseDao;
+  private static byte[] expectedKeySerialization = new byte[] {
+          (byte)0xf5,0x53,0x76,(byte)0x96,0x67,0x3a,
+          (byte)0xc1,(byte)0xaf,(byte)0xff,0x41,0x33,(byte)0x9d,
+          (byte)0xac,(byte)0xb9,0x1a,(byte)0xb0,0x00,0x04,
+          0x67,0x75,0x69,0x64,0x00,0x0a,
+          0x73,0x65,0x6e,0x73,0x6f,0x72,
+          0x54,0x79,0x70,0x65
+  };
+
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setMaxSearchResults(1000);
+    accessConfig.setMaxSearchGroups(1000);
+    accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{
+      put(HBASE_TABLE, TABLE_NAME);
+      put(HBASE_CF, COLUMN_FAMILY);
+    }});
+    MockHBaseTableProvider.addToCache(TABLE_NAME, COLUMN_FAMILY);
+    accessConfig.setTableProvider(new MockHBaseTableProvider());
+
+    hbaseDao = new HBaseDao();
+    hbaseDao.init(accessConfig);
+  }
+
+  @After
+  public void clearTable() throws Exception {
+    MockHBaseTableProvider.clear();
+  }
+
+  /**
+   * Pins the exact byte layout produced for an {@code HBaseDao.Key}.  If this test fails, your
+   * change has altered key serialization, so keys written by previous releases will not be found
+   * under the new scheme.  Either provide a migration plan or undo the change.
+   * DO NOT CHANGE THIS TEST BLITHELY!
+   * @throws IOException propagated from key serialization
+   */
+  @Test
+  public void testKeySerializationRemainsConstant() throws IOException {
+    HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType");
+    // Expected value goes first so a failure reports expected/actual bytes the right way round.
+    Assert.assertArrayEquals(expectedKeySerialization, k.toBytes());
+  }
+
+
+  @Test
+  public void testKeySerialization() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType");
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testKeySerializationWithInvalidGuid() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key(null, "sensorType");
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testKeySerializationWithInvalidSensorType() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key("guid", null);
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  @Test
+  public void shouldGetLatest() throws Exception {
+    // Load alerts
+    List<Document> alerts = buildAlerts(3);
+    Map<Document, Optional<String>> updates = alerts.stream()
+        .collect(Collectors.toMap(document -> document, document -> Optional.empty()));
+    hbaseDao.batchUpdate(updates);
+
+    Document actualDocument = hbaseDao.getLatest("message_1", SENSOR_TYPE);
+    Document expectedDocument = alerts.get(1);
+    Assert.assertEquals(expectedDocument, actualDocument);
+  }
+
+  @Test
+  public void shouldGetAllLatest() throws Exception {
+    // Store 15 alerts (message_0 .. message_14) through a single batch update.
+    List<Document> alerts = buildAlerts(15);
+    Map<Document, Optional<String>> updates = alerts.stream()
+        .collect(Collectors.toMap(document -> document, document -> Optional.empty()));
+    hbaseDao.batchUpdate(updates);
+
+    // Request message_1 .. message_12 only, a strict subset of the stored alerts.
+    int expectedCount = 12;
+    List<GetRequest> getRequests = new ArrayList<>();
+    for (int i = 1; i <= expectedCount; i++) {
+      getRequests.add(new GetRequest("message_" + i, SENSOR_TYPE));
+    }
+    Iterator<Document> results = hbaseDao.getAllLatest(getRequests).iterator();
+
+    for (int i = 0; i < expectedCount; i++) {
+      Document expectedDocument = alerts.get(i + 1);
+      Document actualDocument = results.next();
+      Assert.assertEquals(expectedDocument, actualDocument);
+    }
+
+    Assert.assertFalse("Result size should be 12 but was greater", results.hasNext());
+  }
+
+  protected List<Document> buildAlerts(int count) throws IOException {
+    List<Document> alerts = new ArrayList<>();
+    for (int i = 0; i < count; ++i) {
+      String guid = "message_" + i;
+      String json = "{\"guid\":\"message_" + i + "\", \"source:type\":\"test\"}";
+      Document alert = new Document(json, guid, SENSOR_TYPE, System.currentTimeMillis());
+      alerts.add(alert);
+    }
+    return alerts;
+  }
+
+}


[3/3] metron git commit: METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824

Posted by rm...@apache.org.
METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd896fbe
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd896fbe
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd896fbe

Branch: refs/heads/master
Commit: fd896fbebe9d5e77eb11d1ce953ab2b55cc84387
Parents: c4c930f
Author: merrimanr <me...@gmail.com>
Authored: Wed Nov 15 19:35:18 2017 -0600
Committer: merrimanr <me...@apache.org>
Committed: Wed Nov 15 19:35:18 2017 -0600

----------------------------------------------------------------------
 metron-interface/metron-rest/README.md          |   36 +-
 .../apache/metron/rest/config/IndexConfig.java  |    3 +-
 .../rest/controller/MetaAlertController.java    |   48 +-
 .../metron/rest/service/MetaAlertService.java   |   10 +
 .../rest/service/impl/MetaAlertServiceImpl.java |   31 +
 .../rest/service/impl/SearchServiceImpl.java    |   18 +-
 .../MetaAlertControllerIntegrationTest.java     |  120 +-
 .../UpdateControllerIntegrationTest.java        |    5 +-
 .../org/apache/metron/common/utils/KeyUtil.java |   50 +
 .../hbase/HBaseEnrichmentConverterTest.java     |   21 +
 .../elasticsearch/dao/ElasticsearchDao.java     |  115 +-
 .../dao/ElasticsearchMetaAlertDao.java          |  717 +++++-----
 .../elasticsearch/dao/MetaAlertStatus.java      |   34 -
 .../dao/ElasticsearchMetaAlertDaoTest.java      |  304 +---
 .../ElasticsearchMetaAlertIntegrationTest.java  | 1301 ++++++++++--------
 .../ElasticsearchUpdateIntegrationTest.java     |    4 +-
 .../enrichment/converter/EnrichmentKey.java     |   23 +-
 metron-platform/metron-indexing/README.md       |   17 +-
 metron-platform/metron-indexing/pom.xml         |    7 +
 .../apache/metron/indexing/dao/HBaseDao.java    |  128 +-
 .../apache/metron/indexing/dao/IndexDao.java    |   38 +-
 .../metron/indexing/dao/MetaAlertDao.java       |   91 +-
 .../metron/indexing/dao/MultiIndexDao.java      |   54 +
 .../metaalert/MetaAlertAddRemoveRequest.java    |   41 +
 .../dao/metaalert/MetaAlertCreateRequest.java   |   14 +-
 .../indexing/dao/metaalert/MetaAlertStatus.java |   34 +
 .../metron/indexing/dao/search/GetRequest.java  |   35 +-
 .../apache/metron/indexing/dao/InMemoryDao.java |   25 +-
 .../indexing/dao/InMemoryMetaAlertDao.java      |   96 +-
 .../indexing/dao/SearchIntegrationTest.java     |   32 +-
 .../integration/HBaseDaoIntegrationTest.java    |  164 +++
 31 files changed, 2219 insertions(+), 1397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index b79b44d..724239b 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -218,6 +218,9 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
 | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)|
 | [ `GET /api/v1/metaalert/searchByAlert`](#get-apiv1metaalertsearchbyalert)|
 | [ `GET /api/v1/metaalert/create`](#get-apiv1metaalertcreate)|
+| [ `POST /api/v1/metaalert/add/alert`](#post-apiv1metaalertaddalert)|
+| [ `POST /api/v1/metaalert/remove/alert`](#post-apiv1metaalertremovealert)|
+| [ `POST /api/v1/metaalert/update/status/{guid}/{status}`](#post-apiv1metaalertupdatestatusguidstatus)|
 | [ `GET /api/v1/search/search`](#get-apiv1searchsearch)|
 | [ `POST /api/v1/search/search`](#get-apiv1searchsearch)|
 | [ `POST /api/v1/search/group`](#get-apiv1searchgroup)|
@@ -415,19 +418,40 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
     * 404 - Either Kafka topic is missing or contains no messages
 
 ### `POST /api/v1/metaalert/searchByAlert`
-  * Description: Searches meta alerts to find any containing an alert for the provided GUID
+  * Description: Get all meta alerts that contain an alert.
   * Input:
     * guid - GUID of the alert
   * Returns:
-    * 200 - Returns the meta alerts associated with this alert
-    * 404 - The child alert isn't found
+    * 200 - Search results
 
 ### `POST /api/v1/metaalert/create`
-  * Description: Creates a meta alert containing the provide alerts
+  * Description: Creates a new meta alert from a list of existing alerts.  The meta alert status will initially be set to 'ACTIVE' and summary statistics will be computed from the list of alerts.  A list of groups included in the request is also added to the meta alert.
   * Input:
-    * request - Meta Alert Create Request
+    * request - Meta alert create request which includes a list of alert get requests and a list of custom groups used to annotate a meta alert.
   * Returns:
-    * 200 - The meta alert was created
+    * 200 - The GUID of the new meta alert
+    
+### `POST /api/v1/metaalert/add/alert`
+  * Description: Adds an alert to an existing meta alert.  An alert will not be added if it is already contained in a meta alert.
+  * Input:
+    * request - Meta alert add request which includes a meta alert GUID and list of alert get requests
+  * Returns:
+    * 200 - Returns 'true' if the alert was added and 'false' if the meta alert did not change.
+        
+### `POST /api/v1/metaalert/remove/alert`
+  * Description: Removes an alert from an existing meta alert.  If the alert to be removed is not in a meta alert, 'false' will be returned.
+  * Input:
+    * request - Meta alert remove request which includes a meta alert GUID and list of alert get requests
+  * Returns:
+    * 200 - Returns 'true' if the alert was removed and 'false' if the meta alert did not change.
+            
+### `POST /api/v1/metaalert/update/status/{guid}/{status}`
+  * Description: Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.
+  * Input:
+    * guid - Meta alert GUID
+    * status - Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'
+  * Returns:
+    * 200 - Returns 'true' if the status changed and 'false' if it did not.
 
 ### `POST /api/v1/search/search`
   * Description: Searches the indexing store. GUIDs must be quoted to ensure correct results.

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 8eabb2e..4ce9644 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.config;
 
 import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
 
+import java.util.Optional;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -81,7 +82,7 @@ public class IndexConfig {
 
       // Create the meta alert dao and wrap it around the index dao.
       MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
-      ret.init(indexDao, metaDaoSort);
+      ret.init(indexDao, Optional.ofNullable(metaDaoSort));
       return ret;
     }
     catch(RuntimeException re) {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
index e9cff8b..d42403a 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
@@ -21,6 +21,8 @@ package org.apache.metron.rest.controller;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -29,6 +31,7 @@ import org.apache.metron.rest.service.MetaAlertService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
@@ -41,24 +44,59 @@ public class MetaAlertController {
   @Autowired
   private MetaAlertService metaAlertService;
 
-  @ApiOperation(value = "Get all meta alerts for alert")
+  @ApiOperation(value = "Get all meta alerts that contain an alert.")
   @ApiResponse(message = "Search results", code = 200)
   @RequestMapping(value = "/searchByAlert", method = RequestMethod.POST)
   ResponseEntity<SearchResponse> searchByAlert(
-      @ApiParam(name = "guid", value = "GUID", required = true)
+      @ApiParam(name = "guid", value = "Alert GUID", required = true)
       @RequestBody final String guid
   ) throws RestException {
     return new ResponseEntity<>(metaAlertService.getAllMetaAlertsForAlert(guid), HttpStatus.OK);
   }
 
-  @ApiOperation(value = "Create a meta alert")
-  @ApiResponse(message = "Created meta alert", code = 200)
+  @ApiOperation(value = "Creates a new meta alert from a list of existing alerts.  "
+      + "The meta alert status will initially be set to 'ACTIVE' and summary statistics "
+      + "will be computed from the list of alerts.  A list of groups included in the request are also added to the meta alert.")
+  @ApiResponse(message = "The GUID of the new meta alert", code = 200)
   @RequestMapping(value = "/create", method = RequestMethod.POST)
   ResponseEntity<MetaAlertCreateResponse> create(
-      @ApiParam(name = "request", value = "Meta Alert Create Request", required = true)
+      @ApiParam(name = "createRequest", value = "Meta alert create request which includes a list of alert "
+          + "get requests and a list of custom groups used to annotate a meta alert", required = true)
       @RequestBody  final MetaAlertCreateRequest createRequest
   ) throws RestException {
     return new ResponseEntity<>(metaAlertService.create(createRequest), HttpStatus.OK);
   }
+
+  @ApiOperation(value = "Adds an alert to an existing meta alert.  An alert will not be added if it is already contained in a meta alert.")
+  @ApiResponse(message = "Returns 'true' if the alert was added and 'false' if the meta alert did not change.", code = 200)
+  @RequestMapping(value = "/add/alert", method = RequestMethod.POST)
+  ResponseEntity<Boolean> addAlertsToMetaAlert(
+      @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert add request which includes a meta alert GUID and list of alert get requests", required = true)
+      @RequestBody  final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+  ) throws RestException {
+    return new ResponseEntity<>(metaAlertService.addAlertsToMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Removes an alert from an existing meta alert.  If the alert to be removed is not in a meta alert, 'false' will be returned.")
+  @ApiResponse(message = "Returns 'true' if the alert was removed and 'false' if the meta alert did not change.", code = 200)
+  @RequestMapping(value = "/remove/alert", method = RequestMethod.POST)
+  ResponseEntity<Boolean> removeAlertsFromMetaAlert(
+      @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert remove request which includes a meta alert GUID and list of alert get requests", required = true)
+      @RequestBody  final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+  ) throws RestException {
+    return new ResponseEntity<>(metaAlertService.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.")
+  @ApiResponse(message = "Returns 'true' if the status changed and 'false' if it did not.", code = 200)
+  @RequestMapping(value = "/update/status/{guid}/{status}", method = RequestMethod.POST)
+  ResponseEntity<Boolean> updateMetaAlertStatus(
+      final @ApiParam(name = "guid", value = "Meta alert GUID", required = true) @PathVariable String guid,
+      final @ApiParam(name = "status", value = "Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'", required = true)
+      @PathVariable String status) throws RestException {
+    // Upper-case with Locale.ROOT: a default locale such as Turkish maps 'i' to a dotted capital and breaks valueOf.
+    return new ResponseEntity<>(metaAlertService.updateMetaAlertStatus(guid,
+        MetaAlertStatus.valueOf(status.toUpperCase(java.util.Locale.ROOT))), HttpStatus.OK);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
index c339506..e8abaf3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
@@ -18,8 +18,12 @@
 
 package org.apache.metron.rest.service;
 
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.rest.RestException;
 
@@ -28,4 +32,10 @@ public interface MetaAlertService {
   MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException;
 
   SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException;
+
+  /**
+   * Adds alerts to an existing meta alert.
+   *
+   * @param metaAlertAddRemoveRequest request carrying the meta alert GUID and the alerts to add
+   * @return NOTE(review): presumably {@code true} only when the meta alert was actually changed;
+   *         confirm against the DAO implementation
+   * @throws RestException if the operation cannot be completed
+   */
+  boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+  /**
+   * Removes alerts from an existing meta alert.
+   *
+   * @param metaAlertAddRemoveRequest request carrying the meta alert GUID and the alerts to remove
+   * @return NOTE(review): presumably {@code true} only when the meta alert was actually changed;
+   *         confirm against the DAO implementation
+   * @throws RestException if the operation cannot be completed
+   */
+  boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+  /**
+   * Changes the status of a meta alert.
+   *
+   * @param metaAlertGuid GUID of the meta alert to update
+   * @param status the status to apply
+   * @return NOTE(review): the REST endpoint documents this as {@code true} if the status changed
+   *         and {@code false} if it did not; confirm against the DAO implementation
+   * @throws RestException if the operation cannot be completed
+   */
+  boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws RestException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index f120c9e..aafab24 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -19,10 +19,13 @@
 package org.apache.metron.rest.service.impl;
 
 import java.io.IOException;
+import java.util.Collection;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
@@ -63,4 +66,32 @@ public class MetaAlertServiceImpl implements MetaAlertService {
       throw new RestException(ise.getMessage(), ise);
     }
   }
+
+  /**
+   * Passes the request's meta alert GUID and alert list to the DAO's add operation.
+   *
+   * @param metaAlertAddRemoveRequest request holding the meta alert GUID and the alerts to add
+   * @return whatever the DAO reports for the add operation
+   * @throws RestException wrapping any {@link IOException} raised by the DAO
+   */
+  @Override
+  public boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+    try {
+      return dao.addAlertsToMetaAlert(
+          metaAlertAddRemoveRequest.getMetaAlertGuid(),
+          metaAlertAddRemoveRequest.getAlerts());
+    } catch (IOException e) {
+      // Keep the original exception as the cause so the failure stays traceable
+      throw new RestException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Passes the request's meta alert GUID and alert list to the DAO's remove operation.
+   *
+   * @param metaAlertAddRemoveRequest request holding the meta alert GUID and the alerts to remove
+   * @return whatever the DAO reports for the remove operation
+   * @throws RestException wrapping any {@link IOException} raised by the DAO
+   */
+  @Override
+  public boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+    try {
+      return dao.removeAlertsFromMetaAlert(
+          metaAlertAddRemoveRequest.getMetaAlertGuid(),
+          metaAlertAddRemoveRequest.getAlerts());
+    } catch (IOException e) {
+      // Keep the original exception as the cause so the failure stays traceable
+      throw new RestException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Asks the DAO to apply {@code status} to the meta alert identified by {@code metaAlertGuid}.
+   *
+   * @param metaAlertGuid GUID of the meta alert to update
+   * @param status the status to apply
+   * @return whatever the DAO reports for the status update
+   * @throws RestException wrapping any {@link IOException} raised by the DAO
+   */
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws RestException {
+    try {
+      return dao.updateMetaAlertStatus(metaAlertGuid, status);
+    } catch (IOException e) {
+      // Keep the original exception as the cause so the failure stays traceable
+      throw new RestException(e.getMessage(), e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
index efd80a7..433eae3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
@@ -58,13 +58,10 @@ public class SearchServiceImpl implements SearchService {
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws RestException {
     try {
-      // Pull the indices from the cache by default
       if (searchRequest.getIndices() == null || searchRequest.getIndices().isEmpty()) {
-        List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME))));
-        // metaalerts should be included by default
+        List<String> indices = getDefaultIndices();
+        // metaalerts should be included by default in search requests
         indices.add(METAALERT_TYPE);
-        // errors should not be included by default
-        indices.remove(ERROR_TYPE);
         searchRequest.setIndices(indices);
       }
       return dao.search(searchRequest);
@@ -77,6 +74,9 @@ public class SearchServiceImpl implements SearchService {
   @Override
   public GroupResponse group(GroupRequest groupRequest) throws RestException {
     try {
+      if (groupRequest.getIndices() == null || groupRequest.getIndices().isEmpty()) {
+        groupRequest.setIndices(getDefaultIndices());
+      }
       return dao.group(groupRequest);
     }
     catch(InvalidSearchException ise) {
@@ -112,4 +112,12 @@ public class SearchServiceImpl implements SearchService {
       throw new RestException(ioe.getMessage(), ioe);
     }
   }
+
+  /**
+   * Builds the index list applied to requests that do not name any indices: every index the
+   * sensor indexing config service reports for the configured index writer, minus the error
+   * index.
+   *
+   * @return a new, mutable list of default index names
+   * @throws RestException if the indexing configuration cannot be read
+   */
+  private List<String> getDefaultIndices() throws RestException {
+    // Look up the indices known for the configured index writer
+    String writerName = environment.getProperty(INDEX_WRITER_NAME);
+    List<String> defaults = Lists.newArrayList(sensorIndexingConfigService.getAllIndices(writerName));
+    // errors should not be included by default
+    defaults.remove(ERROR_TYPE);
+    return defaults;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
index 983c207..b0dd774 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
@@ -28,11 +28,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.InMemoryMetaAlertDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.rest.service.MetaAlertService;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,10 +78,18 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
 
   /**
    {
-   "guidToIndices" : {
-   "bro_1":"bro_index_2017.01.01.01",
-   "snort_2":"snort_index_2017.01.01.01"
+   "alerts" : [
+   {
+   "guid": "bro_1",
+   "sensorType": "bro",
+   "index": "bro_index_2017.01.01.01"
    },
+   {
+   "guid": "snort_2",
+   "sensorType": "snort",
+   "index": "snort_index_2017.01.01.01"
+   }
+   ],
    "groups" : ["group_one", "group_two"]
    }
    */
@@ -88,6 +107,11 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
     loadTestData(testData);
   }
 
+  /**
+   * Runs after every test and calls {@code InMemoryMetaAlertDao.clear()}.
+   * NOTE(review): presumably this resets state shared between tests (meta alerts created by one
+   * test would otherwise be visible to the next); confirm against InMemoryMetaAlertDao.
+   */
+  @After
+  public void cleanup() {
+    InMemoryMetaAlertDao.clear();
+  }
+
   @Test
   public void test() throws Exception {
     // Testing searching by alert
@@ -171,4 +195,94 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
         .andExpect(jsonPath("$.results[0].source.guid").value("meta_3"))
         .andExpect(jsonPath("$.results[0].source.count").value(2.0));
   }
+
+  /**
+   * Exercises the add/remove alert endpoints against one meta alert:
+   * adding new alerts answers "true", re-adding an existing member answers "false",
+   * removing members answers "true", and removing a non-member answers "false".
+   */
+  @Test
+  public void shouldAddRemoveAlerts() throws Exception {
+    MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest();
+    metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two"));
+    metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+    }});
+    MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest);
+
+    // Adding alerts that are not yet members changes the meta alert
+    MetaAlertAddRemoveRequest addRequest = new MetaAlertAddRemoveRequest();
+    addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("bro_3", "bro", "bro_index_2017.01.01.01"));
+    }});
+
+    ResultActions result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    // bro_1 has been a member since creation, so adding it again is a no-op
+    MetaAlertAddRemoveRequest addDuplicateRequest = new MetaAlertAddRemoveRequest();
+    addDuplicateRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addDuplicateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addDuplicateRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+
+    // Removing the two alerts added above changes the meta alert
+    MetaAlertAddRemoveRequest removeRequest = new MetaAlertAddRemoveRequest();
+    removeRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro"));
+      add(new GetRequest("bro_3", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    // bro_2 was removed by the previous request, so it is no longer a member and removing it
+    // again must report no change. The GUID has to be set on this request (not on addRequest);
+    // otherwise the call targets no meta alert at all and "false" is returned for the wrong reason.
+    MetaAlertAddRemoveRequest removeMissingRequest = new MetaAlertAddRemoveRequest();
+    removeMissingRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeMissingRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeMissingRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+  }
+
+  /**
+   * Creates a meta alert and posts the same "inactive" status update twice: the first call is
+   * expected to answer "true", the repeated call "false".
+   */
+  @Test
+  public void shouldUpdateStatus() throws Exception {
+    ArrayList<GetRequest> alerts = new ArrayList<>();
+    alerts.add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+    alerts.add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+
+    MetaAlertCreateRequest createRequest = new MetaAlertCreateRequest();
+    createRequest.setGroups(Arrays.asList("group_one", "group_two"));
+    createRequest.setAlerts(alerts);
+
+    MetaAlertCreateResponse createResponse = metaAlertService.create(createRequest);
+    String statusUrl = metaalertUrl + "/update/status/" + createResponse.getGuid() + "/inactive";
+    MediaType jsonUtf8 = MediaType.parseMediaType("application/json;charset=UTF-8");
+
+    // First update: the status is changed
+    ResultActions firstUpdate = this.mockMvc.perform(
+        post(statusUrl)
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(jsonUtf8));
+    firstUpdate.andExpect(status().isOk()).andExpect(content().string("true"));
+
+    // Second update with the same status: nothing left to change
+    ResultActions repeatedUpdate = this.mockMvc.perform(
+        post(statusUrl)
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(jsonUtf8));
+    repeatedUpdate.andExpect(status().isOk()).andExpect(content().string("false"));
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index 4708bc4..57a1b28 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.HBaseDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.rest.service.UpdateService;
@@ -161,7 +162,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
     Assert.assertEquals(1,table.size());
     {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(new HBaseDao.Key(guid,"bro").toBytes());
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(1, columns.size());
@@ -183,7 +184,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
     Assert.assertEquals(1,table.size());
     {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(new HBaseDao.Key(guid, "bro").toBytes());
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(2, columns.size());

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
new file mode 100644
index 0000000..595a839
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ * Singleton helper for building hash-prefixed keys: {@link #getPrefix(byte[])} hashes a key with
+ * a seeded murmur3 128-bit function and {@link #merge(byte[], byte[])} concatenates a prefix and
+ * a key into one array.
+ */
+public enum KeyUtil {
+  INSTANCE;
+  private static final int SEED = 0xDEADBEEF;
+  /** Length in bytes of a 128-bit hash prefix. */
+  public static final int HASH_PREFIX_SIZE=16;
+  /** Per-thread hash function instance, created lazily with the fixed seed. */
+  ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+    @Override
+    protected HashFunction initialValue() {
+      return Hashing.murmur3_128(SEED);
+    }
+  };
+
+  /**
+   * Hashes the given key bytes.
+   *
+   * @param key the bytes to hash
+   * @return the hash of {@code key} as a byte array
+   */
+  public byte[] getPrefix(byte[] key) {
+    return hFunction.get()
+        .newHasher()
+        .putBytes(key)
+        .hash()
+        .asBytes();
+  }
+
+  /**
+   * Concatenates {@code prefix} and {@code key} into a newly allocated array.
+   *
+   * @param prefix bytes placed first
+   * @param key bytes placed directly after the prefix
+   * @return a new array of length {@code prefix.length + key.length}
+   */
+  public byte[] merge(byte[] prefix, byte[] key) {
+    byte[] merged = new byte[key.length + prefix.length];
+    System.arraycopy(prefix, 0, merged, 0, prefix.length);
+    System.arraycopy(key, 0, merged, prefix.length, key.length);
+    return merged;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
index a018e27..fff1d9b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -34,6 +34,15 @@ import java.util.HashMap;
 
 
 public class HBaseEnrichmentConverterTest {
+    public static byte[] keyBytes = new byte[] {
+            0x31,(byte)0xc2,0x49,0x05,0x6b,(byte)0xea,
+            0x0e,0x59,(byte)0xe1,(byte)0xad,(byte)0xa0,0x24,
+            0x55,(byte)0xa9,0x6b,0x63,0x00,0x06,
+            0x64,0x6f,0x6d,0x61,0x69,0x6e,
+            0x00,0x06,0x67,0x6f,0x6f,0x67,
+            0x6c,0x65
+    };
+
     EnrichmentKey key = new EnrichmentKey("domain", "google");
     EnrichmentValue value = new EnrichmentValue(
             new HashMap<String, Object>() {{
@@ -41,6 +50,18 @@ public class HBaseEnrichmentConverterTest {
                 put("grok", "baz");
             }});
     LookupKV<EnrichmentKey, EnrichmentValue> results = new LookupKV(key, value);
+
+    /**
+     * IF this test fails then you have broken the key serialization in that your change has
+     * caused a key to change serialization, so keys from previous releases will not be able to be found
+     * under your scheme.  Please either provide a migration plan or undo this change.  DO NOT CHANGE THIS
+     * TEST BLITHELY!
+     */
+    @Test
+    public void testKeySerializationRemainsConstant() {
+        byte[] raw = key.toBytes();
+        // JUnit's contract is (expected, actual): the frozen bytes are the expectation, so a
+        // failure message reports the regression the right way round.
+        Assert.assertArrayEquals(keyBytes, raw);
+    }
     @Test
     public void testKeySerialization() {
         byte[] serialized = key.toBytes();

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index f114b4c..61d5472 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -25,9 +25,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +40,7 @@ import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.Group;
 import org.apache.metron.indexing.dao.search.GroupOrder;
 import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -55,10 +58,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -256,40 +258,73 @@ public class ElasticsearchDao implements IndexDao {
     return ret.orElse(null);
   }
 
+  /**
+   * Fetches documents for a batch of get requests with a single search: the distinct GUIDs and
+   * distinct sensor types of all requests are collected and passed to {@code searchByGuids}, and
+   * each hit is converted into a {@link Document} with a timestamp of 0.
+   *
+   * @param getRequests the requests naming the GUIDs and sensor types to look up
+   * @return the documents built from the search hits
+   * @throws IOException declared by the interface; a failure while building a document from a
+   *         hit surfaces as an {@link IllegalStateException} instead
+   */
+  @Override
+  public Iterable<Document> getAllLatest(
+      final List<GetRequest> getRequests) throws IOException {
+    Collection<String> requestedGuids = new HashSet<>();
+    Collection<String> requestedSensorTypes = new HashSet<>();
+    for (GetRequest request : getRequests) {
+      requestedGuids.add(request.getGuid());
+      requestedSensorTypes.add(request.getSensorType());
+    }
+    List<Document> latest = searchByGuids(requestedGuids, requestedSensorTypes, hit -> {
+      Long ts = 0L;
+      String source = hit.getSourceAsString();
+      // The hit type is the sensor type followed by "_doc"; keep the part before that suffix
+      String sensorType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+      try {
+        return Optional.of(new Document(source, hit.getId(), sensorType, ts));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+      }
+    });
+    return latest;
+  }
+
+  /**
+   * Single-GUID convenience wrapper around {@code searchByGuids}.
+   *
+   * @param guid the GUID to look up
+   * @param sensorType the sensor type to restrict the lookup to, or {@code null} for no restriction
+   * @param callback converts a search hit into a result; hits it maps to empty are skipped
+   * @return the first converted result, or empty if there is none
+   */
+  <T> Optional<T> searchByGuid(String guid, String sensorType,
+      Function<SearchHit, Optional<T>> callback) {
+    Collection<String> sensorTypes = null;
+    if (sensorType != null) {
+      sensorTypes = Collections.singleton(sensorType);
+    }
+    List<T> matches = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
+    if (matches.isEmpty()) {
+      return Optional.empty();
+    }
+    return Optional.of(matches.get(0));
+  }
+
   /**
    * Return the search hit based on the UUID and sensor type.
    * A callback can be specified to transform the hit into a type T.
    * If more than one hit happens, the first one will be returned.
    */
-  <T> Optional<T> searchByGuid(String guid, String sensorType,
+  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
       Function<SearchHit, Optional<T>> callback) {
     QueryBuilder query;
-    if (sensorType != null) {
-      query = QueryBuilders.idsQuery(sensorType + "_doc").ids(guid);
+    if (sensorTypes != null) {
+      String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
+      query = QueryBuilders.idsQuery(types).ids(guids);
     } else {
-      query = QueryBuilders.idsQuery().ids(guid);
+      query = QueryBuilders.idsQuery().ids(guids);
     }
     SearchRequestBuilder request = client.prepareSearch()
                                          .setQuery(query)
                                          .setSource("message")
+                                         .setSize(guids.size())
                                          ;
     org.elasticsearch.action.search.SearchResponse response = request.get();
     SearchHits hits = response.getHits();
-    long totalHits = hits.getTotalHits();
-    if (totalHits > 1) {
-      LOG.warn("Encountered {} results for guid {} in sensor {}. Returning first hit.",
-          totalHits,
-          guid,
-          sensorType
-      );
-    }
+    List<T> results = new ArrayList<>();
     for (SearchHit hit : hits) {
-      Optional<T> ret = callback.apply(hit);
-      if (ret.isPresent()) {
-        return ret;
+      Optional<T> result = callback.apply(hit);
+      if (result.isPresent()) {
+        results.add(result.get());
       }
     }
-    return Optional.empty();
+    return results;
   }
 
   @Override
@@ -297,18 +332,17 @@ public class ElasticsearchDao implements IndexDao {
     String indexPostfix = ElasticsearchUtils
         .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
     String sensorType = update.getSensorType();
-    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
-    String existingIndex = calculateExistingIndex(update, index, indexPostfix);
+    String indexName = getIndexName(update, index, indexPostfix);
 
-    UpdateRequest updateRequest = buildUpdateRequest(update, sensorType, indexName, existingIndex);
+    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
     try {
-      UpdateResponse response = client.update(updateRequest).get();
+      IndexResponse response = client.index(indexRequest).get();
 
       ShardInfo shardInfo = response.getShardInfo();
       int failed = shardInfo.getFailed();
       if (failed > 0) {
         throw new IOException(
-            "ElasticsearchDao upsert failed: " + Arrays.toString(shardInfo.getFailures()));
+            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
       }
     } catch (Exception e) {
       throw new IOException(e.getMessage(), e);
@@ -326,16 +360,14 @@ public class ElasticsearchDao implements IndexDao {
     for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
       Document update = updateEntry.getKey();
       String sensorType = update.getSensorType();
-      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
-      String existingIndex = calculateExistingIndex(update, updateEntry.getValue(), indexPostfix);
-      UpdateRequest updateRequest = buildUpdateRequest(
+      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
+      IndexRequest indexRequest = buildIndexRequest(
           update,
           sensorType,
-          indexName,
-          existingIndex
+          indexName
       );
 
-      bulkRequestBuilder.add(updateRequest);
+      bulkRequestBuilder.add(indexRequest);
     }
 
     BulkResponse bulkResponse = bulkRequestBuilder.get();
@@ -346,21 +378,20 @@ public class ElasticsearchDao implements IndexDao {
     }
   }
 
-  protected String calculateExistingIndex(Document update, Optional<String> index,
-      String indexPostFix) {
-    String sensorType = update.getSensorType();
-    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostFix, null);
+  /**
+   * Resolves the index a document should be written to, in order of preference:
+   * the caller-supplied index, the index the document's GUID is currently found in, and finally
+   * the index name derived from the sensor type and postfix.
+   *
+   * @param update the document being written
+   * @param index the index named by the caller, if any
+   * @param indexPostFix postfix used when a new index name has to be derived
+   * @return the name of the index to write to
+   */
+  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+    // orElseGet keeps the fallbacks lazy: the GUID search only runs when the caller gave no
+    // index, and the derived name is only built when that search finds nothing.
+    return index.orElseGet(() ->
+        getIndexName(update.getGuid(), update.getSensorType())
+            .orElseGet(() ->
+                ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)));
+  }
 
-    return index.orElse(
-        searchByGuid(update.getGuid(),
-            sensorType,
-            hit -> Optional.ofNullable(hit.getIndex())
-        ).orElse(indexName)
+  protected Optional<String> getIndexName(String guid, String sensorType) {
+    return searchByGuid(guid,
+        sensorType,
+        hit -> Optional.ofNullable(hit.getIndex())
     );
   }
 
-  protected UpdateRequest buildUpdateRequest(Document update, String sensorType, String indexName,
-      String existingIndex) {
+  protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
     String type = sensorType + "_doc";
     Object ts = update.getTimestamp();
     IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
@@ -370,9 +401,7 @@ public class ElasticsearchDao implements IndexDao {
       indexRequest = indexRequest.timestamp(ts.toString());
     }
 
-    return new UpdateRequest(existingIndex, type, update.getGuid())
-        .doc(update.getDocument())
-        .upsert(indexRequest);
+    return indexRequest;
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index eef134f..c24ba0c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,18 +18,21 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.apache.metron.common.Constants.GUID;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
 import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
 import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -37,7 +40,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -45,8 +47,10 @@ import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.MultiIndexDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.metaalert.MetaScores;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
@@ -55,31 +59,26 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.index.query.support.QueryInnerHitBuilder;
-import org.elasticsearch.search.SearchHit;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
-  private static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+  public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+  private static final String STATUS_PATH = "/status";
+  private static final String ALERT_PATH = "/alert";
+
   private IndexDao indexDao;
   private ElasticsearchDao elasticsearchDao;
   private String index = METAALERTS_INDEX;
   private String threatTriageField = THREAT_FIELD_DEFAULT;
   private String threatSort = THREAT_SORT_DEFAULT;
+  private int pageSize = 500;
 
   /**
    * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
@@ -96,7 +95,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
    */
   public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField,
       String threatSort) {
-    init(indexDao, threatSort);
+    init(indexDao, Optional.of(threatSort));
     this.index = index;
     this.threatTriageField = triageLevelField;
   }
@@ -105,8 +104,14 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     //uninitialized.
   }
 
+  /**
+   * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao.
+   * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example).
+   * @param indexDao The DAO to wrap for our queries
+   * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc.
+   */
   @Override
-  public void init(IndexDao indexDao, String threatSort) {
+  public void init(IndexDao indexDao, Optional<String> threatSort) {
     if (indexDao instanceof MultiIndexDao) {
       this.indexDao = indexDao;
       MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao;
@@ -124,8 +129,8 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
       );
     }
 
-    if (threatSort != null) {
-      this.threatSort = threatSort;
+    if (threatSort.isPresent()) {
+      this.threatSort = threatSort.get();
     }
   }
 
@@ -139,66 +144,63 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     if (guid == null || guid.trim().isEmpty()) {
       throw new InvalidSearchException("Guid cannot be empty");
     }
-    org.elasticsearch.action.search.SearchResponse esResponse = getMetaAlertsForAlert(guid.trim());
-    SearchResponse searchResponse = new SearchResponse();
-    searchResponse.setTotal(esResponse.getHits().getTotalHits());
-    searchResponse.setResults(
-        Arrays.stream(esResponse.getHits().getHits()).map(searchHit -> {
-              SearchResult searchResult = new SearchResult();
-              searchResult.setId(searchHit.getId());
-              searchResult.setSource(searchHit.getSource());
-              searchResult.setScore(searchHit.getScore());
-              searchResult.setIndex(searchHit.getIndex());
-              return searchResult;
-            }
-        ).collect(Collectors.toList()));
-    return searchResponse;
+    // Searches for all active meta alerts that contain the given alert guid in their nested alert array
+    QueryBuilder qb = boolQuery()
+        .must(
+            nestedQuery(
+                ALERT_FIELD,
+                boolQuery()
+                    .must(termQuery(ALERT_FIELD + "." + GUID, guid))
+            ).innerHit(new QueryInnerHitBuilder())
+        )
+        .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+    return queryAllResults(qb);
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
       throws InvalidCreateException, IOException {
-    if (request.getGuidToIndices().isEmpty()) {
-      throw new InvalidCreateException("MetaAlertCreateRequest must contain alert GUIDs");
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (request.getAlerts().isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
     }
     if (request.getGroups().isEmpty()) {
       throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
     }
 
     // Retrieve the documents going into the meta alert and build it
-    MultiGetResponse multiGetResponse = getDocumentsByGuid(request);
-    Document createDoc = buildCreateDocument(multiGetResponse, request.getGroups());
-    MetaScores metaScores = calculateMetaScores(createDoc);
-    createDoc.getDocument().putAll(metaScores.getMetaScores());
-    createDoc.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
+    Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+
+    Document metaAlert = buildCreateDocument(alerts, request.getGroups());
+    calculateMetaScores(metaAlert);
     // Add source type to be consistent with other sources and allow filtering
-    createDoc.getDocument().put("source:type", MetaAlertDao.METAALERT_TYPE);
+    metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);
 
     // Start a list of updates / inserts we need to run
     Map<Document, Optional<String>> updates = new HashMap<>();
-    updates.put(createDoc, Optional.of(MetaAlertDao.METAALERTS_INDEX));
+    updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
 
     try {
       // We need to update the associated alerts with the new meta alerts, making sure existing
       // links are maintained.
-      List<String> metaAlertField;
-      for (MultiGetItemResponse itemResponse : multiGetResponse) {
-        metaAlertField = new ArrayList<>();
-        GetResponse response = itemResponse.getResponse();
-        if (response.isExists()) {
-          List<String> alertField = (List<String>) response.getSourceAsMap()
-              .get(MetaAlertDao.METAALERT_FIELD);
-          if (alertField != null) {
-            metaAlertField.addAll(alertField);
+      Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
+          GetRequest::getGuid, GetRequest::getIndex));
+      Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
+          GetRequest::getGuid, GetRequest::getSensorType));
+      for (Document alert: alerts) {
+        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+          // Use the index in the request if it exists
+          Optional<String> index = guidToIndices.get(alert.getGuid());
+          if (!index.isPresent()) {
+            // Look up the index from Elasticsearch if one is not supplied in the request
+            index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
+            if (!index.isPresent()) {
+              throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
+            }
           }
+          updates.put(alert, index);
         }
-        metaAlertField.add(createDoc.getGuid());
-
-        Document alertUpdate = buildAlertUpdate(response.getId(),
-            (String) response.getSource().get(SOURCE_TYPE), metaAlertField,
-            (Long) response.getSourceAsMap().get("_timestamp"));
-        updates.put(alertUpdate, Optional.of(itemResponse.getIndex()));
       }
 
       // Kick off any updates.
@@ -206,7 +208,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
       MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
       createResponse.setCreated(true);
-      createResponse.setGuid(createDoc.getGuid());
+      createResponse.setGuid(metaAlert.getGuid());
       return createResponse;
     } catch (IOException ioe) {
       throw new InvalidCreateException("Unable to create meta alert", ioe);
@@ -214,6 +216,149 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
+      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+      boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts);
+      if (metaAlertUpdated) {
+        calculateMetaScores(metaAlert);
+        updates.put(metaAlert, Optional.of(index));
+        for(Document alert: alerts) {
+          if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+            updates.put(alert, Optional.empty());
+          }
+        }
+        indexDaoUpdate(updates);
+      }
+      return metaAlertUpdated;
+    } else {
+      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
+    }
+  }
+
+  protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
+    boolean alertAdded = false;
+    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert ->
+        (String) currentAlert.get(GUID)).collect(Collectors.toSet());
+    for (Document alert: alerts) {
+      String alertGuid = alert.getGuid();
+      // Only add an alert if it isn't already in the meta alert
+      if (!currentAlertGuids.contains(alertGuid)) {
+        currentAlerts.add(alert.getDocument());
+        alertAdded = true;
+      }
+    }
+    return alertAdded;
+  }
+
+  protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
+    List<String> metaAlertField = new ArrayList<>();
+    List<String> alertField = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (alertField != null) {
+      metaAlertField.addAll(alertField);
+    }
+    boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid);
+    if (metaAlertAdded) {
+      metaAlertField.add(metaAlertGuid);
+      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    }
+    return metaAlertAdded;
+  }
+
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
+      Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+      Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(
+          Collectors.toList());
+      boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids);
+      if (metaAlertUpdated) {
+        calculateMetaScores(metaAlert);
+        updates.put(metaAlert, Optional.of(index));
+        for(Document alert: alerts) {
+          if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
+            updates.put(alert, Optional.empty());
+          }
+        }
+        indexDaoUpdate(updates);
+      }
+      return metaAlertUpdated;
+    } else {
+      throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
+    }
+
+  }
+
+  protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
+    List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    int previousSize = currentAlerts.size();
+    // Only remove an alert if it is in the meta alert
+    currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID)));
+    return currentAlerts.size() != previousSize;
+  }
+
+  protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
+    List<String> metaAlertField = new ArrayList<>();
+    List<String> alertField = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (alertField != null) {
+      metaAlertField.addAll(alertField);
+    }
+    boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid);
+    if (metaAlertRemoved) {
+      alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    }
+    return metaAlertRemoved;
+  }
+
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws IOException {
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
+    boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus);
+    if (metaAlertUpdated) {
+      metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
+      updates.put(metaAlert, Optional.of(index));
+      List<GetRequest> getRequests = new ArrayList<>();
+      List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
+          .get(MetaAlertDao.ALERT_FIELD);
+      currentAlerts.stream().forEach(currentAlert -> {
+        getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE)));
+      });
+      Iterable<Document> alerts = indexDao.getAllLatest(getRequests);
+      for (Document alert : alerts) {
+        boolean metaAlertAdded = false;
+        boolean metaAlertRemoved = false;
+        // If we're making it active, add the meta alert guid to every alert.
+        if (MetaAlertStatus.ACTIVE.equals(status)) {
+          metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert);
+        }
+        // If we're making it inactive, remove the meta alert guid from every alert.
+        if (MetaAlertStatus.INACTIVE.equals(status)) {
+          metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
+        }
+        if (metaAlertAdded || metaAlertRemoved) {
+          updates.put(alert, Optional.empty());
+        }
+      }
+    }
+    if (metaAlertUpdated) {
+      indexDaoUpdate(updates);
+    }
+    return metaAlertUpdated;
+  }
+
+  @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
     // Wrap the query to also get any meta-alerts.
     QueryBuilder qb = constantScoreQuery(boolQuery()
@@ -242,95 +387,171 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    return indexDao.getAllLatest(getRequests);
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws IOException {
     if (METAALERT_TYPE.equals(update.getSensorType())) {
       // We've been passed an update to the meta alert.
-      handleMetaUpdate(update);
+      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
     } else {
+      Map<Document, Optional<String>> updates = new HashMap<>();
+      updates.put(update, index);
       // We need to update an alert itself.  Only that portion of the update can be delegated.
       // We still need to get meta alerts potentially associated with it and update.
-      org.elasticsearch.action.search.SearchResponse response = getMetaAlertsForAlert(
-          update.getGuid()
-      );
-
-      // Each hit, if any, is a metaalert that needs to be updated
-      for (SearchHit hit : response.getHits()) {
-        handleAlertUpdate(update, hit);
+      Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
+          .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L))
+          .collect(Collectors.toList());
+      // Each meta alert needs to be updated with the new alert
+      for (Document metaAlert : metaAlerts) {
+        replaceAlertInMetaAlert(metaAlert, update);
+        updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
       }
 
       // Run the alert's update
-      indexDao.update(update, index);
+      indexDao.batchUpdate(updates);
     }
   }
 
+  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
+    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()));
+    if (metaAlertUpdated) {
+      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
+    }
+    return metaAlertUpdated;
+  }
+
   @Override
   public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
     throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
   }
 
   /**
+   * Does not allow patches on the "alerts" or "status" fields.  These fields must be updated with their
+   * dedicated methods.
+   *
+   * @param request The patch request
+   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
+   * @throws OriginalNotFoundException
+   * @throws IOException
+   */
+  @Override
+  public void patch(PatchRequest request, Optional<Long> timestamp)
+      throws OriginalNotFoundException, IOException {
+    if (isPatchAllowed(request)) {
+      Document d = getPatchedDocument(request, timestamp);
+      indexDao.update(d, Optional.ofNullable(request.getIndex()));
+    } else {
+      throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths.  "
+          + "Please use the add/remove alert or update status functions instead.");
+    }
+  }
+
+  protected boolean isPatchAllowed(PatchRequest request) {
+    Iterator patchIterator = request.getPatch().iterator();
+    while(patchIterator.hasNext()) {
+      JsonNode patch = (JsonNode) patchIterator.next();
+      String path = patch.path("path").asText();
+      if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Given an alert GUID, retrieve all associated meta alerts.
-   * @param guid The GUID of the child alert
+   * @param alertGuid The GUID of the child alert
    * @return The Elasticsearch response containing the meta alerts
    */
-  protected org.elasticsearch.action.search.SearchResponse getMetaAlertsForAlert(String guid) {
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
     QueryBuilder qb = boolQuery()
         .must(
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + Constants.GUID, guid))
+                    .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid))
             ).innerHit(new QueryInnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
-    SearchRequest sr = new SearchRequest();
-    ArrayList<String> indices = new ArrayList<>();
-    indices.add(index);
-    sr.setIndices(indices);
-    return elasticsearchDao
+    return queryAllResults(qb);
+  }
+
+  /**
+   * Elasticsearch queries default to 10 records returned.  Some internal queries require that all
+   * results are returned.  Rather than setting an arbitrarily high size, this method pages through results
+   * and returns them all in a single SearchResponse.
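+   * @param qb The query to run against the configured meta alert index
+   * @return A single SearchResponse containing the total hit count and the results of every page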
+   */
+  protected SearchResponse queryAllResults(QueryBuilder qb) {
+    SearchRequestBuilder searchRequestBuilder = elasticsearchDao
         .getClient()
         .prepareSearch(index)
         .addFields("*")
         .setFetchSource(true)
         .setQuery(qb)
+        .setSize(pageSize);
+    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
         .execute()
         .actionGet();
+    List<SearchResult> allResults = getSearchResults(esResponse);
+    long total = esResponse.getHits().getTotalHits();
+    if (total > pageSize) {
+      int pages = (int) (total / pageSize) + 1;
+      for (int i = 1; i < pages; i++) {
+        int from = i * pageSize;
+        searchRequestBuilder.setFrom(from);
+        esResponse = searchRequestBuilder
+            .execute()
+            .actionGet();
+        allResults.addAll(getSearchResults(esResponse));
+      }
+    }
+    SearchResponse searchResponse = new SearchResponse();
+    searchResponse.setTotal(total);
+    searchResponse.setResults(allResults);
+    return searchResponse;
   }
 
   /**
-   * Return child documents after retrieving them from Elasticsearch.
-   * @param request The request detailing which child alerts we need
-   * @return The Elasticsearch response to our request for alerts
+   * Transforms a list of Elasticsearch SearchHits to a list of SearchResults
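+   * @param searchResponse The Elasticsearch response whose hits are transformed
+   * @return A SearchResult for each hit, populated with the hit's id, source, score and index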
    */
-  protected MultiGetResponse getDocumentsByGuid(MetaAlertCreateRequest request) {
-    MultiGetRequestBuilder multiGet = elasticsearchDao.getClient().prepareMultiGet();
-    for (Entry<String, String> entry : request.getGuidToIndices().entrySet()) {
-      multiGet.add(new Item(entry.getValue(), null, entry.getKey()));
-    }
-    return multiGet.get();
+  protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
+    return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
+          SearchResult searchResult = new SearchResult();
+          searchResult.setId(searchHit.getId());
+          searchResult.setSource(searchHit.getSource());
+          searchResult.setScore(searchHit.getScore());
+          searchResult.setIndex(searchHit.getIndex());
+          return searchResult;
+        }
+    ).collect(Collectors.toList());
   }
 
   /**
    * Build the Document representing a meta alert to be created.
-   * @param multiGetResponse The Elasticsearch results for the meta alerts child documents
+   * @param alerts The child alert Documents to be nested in the new meta alert
    * @param groups The groups used to create this meta alert
    * @return A Document representing the new meta alert
    */
-  protected Document buildCreateDocument(MultiGetResponse multiGetResponse, List<String> groups) {
+  protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
     // Need to create a Document from the multiget. Scores will be calculated later
     Map<String, Object> metaSource = new HashMap<>();
     List<Map<String, Object>> alertList = new ArrayList<>();
-    for (MultiGetItemResponse itemResponse : multiGetResponse) {
-      GetResponse response = itemResponse.getResponse();
-      if (response.isExists()) {
-        alertList.add(response.getSource());
-      }
+    for (Document alert: alerts) {
+      alertList.add(alert.getDocument());
     }
     metaSource.put(ALERT_FIELD, alertList);
 
     // Add any meta fields
     String guid = UUID.randomUUID().toString();
-    metaSource.put(Constants.GUID, guid);
+    metaSource.put(GUID, guid);
     metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
     metaSource.put(GROUPS_FIELD, groups);
     metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
@@ -339,29 +560,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
   }
 
   /**
-   * Process an update to a meta alert itself.
-   * @param update The update Document to be applied
-   * @throws IOException If there's a problem running the update
-   */
-  protected void handleMetaUpdate(Document update) throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-
-    if (update.getDocument().containsKey(MetaAlertDao.STATUS_FIELD)) {
-      // Update all associated alerts to maintain the meta alert link properly
-      updates.putAll(buildStatusAlertUpdates(update));
-    }
-    if (update.getDocument().containsKey(MetaAlertDao.ALERT_FIELD)) {
-      // If the alerts field changes (i.e. add/remove alert), update all affected alerts to
-      // maintain the meta alert link properly.
-      updates.putAll(buildAlertFieldUpdates(update));
-    }
-
-    // Run meta alert update.
-    updates.put(update, Optional.of(index));
-    indexDaoUpdate(updates);
-  }
-
-  /**
    * Calls the single update variant if there's only one update, otherwise calls batch.
    * @param updates The list of updates to run
    * @throws IOException If there's an update error
@@ -375,203 +573,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     } // else we have no updates, so don't do anything
   }
 
-  protected Map<Document, Optional<String>> buildStatusAlertUpdates(Document update)
-      throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
-    for (Map<String, Object> alert : alerts) {
-      // Retrieve the associated alert, so we can update the array
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      String status = (String) update.getDocument().get(MetaAlertDao.STATUS_FIELD);
-
-      Document alertUpdate = null;
-      String alertGuid = (String) alert.get(Constants.GUID);
-      // If we're making it active add add the meta alert guid for every alert.
-      if (MetaAlertStatus.ACTIVE.getStatusString().equals(status)
-          && !metaAlertField.contains(update.getGuid())) {
-        metaAlertField.add(update.getGuid());
-        alertUpdate = buildAlertUpdate(
-            alertGuid,
-            (String) alert.get(SOURCE_TYPE),
-            metaAlertField,
-            (Long) alert.get("_timestamp")
-        );
-      }
-
-      // If we're making it inactive, remove the meta alert guid from every alert.
-      if (MetaAlertStatus.INACTIVE.getStatusString().equals(status)
-          && metaAlertField.remove(update.getGuid())) {
-        alertUpdate = buildAlertUpdate(
-            alertGuid,
-            (String) alert.get(SOURCE_TYPE),
-            metaAlertField,
-            (Long) alert.get("_timestamp")
-        );
-      }
-
-      // Only run an alert update if we have an actual update.
-      if (alertUpdate != null) {
-        updates.put(alertUpdate, Optional.empty());
-      }
-    }
-    return updates;
-  }
-
-  protected Map<Document, Optional<String>> buildAlertFieldUpdates(Document update)
-      throws IOException {
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    // If we've updated the alerts field (i.e add/remove), recalculate meta alert scores and
-    // the metaalerts fields for updating the children alerts.
-    MetaScores metaScores = calculateMetaScores(update);
-    update.getDocument().putAll(metaScores.getMetaScores());
-    update.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
-
-    // Get the set of GUIDs that are in the new version.
-    Set<String> updateGuids = new HashSet<>();
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> updateAlerts = (List<Map<String, Object>>) update.getDocument()
-        .get(MetaAlertDao.ALERT_FIELD);
-    for (Map<String, Object> alert : updateAlerts) {
-      updateGuids.add((String) alert.get(Constants.GUID));
-    }
-
-    // Get the set of GUIDs from the old version
-    List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
-    Set<String> currentGuids = new HashSet<>();
-    for (Map<String, Object> alert : alerts) {
-      currentGuids.add((String) alert.get(Constants.GUID));
-    }
-
-    // Get both set differences, so we know what's been added and removed.
-    Set<String> removedGuids = SetUtils.difference(currentGuids, updateGuids);
-    Set<String> addedGuids = SetUtils.difference(updateGuids, currentGuids);
-
-    Document alertUpdate;
-
-    // Handle any removed GUIDs
-    for (String guid : removedGuids) {
-      // Retrieve the associated alert, so we can update the array
-      Document alert = elasticsearchDao.getLatest(guid, null);
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.getDocument()
-          .get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      if (metaAlertField.remove(update.getGuid())) {
-        alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
-            alert.getTimestamp());
-        updates.put(alertUpdate, Optional.empty());
-      }
-    }
-
-    // Handle any added GUIDs
-    for (String guid : addedGuids) {
-      // Retrieve the associated alert, so we can update the array
-      Document alert = elasticsearchDao.getLatest(guid, null);
-      List<String> metaAlertField = new ArrayList<>();
-      @SuppressWarnings("unchecked")
-      List<String> alertField = (List<String>) alert.getDocument()
-          .get(MetaAlertDao.METAALERT_FIELD);
-      if (alertField != null) {
-        metaAlertField.addAll(alertField);
-      }
-      metaAlertField.add(update.getGuid());
-      alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
-          alert.getTimestamp());
-      updates.put(alertUpdate, Optional.empty());
-    }
-
-    return updates;
-  }
-
-  @SuppressWarnings("unchecked")
-  protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
-    Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
-    if (latest == null) {
-      return new ArrayList<>();
-    }
-    List<String> guids = new ArrayList<>();
-    List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
-        .get(MetaAlertDao.ALERT_FIELD);
-    for (Map<String, Object> alert : latestAlerts) {
-      guids.add((String) alert.get(Constants.GUID));
-    }
-
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    QueryBuilder query = QueryBuilders.idsQuery().ids(guids);
-    SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
-        .setQuery(query);
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    for (SearchHit hit : response.getHits().getHits()) {
-      alerts.add(hit.sourceAsMap());
-    }
-    return alerts;
-  }
-
-  /**
-   * Builds an update Document for updating the meta alerts list.
-   * @param alertGuid The GUID of the alert to update
-   * @param sensorType The sensor type to update
-   * @param metaAlertField The new metaAlertList to use
-   * @return The update Document
-   */
-  protected Document buildAlertUpdate(String alertGuid, String sensorType,
-      List<String> metaAlertField, Long timestamp) {
-    Document alertUpdate;
-    Map<String, Object> document = new HashMap<>();
-    document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    alertUpdate = new Document(
-        document,
-        alertGuid,
-        sensorType,
-        timestamp
-    );
-    return alertUpdate;
-  }
-
-  /**
-   * Takes care of upserting a child alert to a meta alert.
-   * @param update The update Document to be applied
-   * @param hit The meta alert to be updated
-   * @throws IOException If there's an issue running the upsert
-   */
-  protected void handleAlertUpdate(Document update, SearchHit hit) throws IOException {
-    XContentBuilder builder = buildUpdatedMetaAlert(update, hit);
-
-    // Run the meta alert's update
-    IndexRequest indexRequest = new IndexRequest(
-        METAALERTS_INDEX,
-        METAALERT_DOC,
-        hit.getId()
-    ).source(builder);
-    UpdateRequest updateRequest = new UpdateRequest(
-        METAALERTS_INDEX,
-        METAALERT_DOC,
-        hit.getId()
-    ).doc(builder).upsert(indexRequest);
-    try {
-      UpdateResponse updateResponse = elasticsearchDao.getClient().update(updateRequest).get();
-
-      ShardInfo shardInfo = updateResponse.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchMetaAlertDao upsert failed: "
-                + Arrays.toString(shardInfo.getFailures())
-        );
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-  }
-
   @Override
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices)
       throws IOException {
@@ -595,80 +596,26 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
   /**
    * Calculate the meta alert scores for a Document.
-   * @param document The Document containing scores
+   * @param metaAlert The Document containing scores
    * @return Set of score statistics
    */
   @SuppressWarnings("unchecked")
-  protected MetaScores calculateMetaScores(Document document) {
-    List<Object> alertsRaw = ((List<Object>) document.getDocument().get(ALERT_FIELD));
-    if (alertsRaw == null || alertsRaw.isEmpty()) {
-      throw new IllegalArgumentException("No alerts to use in calculation for doc GUID: "
-          + document.getDocument().get(Constants.GUID));
-    }
-
-    ArrayList<Double> scores = new ArrayList<>();
-    for (Object alertRaw : alertsRaw) {
-      Map<String, Object> alert = (Map<String, Object>) alertRaw;
-      Double scoreNum = parseThreatField(alert.get(threatTriageField));
-      if (scoreNum != null) {
-        scores.add(scoreNum);
-      }
-    }
-
-    return new MetaScores(scores);
-  }
-
-  /**
-   * Builds the updated meta alert based on the update.
-   * @param update The update Document for the meta alert
-   * @param hit The meta alert to be updated
-   * @return A builder for Elasticsearch to use
-   * @throws IOException If we have an issue building the result
-   */
-  protected XContentBuilder buildUpdatedMetaAlert(Document update, SearchHit hit)
-      throws IOException {
-    // Make sure to get all the threat scores while we're going through the docs
-    List<Double> scores = new ArrayList<>();
-    // Start building the new version of the metaalert
-    XContentBuilder builder = jsonBuilder().startObject();
-
-    // Run through the nested alerts of the meta alert and either use the new or old versions
-    builder.startArray(ALERT_FIELD);
-    Map<String, Object> hitAlerts = hit.sourceAsMap();
-
-    @SuppressWarnings("unchecked")
-    List<Map<String, Object>> alertHits = (List<Map<String, Object>>) hitAlerts.get(ALERT_FIELD);
-    for (Map<String, Object> alertHit : alertHits) {
-      Map<String, Object> docMap = alertHit;
-      // If we're at the update use it instead of the original
-      if (alertHit.get(Constants.GUID).equals(update.getGuid())) {
-        docMap = update.getDocument();
-      }
-      builder.map(docMap);
-
-      // Handle either String or Number values in the threatTriageField
-      Object threatRaw = docMap.get(threatTriageField);
-      Double threat = parseThreatField(threatRaw);
-
-      if (threat != null) {
-        scores.add(threat);
-      }
-    }
-    builder.endArray();
-
-    // Add all the meta alert fields, and score calculation
-    Map<String, Object> updatedMeta = new HashMap<>();
-    updatedMeta.putAll(hit.getSource());
-    updatedMeta.putAll(new MetaScores(scores).getMetaScores());
-    for (Entry<String, Object> entry : updatedMeta.entrySet()) {
-      // The alerts field is being added separately, so ignore the original
-      if (!(entry.getKey().equals(ALERT_FIELD))) {
-        builder.field(entry.getKey(), entry.getValue());
+  protected void calculateMetaScores(Document metaAlert) {
+    MetaScores metaScores = new MetaScores(new ArrayList<>());
+    List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD));
+    if (alertsRaw != null && !alertsRaw.isEmpty()) {
+      ArrayList<Double> scores = new ArrayList<>();
+      for (Object alertRaw : alertsRaw) {
+        Map<String, Object> alert = (Map<String, Object>) alertRaw;
+        Double scoreNum = parseThreatField(alert.get(threatTriageField));
+        if (scoreNum != null) {
+          scores.add(scoreNum);
+        }
       }
+      metaScores = new MetaScores(scores);
     }
-    builder.endObject();
-
-    return builder;
+    metaAlert.getDocument().putAll(metaScores.getMetaScores());
+    metaAlert.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
   }
 
   private Double parseThreatField(Object threatRaw) {
@@ -680,4 +627,12 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
     }
     return threat;
   }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+
+  public void setPageSize(int pageSize) {
+    this.pageSize = pageSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
deleted file mode 100644
index 6c8e858..0000000
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.elasticsearch.dao;
-
-public enum MetaAlertStatus {
-  ACTIVE("active"),
-  INACTIVE("inactive");
-
-  private String statusString;
-
-  MetaAlertStatus(String statusString) {
-    this.statusString = statusString;
-  }
-
-  public String getStatusString() {
-    return statusString;
-  }
-}


[2/3] metron git commit: METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824

Posted by rm...@apache.org.
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
index 9a02854..a1027f7 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
@@ -20,17 +20,12 @@ package org.apache.metron.elasticsearch.dao;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -40,10 +35,10 @@ import org.apache.metron.common.Constants.Fields;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
-import org.apache.metron.indexing.dao.MultiIndexDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
-import org.apache.metron.indexing.dao.metaalert.MetaScores;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
@@ -51,147 +46,10 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
 import org.junit.Test;
 
 public class ElasticsearchMetaAlertDaoTest {
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testBuildUpdatedMetaAlertSingleAlert() throws IOException, ParseException {
-    // Construct the expected result
-    JSONObject expected = new JSONObject();
-    expected.put("average", 5.0);
-    expected.put("min", 5.0);
-    expected.put("median", 5.0);
-    expected.put("max", 5.0);
-    expected.put("count", 1L);
-    expected.put(Constants.GUID, "m1");
-    expected.put("sum", 5.0);
-    expected.put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-    JSONArray expectedAlerts = new JSONArray();
-    JSONObject expectedAlert = new JSONObject();
-    expectedAlert.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 5L);
-    expectedAlert.put("fakekey", "fakevalue");
-    expectedAlerts.add(expectedAlert);
-    expected.put(MetaAlertDao.ALERT_FIELD, expectedAlerts);
-
-    // Construct the meta alert object
-    Map<String, Object> metaSource = new HashMap<>();
-    metaSource.put(Constants.GUID, "m1");
-    metaSource.put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-    List<Double> alertScores = new ArrayList<>();
-    alertScores.add(10d);
-    metaSource.putAll(new MetaScores(alertScores).getMetaScores());
-    SearchHit metaHit = mock(SearchHit.class);
-    when(metaHit.getSource()).thenReturn(metaSource);
-
-    // Construct the inner alert
-    HashMap<String, Object> innerAlertSource = new HashMap<>();
-    innerAlertSource.put(Constants.GUID, "a1");
-    innerAlertSource.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10d);
-
-    Map<String, Object> innerHits = new HashMap<>();
-    innerHits.put(MetaAlertDao.ALERT_FIELD, Collections.singletonList(innerAlertSource));
-    when(metaHit.sourceAsMap()).thenReturn(innerHits);
-
-    // Construct  the updated Document
-    Map<String, Object> updateMap = new HashMap<>();
-    updateMap.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 5);
-    updateMap.put("fakekey", "fakevalue");
-    Document update = new Document(updateMap, "a1", "bro_doc", 0L);
-
-    ElasticsearchDao esDao = new ElasticsearchDao();
-    ElasticsearchMetaAlertDao emaDao = new ElasticsearchMetaAlertDao();
-    emaDao.init(esDao);
-    XContentBuilder builder = emaDao.buildUpdatedMetaAlert(update, metaHit);
-    JSONParser parser = new JSONParser();
-    Object obj = parser.parse(builder.string());
-    JSONObject actual = (JSONObject) obj;
-
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testBuildUpdatedMetaAlertMultipleAlerts() throws IOException, ParseException {
-    // Construct the expected result
-    JSONObject expected = new JSONObject();
-    expected.put("average", 7.5);
-    expected.put("min", 5.0);
-    expected.put("median", 7.5);
-    expected.put("max", 10.0);
-    expected.put("count", 2L);
-    expected.put(Constants.GUID, "m1");
-    expected.put("sum", 15.0);
-    expected.put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-    JSONArray expectedAlerts = new JSONArray();
-    JSONObject expectedAlertOne = new JSONObject();
-    expectedAlertOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 5d);
-    expectedAlertOne.put("fakekey", "fakevalue");
-    expectedAlerts.add(expectedAlertOne);
-    JSONObject expectedAlertTwo = new JSONObject();
-    expectedAlertTwo.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10d);
-    String guidTwo = "a2";
-    expectedAlertTwo.put(Constants.GUID, guidTwo);
-    expectedAlerts.add(expectedAlertTwo);
-    expected.put(MetaAlertDao.ALERT_FIELD, expectedAlerts);
-
-    // Construct the meta alert object
-    Map<String, Object> metaSource = new HashMap<>();
-    metaSource.put(Constants.GUID, "m1");
-    metaSource.put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-    double threatValueOne = 5d;
-    double threatValueTwo = 10d;
-    List<Double> alertScores = new ArrayList<>();
-    alertScores.add(threatValueOne);
-    alertScores.add(threatValueTwo);
-    metaSource.putAll(new MetaScores(alertScores).getMetaScores());
-    SearchHit metaHit = mock(SearchHit.class);
-    when(metaHit.getSource()).thenReturn(metaSource);
-
-    // Construct the inner alerts
-    HashMap<String, Object> innerAlertSourceOne = new HashMap<>();
-    String guidOne = "a1";
-    innerAlertSourceOne.put(Constants.GUID, guidOne);
-    innerAlertSourceOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, threatValueTwo);
-
-    HashMap<String, Object> innerAlertSourceTwo = new HashMap<>();
-    innerAlertSourceTwo.put(Constants.GUID, guidTwo);
-    innerAlertSourceTwo.put(MetaAlertDao.THREAT_FIELD_DEFAULT, threatValueTwo);
-
-    Map<String, Object> innerHits = new HashMap<>();
-    innerHits
-        .put(MetaAlertDao.ALERT_FIELD, Arrays.asList(innerAlertSourceOne, innerAlertSourceTwo));
-    when(metaHit.sourceAsMap()).thenReturn(innerHits);
-
-    // Construct  the updated Document
-    Map<String, Object> updateMap = new HashMap<>();
-    updateMap.put(MetaAlertDao.THREAT_FIELD_DEFAULT, threatValueOne);
-    updateMap.put("fakekey", "fakevalue");
-    Document update = new Document(updateMap, guidOne, "bro_doc", 0L);
-
-    ElasticsearchDao esDao = new ElasticsearchDao();
-    ElasticsearchMetaAlertDao emaDao = new ElasticsearchMetaAlertDao();
-    MultiIndexDao multiIndexDao = new MultiIndexDao(esDao);
-    emaDao.init(multiIndexDao);
-    XContentBuilder builder = emaDao.buildUpdatedMetaAlert(update, metaHit);
-
-    JSONParser parser = new JSONParser();
-    Object obj = parser.parse(builder.string());
-    JSONObject actual = (JSONObject) obj;
-
-    assertEquals(expected, actual);
-  }
-
   @Test(expected = IllegalArgumentException.class)
   public void testInvalidInit() {
     IndexDao dao = new IndexDao() {
@@ -215,6 +73,12 @@ public class ElasticsearchMetaAlertDaoTest {
       }
 
       @Override
+      public Iterable<Document> getAllLatest(
+          List<GetRequest> getRequests) throws IOException {
+        return null;
+      }
+
+      @Override
       public void update(Document update, Optional<String> index) throws IOException {
       }
 
@@ -252,24 +116,12 @@ public class ElasticsearchMetaAlertDaoTest {
     Map<String, Object> alertOne = new HashMap<>();
     alertOne.put(Constants.GUID, "alert_one");
     alertOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d);
-    GetResponse getResponseOne = mock(GetResponse.class);
-    when(getResponseOne.isExists()).thenReturn(true);
-    when(getResponseOne.getSource()).thenReturn(alertOne);
-    MultiGetItemResponse multiGetItemResponseOne = mock(MultiGetItemResponse.class);
-    when(multiGetItemResponseOne.getResponse()).thenReturn(getResponseOne);
-
-    // Add it to the iterator
-    @SuppressWarnings("unchecked")
-    Iterator<MultiGetItemResponse> mockIterator = mock(Iterator.class);
-    when(mockIterator.hasNext()).thenReturn(true, false);
-    when(mockIterator.next()).thenReturn(multiGetItemResponseOne);
-
-    // Add it to the response
-    MultiGetResponse mockResponse = mock(MultiGetResponse.class);
-    when(mockResponse.iterator()).thenReturn(mockIterator);
+    List<Document> alerts = new ArrayList<Document>() {{
+      add(new Document(alertOne, "", "", 0L));
+    }};
 
     // Actually build the doc
-    Document actual = emaDao.buildCreateDocument(mockResponse, groups);
+    Document actual = emaDao.buildCreateDocument(alerts, groups);
 
     ArrayList<Map<String, Object>> alertList = new ArrayList<>();
     alertList.add(alertOne);
@@ -306,34 +158,18 @@ public class ElasticsearchMetaAlertDaoTest {
     Map<String, Object> alertOne = new HashMap<>();
     alertOne.put(Constants.GUID, "alert_one");
     alertOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d);
-    GetResponse getResponseOne = mock(GetResponse.class);
-    when(getResponseOne.isExists()).thenReturn(true);
-    when(getResponseOne.getSource()).thenReturn(alertOne);
-    MultiGetItemResponse multiGetItemResponseOne = mock(MultiGetItemResponse.class);
-    when(multiGetItemResponseOne.getResponse()).thenReturn(getResponseOne);
 
     // Build the second response from the multiget
     Map<String, Object> alertTwo = new HashMap<>();
     alertTwo.put(Constants.GUID, "alert_one");
     alertTwo.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 5.0d);
-    GetResponse getResponseTwo = mock(GetResponse.class);
-    when(getResponseTwo.isExists()).thenReturn(true);
-    when(getResponseTwo.getSource()).thenReturn(alertTwo);
-    MultiGetItemResponse multiGetItemResponseTwo = mock(MultiGetItemResponse.class);
-    when(multiGetItemResponseTwo.getResponse()).thenReturn(getResponseTwo);
-
-    // Add it to the iterator
-    @SuppressWarnings("unchecked")
-    Iterator<MultiGetItemResponse> mockIterator = mock(Iterator.class);
-    when(mockIterator.hasNext()).thenReturn(true, true, false);
-    when(mockIterator.next()).thenReturn(multiGetItemResponseOne, multiGetItemResponseTwo);
-
-    // Add them to the response
-    MultiGetResponse mockResponse = mock(MultiGetResponse.class);
-    when(mockResponse.iterator()).thenReturn(mockIterator);
+    List<Document> alerts = new ArrayList<Document>() {{
+      add(new Document(alertOne, "", "", 0L));
+      add(new Document(alertTwo, "", "", 0L));
+    }};
 
     // Actually build the doc
-    Document actual = emaDao.buildCreateDocument(mockResponse, groups);
+    Document actual = emaDao.buildCreateDocument(alerts, groups);
 
     ArrayList<Map<String, Object>> alertList = new ArrayList<>();
     alertList.add(alertOne);
@@ -371,9 +207,7 @@ public class ElasticsearchMetaAlertDaoTest {
     emaDao.init(esDao);
 
     MetaAlertCreateRequest createRequest = new MetaAlertCreateRequest();
-    HashMap<String, String> guidsToGroups = new HashMap<>();
-    guidsToGroups.put("don't", "care");
-    createRequest.setGuidToIndices(guidsToGroups);
+    createRequest.setAlerts(Collections.singletonList(new GetRequest("don't", "care")));
     emaDao.createMetaAlert(createRequest);
   }
 
@@ -388,105 +222,11 @@ public class ElasticsearchMetaAlertDaoTest {
 
     Document doc = new Document(docMap, "guid", MetaAlertDao.METAALERT_TYPE, 0L);
 
-    List<Double> scores = new ArrayList<>();
-    scores.add(10.0d);
-    MetaScores expected = new MetaScores(scores);
-
     ElasticsearchMetaAlertDao metaAlertDao = new ElasticsearchMetaAlertDao();
-    MetaScores actual = metaAlertDao.calculateMetaScores(doc);
-    assertEquals(expected.getMetaScores(), actual.getMetaScores());
-  }
-
-  @Test
-  public void testHandleMetaUpdateNonAlertNonStatus() throws IOException {
-    ElasticsearchDao mockEsDao = mock(ElasticsearchDao.class);
-
-    Map<String, Object> docMap = new HashMap<>();
-    docMap.put("test", "value");
-    Document update = new Document(docMap, "guid", MetaAlertDao.METAALERT_TYPE, 0L);
-
-    ElasticsearchMetaAlertDao metaAlertDao = new ElasticsearchMetaAlertDao(mockEsDao);
-    metaAlertDao.handleMetaUpdate(update);
-    verify(mockEsDao, times(1))
-        .update(update, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-  }
-
-  @Test
-  public void testHandleMetaUpdateAlert() throws IOException {
-    // The child alert of the meta alert
-    Map<String, Object> alertMapBefore = new HashMap<>();
-    alertMapBefore.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d);
-    String guidAlert = "guid_alert";
-    alertMapBefore.put(Constants.GUID, guidAlert);
-    List<Map<String, Object>> alertList = new ArrayList<>();
-    alertList.add(alertMapBefore);
-    String alertSensorType = "alert_sensor";
-    Document alertBefore = new Document(
-        alertMapBefore,
-        guidAlert,
-        alertSensorType,
-        0L
-    );
-
-    // The original meta alert. It contains the alert we previously constructed.
-    Map<String, Object> metaMapBefore = new HashMap<>();
-    String metaGuid = "guid_meta";
-    metaMapBefore.putAll(alertBefore.getDocument());
-    metaMapBefore.put(MetaAlertDao.ALERT_FIELD, alertList);
-    metaMapBefore.put(Constants.GUID, metaGuid);
-    Document metaBefore = new Document(
-        metaMapBefore,
-        metaGuid,
-        MetaAlertDao.METAALERT_TYPE,
-        0L
-    );
-
-    // Build the Documents we expect to see from updates
-    // Build the after alert.  Don't add the original fields: This is only an update.
-    // The new field is the link to the meta alert.
-    Map<String, Object> alertMapAfter = new HashMap<>();
-    List<String> metaAlertField = new ArrayList<>();
-    metaAlertField.add(metaGuid);
-    alertMapAfter.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
-    Document alertAfter = new Document(
-        alertMapAfter,
-        guidAlert,
-        alertSensorType,
-        0L
+    metaAlertDao.calculateMetaScores(doc);
+    assertEquals(1L, doc.getDocument().get("count"));
+    assertEquals(10.0d,
+        doc.getDocument().get(ElasticsearchMetaAlertDao.THREAT_FIELD_DEFAULT)
     );
-
-    // Build the meta alert after. This'll be a replace, so add the original fields plus the
-    // threat fields
-    Map<String, Object> metaMapAfter = new HashMap<>();
-    metaMapAfter.putAll(metaMapBefore);
-    metaMapAfter.put("average", 10.0d);
-    metaMapAfter.put("min", 10.0d);
-    metaMapAfter.put("median", 10.0d);
-    metaMapAfter.put("max", 10.0d);
-    metaMapAfter.put("count", 1L);
-    metaMapAfter.put("sum", 10.0d);
-    metaMapAfter.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d);
-
-    Document metaAfter = new Document(
-        metaMapAfter,
-        metaGuid,
-        MetaAlertDao.METAALERT_TYPE,
-        0L
-    );
-
-    // Build the method calls we'd expect to see.
-    Map<Document, Optional<String>> updates = new HashMap<>();
-    updates.put(metaAfter, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-    updates.put(alertAfter, Optional.empty());
-
-    // Build a mock ElasticsearchDao to track interactions.  Actual runs are in integration tests
-    ElasticsearchDao mockEsDao = mock(ElasticsearchDao.class);
-    ElasticsearchMetaAlertDao metaAlertDao = new ElasticsearchMetaAlertDao(mockEsDao);
-    when(mockEsDao.getLatest(guidAlert, null)).thenReturn(alertBefore);
-    metaAlertDao.handleMetaUpdate(metaBefore);
-
-    // Validate we're calling what we need to with what we expect.
-    verify(mockEsDao, times(1)).getLatest(guidAlert, null);
-    verify(mockEsDao, times(1)).batchUpdate(updates);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index 27e5566..c28094b 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -18,19 +18,23 @@
 
 package org.apache.metron.elasticsearch.integration;
 
+import static org.apache.metron.indexing.dao.MetaAlertDao.ALERT_FIELD;
+import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERTS_INDEX;
+import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_FIELD;
+import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE;
+import static org.apache.metron.indexing.dao.MetaAlertDao.STATUS_FIELD;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -40,24 +44,26 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
-import org.apache.metron.elasticsearch.dao.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.Group;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
-import org.apache.metron.indexing.dao.update.ReplaceRequest;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -75,6 +81,7 @@ public class ElasticsearchMetaAlertIntegrationTest {
   private static final String INDEX =
       SENSOR_NAME + "_index_" + new SimpleDateFormat(DATE_FORMAT).format(new Date());
   private static final String NEW_FIELD = "new-field";
+  private static final String NAME_FIELD = "name";
 
   private static IndexDao esDao;
   private static MetaAlertDao metaDao;
@@ -82,161 +89,76 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
   /**
    {
-   "guid": "update_metaalert_alert_0",
-   "source:type": "test",
-   "field": "value 0"
-   }
-   */
-  @Multiline
-  public static String updateMetaAlertAlert0;
-
-  /**
-   {
-   "guid": "update_metaalert_alert_1",
-   "source:type": "test",
-   "field":"value 1"
-   }
-   */
-  @Multiline
-  public static String updateMetaAlertAlert1;
-
-  /**
-   {
-   "guid": "update_metaalert_alert_0",
-   "patch": [
-   {
-   "op": "add",
-   "path": "/field",
-   "value": "patched value 0"
-   }
-   ],
-   "sensorType": "test"
-   }
-   */
-  @Multiline
-  public static String updateMetaAlertPatchRequest;
-
-  /**
-   {
-   "guid": "update_metaalert_alert_0",
-   "replacement": {
-   "guid": "update_metaalert_alert_0",
-   "source:type": "test",
-   "field": "replaced value 0"
-   },
-   "sensorType": "test"
-   }
-   */
-  @Multiline
-  public static String updateMetaAlertReplaceRequest;
-
-  /**
-   {
-   "guid": "active_metaalert",
-   "source:type": "metaalert",
-   "alert": [],
-   "status": "active",
-   "timestamp": 0
-   }
-   */
-  @Multiline
-  public static String activeMetaAlert;
-
-  /**
-   {
-   "guid": "inactive_metaalert",
-   "source:type": "metaalert",
-   "alert": [],
-   "status": "inactive"
-   }
-   */
-  @Multiline
-  public static String inactiveMetaAlert;
-
-  /**
-   {
-   "guid": "search_by_nested_alert_active_0",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.1",
-   "ip_src_port": 8010,
-   "metaalerts": ["active_metaalert"],
-   "timestamp": 0
-   }
-   */
-  @Multiline
-  public static String searchByNestedAlertActive0;
-
-  /**
-   {
-   "guid": "search_by_nested_alert_active_1",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.2",
-   "ip_src_port": 8009,
-   "metaalerts": ["active_metaalert"],
-   "timestamp": 0
-   }
-   */
-  @Multiline
-  public static String searchByNestedAlertActive1;
-
-  /**
-   {
-   "guid": "search_by_nested_alert_inactive_0",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.3",
-   "ip_src_port": 8008
-   }
-   */
-  @Multiline
-  public static String searchByNestedAlertInactive0;
-
-  /**
-   {
-   "guid": "search_by_nested_alert_inactive_1",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.4",
-   "ip_src_port": 8007
+     "properties": {
+       "alert": {
+         "type": "nested"
+       }
+     }
    }
    */
   @Multiline
-  public static String searchByNestedAlertInactive1;
+  public static String nestedAlertMapping;
 
   /**
    {
-     "properties": {
-       "alert": {
-         "type": "nested"
+     "guid": "meta_alert",
+     "index": "metaalert_index",
+     "patch": [
+       {
+         "op": "add",
+         "path": "/name",
+         "value": "New Meta Alert"
        }
-     }
+     ],
+     "sensorType": "metaalert"
    }
    */
   @Multiline
-  public static String nestedAlertMapping;
+  public static String namePatchRequest;
 
   /**
    {
-   "guid": "group_by_child_alert",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.1",
-   "ip_src_port": 8010,
-   "score_field": 1,
-   "metaalerts": ["active_metaalert"]
+     "guid": "meta_alert",
+     "index": "metaalert_index",
+     "patch": [
+       {
+         "op": "add",
+         "path": "/name",
+         "value": "New Meta Alert"
+       },
+       {
+         "op": "add",
+         "path": "/alert",
+         "value": []
+       }
+     ],
+     "sensorType": "metaalert"
    }
    */
   @Multiline
-  public static String groupByChildAlert;
+  public static String alertPatchRequest;
 
   /**
    {
-   "guid": "group_by_standalone_alert",
-   "source:type": "test",
-   "ip_src_addr": "192.168.1.1",
-   "ip_src_port": 8010,
-   "score_field": 10
+     "guid": "meta_alert",
+     "index": "metaalert_index",
+     "patch": [
+       {
+         "op": "add",
+         "path": "/status",
+         "value": "inactive"
+       },
+       {
+         "op": "add",
+         "path": "/name",
+         "value": "New Meta Alert"
+       }
+     ],
+     "sensorType": "metaalert"
    }
    */
   @Multiline
-  public static String groupByStandaloneAlert;
+  public static String statusPatchRequest;
 
   @BeforeClass
   public static void setupBefore() throws Exception {
@@ -267,7 +189,7 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
   @Before
   public void setup() throws IOException {
-    es.createIndexWithMapping(MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC,
+    es.createIndexWithMapping(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC,
         buildMetaMappingSource());
   }
 
@@ -305,206 +227,414 @@ public class ElasticsearchMetaAlertIntegrationTest {
         .string();
   }
 
-
-  @SuppressWarnings("unchecked")
   @Test
-  public void test() throws Exception {
-    List<Map<String, Object>> inputData = new ArrayList<>();
-    for (int i = 0; i < 2; ++i) {
-      final String name = "message" + i;
-      int finalI = i;
-      inputData.add(
-          new HashMap<String, Object>() {
-            {
-              put("source:type", SENSOR_NAME);
-              put("name", name);
-              put(MetaAlertDao.THREAT_FIELD_DEFAULT, finalI);
-              put("timestamp", System.currentTimeMillis());
-              put(Constants.GUID, name);
-            }
-          }
-      );
+  public void shouldGetAllMetaAlertsForAlert() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(3);
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlerts
+    List<Map<String, Object>> metaAlerts = buildMetaAlerts(12, MetaAlertStatus.ACTIVE,
+        Optional.of(Collections.singletonList(alerts.get(0))));
+    metaAlerts.add(buildMetaAlert("meta_active_12", MetaAlertStatus.ACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(2)))));
+    metaAlerts.add(buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(2)))));
+    // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
+    elasticsearchAdd(metaAlerts, METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+
+    // Verify load was successful
+    List<GetRequest> createdDocs = metaAlerts.stream().map(metaAlert ->
+        new GetRequest((String) metaAlert.get(Constants.GUID), METAALERT_TYPE))
+        .collect(Collectors.toList());
+    createdDocs.addAll(alerts.stream().map(alert ->
+        new GetRequest((String) alert.get(Constants.GUID), SENSOR_NAME))
+        .collect(Collectors.toList()));
+    findCreatedDocs(createdDocs);
+
+    int previousPageSize = ((ElasticsearchMetaAlertDao) metaDao).getPageSize();
+    ((ElasticsearchMetaAlertDao) metaDao).setPageSize(5);
+
+    {
+      // Verify searches successfully return more than 10 results
+      SearchResponse searchResponse0 = metaDao.getAllMetaAlertsForAlert("message_0");
+      List<SearchResult> searchResults0 = searchResponse0.getResults();
+      Assert.assertEquals(13, searchResults0.size());
+      Assert.assertEquals(metaAlerts.get(0), searchResults0.get(0).getSource());
+
+      // Verify no meta alerts are returned because message_1 was not added to any
+      SearchResponse searchResponse1 = metaDao.getAllMetaAlertsForAlert("message_1");
+      List<SearchResult> searchResults1 = searchResponse1.getResults();
+      Assert.assertEquals(0, searchResults1.size());
+
+      // Verify only the active meta alert that message_2 was added to is returned
+      SearchResponse searchResponse2 = metaDao.getAllMetaAlertsForAlert("message_2");
+      List<SearchResult> searchResults2 = searchResponse2.getResults();
+      Assert.assertEquals(1, searchResults2.size());
+      Assert.assertEquals(metaAlerts.get(12), searchResults2.get(0).getSource());
     }
+    ((ElasticsearchMetaAlertDao) metaDao).setPageSize(previousPageSize);
+  }
 
-    elasticsearchAdd(inputData, INDEX, SENSOR_NAME);
+  @Test
+  public void getAllMetaAlertsForAlertShouldThrowExceptionForEmtpyGuid() throws Exception {
+    try {
+      metaDao.getAllMetaAlertsForAlert("");
+      Assert.fail("An exception should be thrown for empty guid");
+    } catch (InvalidSearchException ise) {
+      Assert.assertEquals("Guid cannot be empty", ise.getMessage());
+    }
+  }
 
-    List<Map<String, Object>> metaInputData = new ArrayList<>();
-    final String name = "meta_message";
-    Map<String, Object>[] alertArray = new Map[1];
-    alertArray[0] = inputData.get(0);
-    metaInputData.add(
-        new HashMap<String, Object>() {
-          {
-            put("source:type", SENSOR_NAME);
-            put("alert", alertArray);
-            put(Constants.GUID, name + "_active");
-            put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-          }
-        }
-    );
-    // Add an inactive message
-    metaInputData.add(
-        new HashMap<String, Object>() {
-          {
-            put("source:type", SENSOR_NAME);
-            put("alert", alertArray);
-            put(Constants.GUID, name + "_inactive");
-            put(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.INACTIVE.getStatusString());
-          }
-        }
-    );
+  @Test
+  public void shouldCreateMetaAlert() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(3);
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
 
-    // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(metaInputData, MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("message_2", SENSOR_NAME)));
 
-    List<Map<String, Object>> docs = null;
-    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
-      docs = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc");
-      if (docs.size() >= 10) {
-        break;
+    {
+      MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest() {{
+        setAlerts(new ArrayList<GetRequest>() {{
+          add(new GetRequest("message_1", SENSOR_NAME));
+          add(new GetRequest("message_2", SENSOR_NAME, INDEX));
+        }});
+        setGroups(Collections.singletonList("group"));
+      }};
+      MetaAlertCreateResponse metaAlertCreateResponse = metaDao.createMetaAlert(metaAlertCreateRequest);
+      {
+        // Verify metaAlert was created
+        findCreatedDoc(metaAlertCreateResponse.getGuid(), MetaAlertDao.METAALERT_TYPE);
+      }
+      {
+        // Verify alert 0 was not updated with metaalert field
+        Document alert = metaDao.getLatest("message_0", SENSOR_NAME);
+        Assert.assertEquals(4, alert.getDocument().size());
+        Assert.assertNull(alert.getDocument().get(METAALERT_FIELD));
+      }
+      {
+        // Verify alert 1 was properly updated with metaalert field
+        Document alert = metaDao.getLatest("message_1", SENSOR_NAME);
+        Assert.assertEquals(5, alert.getDocument().size());
+        Assert.assertEquals(1, ((List) alert.getDocument().get(METAALERT_FIELD)).size());
+        Assert.assertEquals(metaAlertCreateResponse.getGuid(), ((List) alert.getDocument().get(METAALERT_FIELD)).get(0));
+      }
+      {
+        // Verify alert 2 was properly updated with metaalert field
+        Document alert = metaDao.getLatest("message_2", SENSOR_NAME);
+        Assert.assertEquals(5, alert.getDocument().size());
+        Assert.assertEquals(1, ((List) alert.getDocument().get(METAALERT_FIELD)).size());
+        Assert.assertEquals(metaAlertCreateResponse.getGuid(), ((List) alert.getDocument().get(METAALERT_FIELD)).get(0));
       }
     }
-    Assert.assertEquals(2, docs.size());
+  }
+
+  @Test
+  public void shouldAddAlertsToMetaAlert() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(4);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlert
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+        Optional.of(Collections.singletonList(alerts.get(0))));
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("message_2", SENSOR_NAME),
+        new GetRequest("message_3", SENSOR_NAME),
+        new GetRequest("meta_alert", METAALERT_TYPE)));
+
+    // Build expected metaAlert after alerts are added
+    Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
+
+    // Verify the proper alerts were added
+    List<Map<String, Object>> metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD));
+    Map<String, Object> expectedAlert0 = alerts.get(0);
+    Map<String, Object> expectedAlert1 = alerts.get(1);
+    expectedAlert1.put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    metaAlertAlerts.add(expectedAlert1);
+    Map<String, Object> expectedAlert2 = alerts.get(2);
+    expectedAlert2.put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    metaAlertAlerts.add(expectedAlert2);
+    expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts);
+
+    // Verify the counts were properly updated
+    expectedMetaAlert.put("average", 1.0d);
+    expectedMetaAlert.put("min", 0.0d);
+    expectedMetaAlert.put("median", 1.0d);
+    expectedMetaAlert.put("max", 2.0d);
+    expectedMetaAlert.put("count", 3);
+    expectedMetaAlert.put("sum", 3.0d);
+    expectedMetaAlert.put("threat:triage:score", 3.0d);
+
     {
-      //modify the first message and add a new field
-      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {
-        {
-          put(NEW_FIELD, "metron");
-          put(MetaAlertDao.THREAT_FIELD_DEFAULT, "10");
-        }
-      };
-      String guid = "" + message0.get(Constants.GUID);
-      metaDao.replace(new ReplaceRequest() {
-        {
-          setReplacement(message0);
-          setGuid(guid);
-          setSensorType(SENSOR_NAME);
-        }
-      }, Optional.empty());
+      // Verify alerts were successfully added to the meta alert
+      Assert.assertTrue(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_1", SENSOR_NAME), new GetRequest("message_2", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
 
-      {
-        //ensure alerts in ES are up-to-date
-        boolean found = findUpdatedDoc(message0, guid, SENSOR_NAME);
-        Assert.assertTrue("Unable to find updated document", found);
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc");
-          cnt = docs
-              .stream()
-              .filter(d -> {
-                Object newfield = d.get(NEW_FIELD);
-                return newfield != null && newfield.equals(message0.get(NEW_FIELD));
-              }).count();
-        }
-        if (cnt == 0) {
-          Assert.fail("Elasticsearch is not updated!");
-        }
+    {
+      // Verify False when alerts are already in a meta alert and no new alerts are added
+      Assert.assertFalse(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+    {
+      // Verify only 1 alert is added when a list of alerts only contains 1 alert that is not in the meta alert
+      metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD));
+      Map<String, Object> expectedAlert3 = alerts.get(3);
+      expectedAlert3.put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+      metaAlertAlerts.add(expectedAlert3);
+      expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts);
+
+      expectedMetaAlert.put("average", 1.5d);
+      expectedMetaAlert.put("min", 0.0d);
+      expectedMetaAlert.put("median", 1.5d);
+      expectedMetaAlert.put("max", 3.0d);
+      expectedMetaAlert.put("count", 4);
+      expectedMetaAlert.put("sum", 6.0d);
+      expectedMetaAlert.put("threat:triage:score", 6.0d);
+
+      Assert.assertTrue(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_2", SENSOR_NAME), new GetRequest("message_3", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+  }
+
+  @Test
+  public void shouldRemoveAlertsFromMetaAlert() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(4);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    alerts.get(2).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    alerts.get(3).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlert
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(1), alerts.get(2), alerts.get(3))));
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("message_2", SENSOR_NAME),
+        new GetRequest("message_3", SENSOR_NAME),
+        new GetRequest("meta_alert", METAALERT_TYPE)));
+
+    // Build expected metaAlert after alerts are removed
+    Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
+
+    // Verify the proper alerts were removed
+    List<Map<String, Object>> metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD));
+    metaAlertAlerts.remove(0);
+    metaAlertAlerts.remove(0);
+    expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts);
+
+    // Verify the counts were properly updated
+    expectedMetaAlert.put("average", 2.5d);
+    expectedMetaAlert.put("min", 2.0d);
+    expectedMetaAlert.put("median", 2.5d);
+    expectedMetaAlert.put("max", 3.0d);
+    expectedMetaAlert.put("count", 2);
+    expectedMetaAlert.put("sum", 5.0d);
+    expectedMetaAlert.put("threat:triage:score", 5.0d);
+
+
+    {
+      // Verify a list of alerts are removed from a meta alert
+      Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+    {
+      // Verify False when alerts are not present in a meta alert and no alerts are removed
+      Assert.assertFalse(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+    {
+      // Verify only 1 alert is removed when a list of alerts only contains 1 alert that is in the meta alert
+      metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD));
+      metaAlertAlerts.remove(0);
+      expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts);
+
+      expectedMetaAlert.put("average", 3.0d);
+      expectedMetaAlert.put("min", 3.0d);
+      expectedMetaAlert.put("median", 3.0d);
+      expectedMetaAlert.put("max", 3.0d);
+      expectedMetaAlert.put("count", 1);
+      expectedMetaAlert.put("sum", 3.0d);
+      expectedMetaAlert.put("threat:triage:score", 3.0d);
+
+      Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_2", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+    {
+      // Verify all alerts are removed from a metaAlert
+      metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD));
+      metaAlertAlerts.remove(0);
+      expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts);
+
+      expectedMetaAlert.put("average", 0.0d);
+      expectedMetaAlert.put("min", "Infinity");
+      expectedMetaAlert.put("median", "NaN");
+      expectedMetaAlert.put("max", "-Infinity");
+      expectedMetaAlert.put("count", 0);
+      expectedMetaAlert.put("sum", 0.0d);
+      expectedMetaAlert.put("threat:triage:score", 0.0d);
+
+      Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert",
+          Collections.singletonList(new GetRequest("message_3", SENSOR_NAME))));
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+  }
+
+  @Test
+  public void addRemoveAlertsShouldThrowExceptionForInactiveMetaAlert() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlert
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.INACTIVE,
+        Optional.of(Collections.singletonList(alerts.get(0))));
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("meta_alert", METAALERT_TYPE)));
+
+    {
+      // Verify alerts cannot be added to an INACTIVE meta alert
+      try {
+        metaDao.addAlertsToMetaAlert("meta_alert",
+            Collections.singletonList(new GetRequest("message_1", SENSOR_NAME)));
+        Assert.fail("Adding alerts to an inactive meta alert should throw an exception");
+      } catch (IllegalStateException ise) {
+        Assert.assertEquals("Adding alerts to an INACTIVE meta alert is not allowed", ise.getMessage());
       }
+    }
 
-      {
-        //ensure meta alerts in ES are up-to-date
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC);
-          cnt = docs
-              .stream()
-              .filter(d -> {
-                List<Map<String, Object>> alerts = (List<Map<String, Object>>) d
-                    .get(MetaAlertDao.ALERT_FIELD);
-
-                for (Map<String, Object> alert : alerts) {
-                  Object newField = alert.get(NEW_FIELD);
-                  if (newField != null && newField.equals(message0.get(NEW_FIELD))) {
-                    return true;
-                  }
-                }
-
-                return false;
-              }).count();
-        }
-        if (cnt == 0) {
-          Assert.fail("Elasticsearch metaalerts not updated!");
-        }
+    {
+      // Verify alerts cannot be removed from an INACTIVE meta alert
+      try {
+        metaDao.removeAlertsFromMetaAlert("meta_alert",
+            Collections.singletonList(new GetRequest("message_0", SENSOR_NAME)));
+        Assert.fail("Removing alerts from an inactive meta alert should throw an exception");
+      } catch (IllegalStateException ise) {
+        Assert.assertEquals("Removing alerts from an INACTIVE meta alert is not allowed", ise.getMessage());
       }
     }
-    //modify the same message and modify the new field
+  }
+
+  @Test
+  public void shouldUpdateMetaAlertStatus() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(3);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_alert"));
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlerts
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+    // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("message_2", SENSOR_NAME),
+        new GetRequest("meta_alert", METAALERT_TYPE)));
+
     {
-      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {
-        {
-          put(NEW_FIELD, "metron2");
-        }
-      };
-      String guid = "" + message0.get(Constants.GUID);
-      metaDao.replace(new ReplaceRequest() {
-        {
-          setReplacement(message0);
-          setGuid(guid);
-          setSensorType(SENSOR_NAME);
-        }
-      }, Optional.empty());
+      // Verify status changed to inactive and child alerts are updated
+      Assert.assertTrue(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.INACTIVE));
+
+      Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
+      expectedMetaAlert.put(STATUS_FIELD, MetaAlertStatus.INACTIVE.getStatusString());
+
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+
+      Map<String, Object> expectedAlert0 = new HashMap<>(alerts.get(0));
+      expectedAlert0.put("metaalerts", new ArrayList());
+      findUpdatedDoc(expectedAlert0, "message_0", SENSOR_NAME);
+
+      Map<String, Object> expectedAlert1 = new HashMap<>(alerts.get(1));
+      expectedAlert1.put("metaalerts", new ArrayList());
+      findUpdatedDoc(expectedAlert1, "message_1", SENSOR_NAME);
+
+      Map<String, Object> expectedAlert2 = new HashMap<>(alerts.get(2));
+      findUpdatedDoc(expectedAlert2, "message_2", SENSOR_NAME);
+    }
+
+    {
+      // Verify status changed to active and child alerts are updated
+      Assert.assertTrue(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.ACTIVE));
+
+      Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
+      expectedMetaAlert.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
+
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+
+      Map<String, Object> expectedAlert0 = new HashMap<>(alerts.get(0));
+      expectedAlert0.put("metaalerts", Collections.singletonList("meta_alert"));
+      findUpdatedDoc(expectedAlert0, "message_0", SENSOR_NAME);
+
+      Map<String, Object> expectedAlert1 = new HashMap<>(alerts.get(1));
+      expectedAlert1.put("metaalerts", Collections.singletonList("meta_alert"));
+      findUpdatedDoc(expectedAlert1, "message_1", SENSOR_NAME);
+
+      Map<String, Object> expectedAlert2 = new HashMap<>(alerts.get(2));
+      findUpdatedDoc(expectedAlert2, "message_2", SENSOR_NAME);
 
-      boolean found = findUpdatedDoc(message0, guid, SENSOR_NAME);
-      Assert.assertTrue("Unable to find updated document", found);
-      {
-        //ensure ES is up-to-date
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc");
-          cnt = docs
-              .stream()
-              .filter(d -> message0.get(NEW_FIELD).equals(d.get(NEW_FIELD)))
-              .count();
-        }
-        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
-        if (cnt == 0) {
-          Assert.fail("Elasticsearch is not updated!");
-        }
-      }
       {
-        //ensure meta alerts in ES are up-to-date
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC);
-          cnt = docs
-              .stream()
-              .filter(d -> {
-                List<Map<String, Object>> alerts = (List<Map<String, Object>>) d
-                    .get(MetaAlertDao.ALERT_FIELD);
-
-                for (Map<String, Object> alert : alerts) {
-                  Object newField = alert.get(NEW_FIELD);
-                  if (newField != null && newField.equals(message0.get(NEW_FIELD))) {
-                    return true;
-                  }
-                }
-
-                return false;
-              }).count();
-        }
-        if (cnt == 0) {
-          Assert.fail("Elasticsearch metaalerts not updated!");
-        }
+        // Verify that setting the status to its current value has no effect
+        Assert.assertFalse(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.ACTIVE));
+
+        findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+        findUpdatedDoc(expectedAlert0, "message_0", SENSOR_NAME);
+        findUpdatedDoc(expectedAlert1, "message_1", SENSOR_NAME);
+        findUpdatedDoc(expectedAlert2, "message_2", SENSOR_NAME);
       }
     }
   }
 
-
   @Test
   public void shouldSearchByStatus() throws Exception {
-    List<Map<String, Object>> metaInputData = new ArrayList<>();
-    Map<String, Object> activeMetaAlertJSON = JSONUtils.INSTANCE
-        .load(activeMetaAlert, new TypeReference<Map<String, Object>>() {
-        });
-    metaInputData.add(activeMetaAlertJSON);
-    Map<String, Object> inactiveMetaAlertJSON = JSONUtils.INSTANCE
-        .load(inactiveMetaAlert, new TypeReference<Map<String, Object>>() {
-        });
-    metaInputData.add(inactiveMetaAlertJSON);
+    // Load metaAlerts
+    Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
+        Optional.empty());
+    Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+        Optional.empty());
+
 
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(metaInputData, MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
-    // Wait for updates to persist
-    findUpdatedDoc(inactiveMetaAlertJSON, "inactive_metaalert", MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("meta_active", METAALERT_TYPE),
+        new GetRequest("meta_inactive", METAALERT_TYPE)));
 
     SearchResponse searchResponse = metaDao.search(new SearchRequest() {
       {
@@ -517,6 +647,8 @@ public class ElasticsearchMetaAlertIntegrationTest {
         }}));
       }
     });
+
+    // Verify only active meta alerts are returned
     Assert.assertEquals(1, searchResponse.getTotal());
     Assert.assertEquals(MetaAlertStatus.ACTIVE.getStatusString(),
         searchResponse.getResults().get(0).getSource().get(MetaAlertDao.STATUS_FIELD));
@@ -525,34 +657,19 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
   @Test
   public void shouldSearchByNestedAlert() throws Exception {
-    // Create alerts
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    Map<String, Object> searchByNestedAlertActive0JSON = JSONUtils.INSTANCE
-        .load(searchByNestedAlertActive0, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertActive0JSON);
-    Map<String, Object> searchByNestedAlertActive1JSON = JSONUtils.INSTANCE
-        .load(searchByNestedAlertActive1, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertActive1JSON);
-    Map<String, Object> searchByNestedAlertInactive0JSON = JSONUtils.INSTANCE
-        .load(searchByNestedAlertInactive0, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertInactive0JSON);
-    Map<String, Object> searchByNestedAlertInactive1JSON = JSONUtils.INSTANCE
-        .load(searchByNestedAlertInactive1, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertInactive1JSON);
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(4);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(0).put("ip_src_addr", "192.168.1.1");
+    alerts.get(0).put("ip_src_port", 8010);
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(1).put("ip_src_addr", "192.168.1.2");
+    alerts.get(1).put("ip_src_port", 8009);
+    alerts.get(2).put("ip_src_addr", "192.168.1.3");
+    alerts.get(2).put("ip_src_port", 8008);
+    alerts.get(3).put("ip_src_addr", "192.168.1.4");
+    alerts.get(3).put("ip_src_port", 8007);
     elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
-    // Wait for updates to persist
-    findUpdatedDoc(searchByNestedAlertActive0JSON, "search_by_nested_alert_active_0",
-        SENSOR_NAME);
-    findUpdatedDoc(searchByNestedAlertActive1JSON, "search_by_nested_alert_active_1",
-        SENSOR_NAME);
-    findUpdatedDoc(searchByNestedAlertInactive0JSON, "search_by_nested_alert_inactive_0",
-        SENSOR_NAME);
-    findUpdatedDoc(searchByNestedAlertInactive1JSON, "search_by_nested_alert_inactive_1",
-        SENSOR_NAME);
 
     // Put the nested type into the test index, so that it'll match appropriately
     ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX)
@@ -560,23 +677,23 @@ public class ElasticsearchMetaAlertIntegrationTest {
         .setSource(nestedAlertMapping)
         .get();
 
-    // Create metaalerts
-    Map<String, Object> activeMetaAlertJSON = JSONUtils.INSTANCE
-        .load(activeMetaAlert, new TypeReference<Map<String, Object>>() {
-        });
-    activeMetaAlertJSON.put("alert",
-        Arrays.asList(searchByNestedAlertActive0JSON, searchByNestedAlertActive1JSON));
-    Map<String, Object> inactiveMetaAlertJSON = JSONUtils.INSTANCE
-        .load(inactiveMetaAlert, new TypeReference<Map<String, Object>>() {
-        });
-    inactiveMetaAlertJSON.put("alert",
-        Arrays.asList(searchByNestedAlertInactive0JSON, searchByNestedAlertInactive1JSON));
-
+    // Load metaAlerts
+    Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+    Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+        Optional.of(Arrays.asList(alerts.get(2), alerts.get(3))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Arrays.asList(activeMetaAlertJSON, inactiveMetaAlertJSON),
-        MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
-    // Wait for updates to persist
-    findUpdatedDoc(activeMetaAlertJSON, "active_metaalert", MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("message_2", SENSOR_NAME),
+        new GetRequest("message_3", SENSOR_NAME),
+        new GetRequest("meta_active", METAALERT_TYPE),
+        new GetRequest("meta_inactive", METAALERT_TYPE)));
+
 
     SearchResponse searchResponse = metaDao.search(new SearchRequest() {
       {
@@ -615,7 +732,7 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
     // Nested query should match a nested alert
     Assert.assertEquals(1, searchResponse.getTotal());
-    Assert.assertEquals("active_metaalert",
+    Assert.assertEquals("meta_active",
         searchResponse.getResults().get(0).getSource().get("guid"));
 
     // Query against all indices. The child alert has no actual attached meta alerts, and should
@@ -638,28 +755,21 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
     // Nested query should match a plain alert
     Assert.assertEquals(1, searchResponse.getTotal());
-    Assert.assertEquals("search_by_nested_alert_inactive_0",
+    Assert.assertEquals("message_2",
         searchResponse.getResults().get(0).getSource().get("guid"));
   }
 
   @Test
-  public void shouldGroupHidesAlert() throws Exception {
-    // Create alerts
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    Map<String, Object> groupByChildAlertJson = JSONUtils.INSTANCE
-        .load(groupByChildAlert, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(groupByChildAlertJson);
-    Map<String, Object> groupByStandaloneAlertJson = JSONUtils.INSTANCE
-        .load(groupByStandaloneAlert, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(groupByStandaloneAlertJson);
+  public void shouldHidesAlertsOnGroup() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(0).put("ip_src_addr", "192.168.1.1");
+    alerts.get(0).put("score_field", 1);
+    alerts.get(1).put("ip_src_addr", "192.168.1.1");
+    alerts.get(1).put("score_field", 10);
     elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
-    // Wait for updates to persist
-    findUpdatedDoc(groupByChildAlertJson, "group_by_child_alert",
-        SENSOR_NAME);
-    findUpdatedDoc(groupByStandaloneAlertJson, "group_by_standalone_alert",
-        SENSOR_NAME);
+
 
     // Put the nested type into the test index, so that it'll match appropriately
     ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX)
@@ -669,6 +779,11 @@ public class ElasticsearchMetaAlertIntegrationTest {
 
     // Don't need any meta alerts to actually exist, since we've populated the field on the alerts.
 
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME)));
+
     // Build our group request
     Group searchGroup = new Group();
     searchGroup.setField("ip_src_addr");
@@ -690,24 +805,109 @@ public class ElasticsearchMetaAlertIntegrationTest {
     Assert.assertEquals(10.0d, result.getScore(), 0.0d);
   }
 
+  @SuppressWarnings("unchecked")
   @Test
-  public void testStatusChanges() throws Exception {
-    // Create alerts
-    List<Map<String, Object>> alerts = new ArrayList<>();
-    Map<String, Object> searchByNestedAlertActive0Json = JSONUtils.INSTANCE
-        .load(searchByNestedAlertActive0, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertActive0Json);
-    Map<String, Object> searchByNestedAlertActive1Json = JSONUtils.INSTANCE
-        .load(searchByNestedAlertActive1, new TypeReference<Map<String, Object>>() {
-        });
-    alerts.add(searchByNestedAlertActive1Json);
+  public void shouldUpdateMetaAlertOnAlertUpdate() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive"));
+    elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
+
+    // Load metaAlerts
+    Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
+        Optional.of(Collections.singletonList(alerts.get(0))));
+    Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+        Optional.of(Collections.singletonList(alerts.get(0))));
+    // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("meta_active", METAALERT_TYPE),
+        new GetRequest("meta_inactive", METAALERT_TYPE)));
+
+    {
+      // Modify the first message and add a new field
+      Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
+        {
+          put(NEW_FIELD, "metron");
+          put(MetaAlertDao.THREAT_FIELD_DEFAULT, "10");
+        }
+      };
+      String guid = "" + message0.get(Constants.GUID);
+      metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty());
+
+      {
+        // Verify alerts in ES are up-to-date
+        findUpdatedDoc(message0, guid, SENSOR_NAME);
+        long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
+        if (cnt == 0) {
+          Assert.fail("Elasticsearch alert not updated!");
+        }
+      }
+
+      {
+        // Verify meta alerts in ES are up-to-date
+        long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron");
+        if (cnt == 0) {
+          Assert.fail("Active metaalert was not updated!");
+        }
+        if (cnt != 1) {
+          Assert.fail("Elasticsearch metaalerts not updated correctly!");
+        }
+      }
+    }
+    //modify the same message and modify the new field
+    {
+      Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
+        {
+          put(NEW_FIELD, "metron2");
+        }
+      };
+      String guid = "" + message0.get(Constants.GUID);
+      metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty());
+
+      {
+        // Verify ES is up-to-date
+        findUpdatedDoc(message0, guid, SENSOR_NAME);
+        long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
+        if (cnt == 0) {
+          Assert.fail("Elasticsearch alert not updated!");
+        }
+      }
+      {
+        // Verify meta alerts in ES are up-to-date
+        long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron2");
+        if (cnt == 0) {
+          Assert.fail("Active metaalert was not updated!");
+        }
+        if (cnt != 1) {
+          Assert.fail("Elasticsearch metaalerts not updated correctly!");
+        }
+      }
+    }
+  }
+
+  @Test
+  public void shouldThrowExceptionOnMetaAlertUpdate() throws Exception {
+    Document metaAlert = new Document(new HashMap<>(), "meta_alert", METAALERT_TYPE, 0L);
+    try {
+      // A direct update of a METAALERT_TYPE document must be rejected with an UnsupportedOperationException
+      metaDao.update(metaAlert, Optional.empty());
+      Assert.fail("Direct meta alert update should throw an exception");
+    } catch (UnsupportedOperationException uoe) {
+      Assert.assertEquals("Meta alerts cannot be directly updated", uoe.getMessage());
+    }
+  }
+  @Test
+  public void shouldPatchAllowedMetaAlerts() throws Exception {
+    // Load alerts
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
     elasticsearchAdd(alerts, INDEX, SENSOR_NAME);
-    // Wait for updates to persist
-    findUpdatedDoc(searchByNestedAlertActive0Json, "search_by_nested_alert_active_0",
-        SENSOR_NAME);
-    findUpdatedDoc(searchByNestedAlertActive1Json, "search_by_nested_alert_active_1",
-        SENSOR_NAME);
 
     // Put the nested type into the test index, so that it'll match appropriately
     ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX)
@@ -715,199 +915,170 @@ public class ElasticsearchMetaAlertIntegrationTest {
         .setSource(nestedAlertMapping)
         .get();
 
-    // Create metaalerts
-    Map<String, Object> activeMetaAlertJSON = JSONUtils.INSTANCE
-        .load(activeMetaAlert, new TypeReference<Map<String, Object>>() {
-        });
-    activeMetaAlertJSON.put("alert",
-        Arrays.asList(searchByNestedAlertActive0Json, searchByNestedAlertActive1Json));
-
+    // Load metaAlerts
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+        Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Collections.singletonList(activeMetaAlertJSON),
-        MetaAlertDao.METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
-    // Wait for updates to persist
-    findUpdatedDoc(activeMetaAlertJSON, "active_metaalert", MetaAlertDao.METAALERT_TYPE);
-
-    // Build our update request to inactive status
-    Map<String, Object> documentMap = new HashMap<>();
-
-    documentMap.put("status", MetaAlertStatus.INACTIVE.getStatusString());
-    Document document = new Document(documentMap, "active_metaalert", MetaAlertDao.METAALERT_TYPE,
-        0L);
-    metaDao.update(document, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-
-    Map<String, Object> expectedMetaDoc = new HashMap<>();
-    expectedMetaDoc.putAll(activeMetaAlertJSON);
-    expectedMetaDoc.put("status", MetaAlertStatus.INACTIVE.getStatusString());
-
-    // Make sure the update has gone through on the meta alert and the child alerts.
-    Assert.assertTrue(
-        findUpdatedDoc(expectedMetaDoc, "active_metaalert", MetaAlertDao.METAALERT_TYPE));
-
-    Map<String, Object> expectedAlertDoc0 = new HashMap<>();
-    expectedAlertDoc0.putAll(searchByNestedAlertActive0Json);
-    expectedAlertDoc0.put("metaalerts", new ArrayList<>());
-    Assert.assertTrue(
-        findUpdatedDoc(expectedAlertDoc0, "search_by_nested_alert_active_0", SENSOR_NAME));
-
-    Map<String, Object> expectedAlertDoc1 = new HashMap<>();
-    expectedAlertDoc1.putAll(searchByNestedAlertActive1Json);
-    expectedAlertDoc1.put("metaalerts", new ArrayList<>());
-    Assert.assertTrue(
-        findUpdatedDoc(expectedAlertDoc1, "search_by_nested_alert_active_1", SENSOR_NAME));
-
-    // Search against the indices. Should return the two alerts, but not the inactive metaalert.
-    SearchRequest searchRequest = new SearchRequest();
-    ArrayList<String> indices = new ArrayList<>();
-    indices.add(SENSOR_NAME);
-    indices.add(MetaAlertDao.METAALERT_TYPE);
-    searchRequest.setIndices(indices);
-    searchRequest.setSize(5);
-    searchRequest.setQuery("*");
-
-    // Validate our results
-    SearchResult expected0 = new SearchResult();
-    expected0.setId((String) expectedAlertDoc0.get(Constants.GUID));
-    expected0.setIndex(INDEX);
-    expected0.setSource(expectedAlertDoc0);
-    expected0.setScore(1.0f);
-
-    SearchResult expected1 = new SearchResult();
-    expected1.setId((String) expectedAlertDoc1.get(Constants.GUID));
-    expected1.setIndex(INDEX);
-    expected1.setSource(expectedAlertDoc1);
-    expected1.setScore(1.0f);
-
-    ArrayList<SearchResult> expectedResults = new ArrayList<>();
-    expectedResults.add(expected0);
-    expectedResults.add(expected1);
-
-    SearchResponse result = metaDao.search(searchRequest);
-    Assert.assertEquals(2, result.getTotal());
-    // Use set comparison to avoid ordering issues. We already checked counts.
-    Assert.assertEquals(new HashSet<>(expectedResults), new HashSet<>(result.getResults()));
-
-    // Build our update request back to active status
-    documentMap.put("status", MetaAlertStatus.ACTIVE.getStatusString());
-    document = new Document(documentMap, "active_metaalert", MetaAlertDao.METAALERT_TYPE, 0L);
-    metaDao.update(document, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-
-    expectedMetaDoc = new HashMap<>();
-    expectedMetaDoc.putAll(activeMetaAlertJSON);
-
-    // Make sure the update has gone through on the meta alert and the child alerts.
-    Assert.assertTrue(
-        findUpdatedDoc(expectedMetaDoc, "active_metaalert", MetaAlertDao.METAALERT_TYPE));
-
-    expectedAlertDoc0 = new HashMap<>();
-    expectedAlertDoc0.putAll(searchByNestedAlertActive0Json);
-    Assert.assertTrue(
-        findUpdatedDoc(expectedAlertDoc0, "search_by_nested_alert_active_0", SENSOR_NAME));
-
-    expectedAlertDoc1 = new HashMap<>();
-    expectedAlertDoc1.putAll(searchByNestedAlertActive1Json);
-    Assert.assertTrue(
-        findUpdatedDoc(expectedAlertDoc1, "search_by_nested_alert_active_1", SENSOR_NAME));
-
-    // Search against the indices. Should return just the active metaalert.
-    SearchResult expectedMeta = new SearchResult();
-    expectedMeta.setId((String) activeMetaAlertJSON.get(Constants.GUID));
-    expectedMeta.setIndex(MetaAlertDao.METAALERTS_INDEX);
-    expectedMeta.setSource(activeMetaAlertJSON);
-    expectedMeta.setScore(1.0f);
-
-    expectedResults = new ArrayList<>();
-    expectedResults.add(expectedMeta);
-
-    result = metaDao.search(searchRequest);
-    Assert.assertEquals(1, result.getTotal());
-    Assert.assertEquals(expectedResults, result.getResults());
-  }
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
 
-  @Test
-  @SuppressWarnings("unchecked")
-  public void shouldUpdateMetaAlertOnAlertPatchOrReplace() throws Exception {
-    List<Map<String, Object>> inputData = new ArrayList<>();
-    Map<String, Object> updateMetaAlertAlert0JSON = JSONUtils.INSTANCE
-        .load(updateMetaAlertAlert0, new TypeReference<Map<String, Object>>() {
-        });
-    inputData.add(updateMetaAlertAlert0JSON);
-    Map<String, Object> updateMetaAlertAlert1JSON = JSONUtils.INSTANCE
-        .load(updateMetaAlertAlert1, new TypeReference<Map<String, Object>>() {
-        });
-    inputData.add(updateMetaAlertAlert1JSON);
-    elasticsearchAdd(inputData, INDEX, SENSOR_NAME);
-    // Wait for updates to persist
-    findUpdatedDoc(updateMetaAlertAlert1JSON, "update_metaalert_alert_1", SENSOR_NAME);
-
-    MetaAlertCreateResponse metaAlertCreateResponse = metaDao
-        .createMetaAlert(new MetaAlertCreateRequest() {{
-          setGuidToIndices(new HashMap<String, String>() {{
-            put("update_metaalert_alert_0", INDEX);
-            put("update_metaalert_alert_1", INDEX);
-          }});
-          setGroups(Collections.singletonList("group"));
-        }});
-    // Wait for updates to persist
-    findCreatedDoc(metaAlertCreateResponse.getGuid(), MetaAlertDao.METAALERT_TYPE);
+    // Verify load was successful
+    findCreatedDocs(Arrays.asList(
+        new GetRequest("message_0", SENSOR_NAME),
+        new GetRequest("message_1", SENSOR_NAME),
+        new GetRequest("meta_alert", METAALERT_TYPE)));
 
-    // Patch alert
-    metaDao.patch(JSONUtils.INSTANCE.load(updateMetaAlertPatchRequest, PatchRequest.class),
-        Optional.empty());
+    Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
+    expectedMetaAlert.put(NAME_FIELD, "New Meta Alert");
+    {
+      // Verify a patch to a field other than "status" or "alert" can be patched
+      PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatchRequest, PatchRequest.class);
+      metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis()));
 
-    // Wait for updates to persist
-    updateMetaAlertAlert0JSON.put("field", "patched value 0");
-    findUpdatedDoc(updateMetaAlertAlert0JSON, "update_metaalert_alert_0", SENSOR_NAME);
-
-    Map<String, Object> metaalert = metaDao
-        .getLatest(metaAlertCreateResponse.getGuid(), MetaAlertDao.METAALERT_TYPE).getDocument();
-    List<Map<String, Object>> alerts = (List<Map<String, Object>>) metaalert.get("alert");
-    Assert.assertEquals(2, alerts.size());
-    Assert.assertEquals("update_metaalert_alert_1", alerts.get(0).get("guid"));
-    Assert.assertEquals("value 1", alerts.get(0).get("field"));
-    Assert.assertEquals("update_metaalert_alert_0", alerts.get(1).get("guid"));
-    Assert.assertEquals("patched value 0", alerts.get(1).get("field"));
-
-    // Replace alert
-    metaDao.replace(JSONUtils.INSTANCE.load(updateMetaAlertReplaceRequest, ReplaceRequest.class),
-        Optional.empty());
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
+
+    {
+      // Verify a patch to an alert field should throw an exception
+      try {
+        PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatchRequest, PatchRequest.class);
+        metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis()));
+
+        Assert.fail("A patch on the alert field should throw an exception");
+      } catch (IllegalArgumentException iae) {
+        Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
+            + "Please use the add/remove alert or update status functions instead.", iae.getMessage());
+      }
+
+      // Verify the metaAlert was not updated
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
 
-    // Wait for updates to persist
-    updateMetaAlertAlert0JSON.put("field", "replaced value 0");
-    findUpdatedDoc(updateMetaAlertAlert0JSON, "update_metaalert_alert_0", SENSOR_NAME);
-
-    metaalert = metaDao.getLatest(metaAlertCreateResponse.getGuid(), MetaAlertDao.METAALERT_TYPE)
-        .getDocument();
-    alerts = (List<Map<String, Object>>) metaalert.get("alert");
-    Assert.assertEquals(2, alerts.size());
-    Assert.assertEquals("update_metaalert_alert_1", alerts.get(0).get("guid"));
-    Assert.assertEquals("value 1", alerts.get(0).get("field"));
-    Assert.assertEquals("update_metaalert_alert_0", alerts.get(1).get("guid"));
-    Assert.assertEquals("replaced value 0", alerts.get(1).get("field"));
+    {
+      // Verify a patch to a status field should throw an exception
+      try {
+        PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatchRequest, PatchRequest.class);
+        metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis()));
+
+        Assert.fail("A patch on the status field should throw an exception");
+      } catch (IllegalArgumentException iae) {
+        Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
+            + "Please use the add/remove alert or update status functions instead.", iae.getMessage());
+      }
+
+      // Verify the metaAlert was not updated
+      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    }
   }
 
-  protected boolean findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)
-      throws InterruptedException, IOException {
-    boolean found = false;
-    for (int t = 0; t < MAX_RETRIES && !found; ++t, Thread.sleep(SLEEP_MS)) {
+  // Polls the INDEX docs until at least one has fieldName equal to fieldValue; returns that
+  // count, or 0 once MAX_RETRIES polls have all come back without a match.
+  protected long getMatchingAlertCount(String fieldName, Object fieldValue) throws IOException, InterruptedException {
+    for (int attempt = 0; attempt < MAX_RETRIES; ++attempt, Thread.sleep(SLEEP_MS)) {
+      long matches = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc").stream()
+          .filter(doc -> doc.get(fieldName) != null && doc.get(fieldName).equals(fieldValue))
+          .count();
+      if (matches > 0) {
+        return matches; // return immediately instead of sleeping once more after a hit
+      }
+    }
+    return 0;
+  }
+
+  // Polls the meta alert index until at least one meta alert contains a nested alert whose
+  // fieldName equals fieldValue; returns the number of such meta alerts, or 0 after MAX_RETRIES.
+  @SuppressWarnings("unchecked")
+  protected long getMatchingMetaAlertCount(String fieldName, String fieldValue) throws IOException, InterruptedException {
+    for (int attempt = 0; attempt < MAX_RETRIES; ++attempt, Thread.sleep(SLEEP_MS)) {
+      List<Map<String, Object>> metaAlerts = es.getAllIndexedDocs(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC);
+      long matches = metaAlerts
+          .stream()
+          .filter(metaAlert -> {
+            // The cast is unchecked: the nested alerts come back as an untyped Object.
+            List<Map<String, Object>> nested = (List<Map<String, Object>>) metaAlert
+                .get(MetaAlertDao.ALERT_FIELD);
+            // A meta alert without nested alerts simply does not match (previously an NPE).
+            return nested != null && nested.stream().anyMatch(
+                alert -> alert.get(fieldName) != null && alert.get(fieldName).equals(fieldValue));
+          }).count();
+      if (matches > 0) {
+        return matches; // return immediately instead of sleeping once more after a hit
+      }
+    }
+    return 0;
+  }
+
+  protected void findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)
+      throws InterruptedException, IOException, OriginalNotFoundException {
+    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { // poll until the stored doc equals message0
       Document doc = metaDao.getLatest(guid, sensorType);
       if (doc != null && message0.equals(doc.getDocument())) {
-        found = true;
+        return;
       }
     }
-    return found;
+    throw new OriginalNotFoundException("Could not find " + guid + " after " + MAX_RETRIES + " tries");
   }
 
   protected boolean findCreatedDoc(String guid, String sensorType)
-      throws InterruptedException, IOException {
-    boolean found = false;
-    for (int t = 0; t < MAX_RETRIES && !found; ++t, Thread.sleep(SLEEP_MS)) {
+      throws InterruptedException, IOException, OriginalNotFoundException {
+    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { // poll until the document exists
       Document doc = metaDao.getLatest(guid, sensorType);
       if (doc != null) {
-        found = true;
+        return true;
+      }
+    }
+    throw new OriginalNotFoundException("Could not find " + guid + " after " + MAX_RETRIES + " tries");
+  }
+
+  protected boolean findCreatedDocs(List<GetRequest> getRequests)
+      throws InterruptedException, IOException, OriginalNotFoundException {
+    for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { // poll until every request resolves
+      Iterable<Document> docs = metaDao.getAllLatest(getRequests);
+      if (docs != null) {
+        int docCount = 0;
+        for (Document ignored : docs) { // only the number of returned documents matters
+          docCount++;
+        }
+        if (getRequests.size() == docCount) {
+          return true;
+        }
       }
     }
-    return found;
+    throw new OriginalNotFoundException("Could not find guids after " + MAX_RETRIES + " tries");
+  }
+
+  // Builds count alerts: guid "message_<i>", MetaAlertDao.THREAT_FIELD_DEFAULT = i, timestamp = now.
+  protected List<Map<String, Object>> buildAlerts(int count) {
+    List<Map<String, Object>> builtAlerts = new ArrayList<>();
+    for (int index = 0; index < count; index++) {
+      Map<String, Object> alert = new HashMap<>();
+      alert.put(Constants.GUID, "message_" + index);
+      alert.put("source:type", SENSOR_NAME);
+      alert.put(MetaAlertDao.THREAT_FIELD_DEFAULT, index);
+      alert.put("timestamp", System.currentTimeMillis());
+      builtAlerts.add(alert);
+    }
+    return builtAlerts;
+  }
+
+  // Builds count meta alerts with guids "meta_<status>_<i>", all given the same status and alerts.
+  protected List<Map<String, Object>> buildMetaAlerts(int count, MetaAlertStatus status, Optional<List<Map<String, Object>>> alerts) {
+    List<Map<String, Object>> metaAlerts = new ArrayList<>();
+    for (int index = 0; index < count; index++) {
+      metaAlerts.add(buildMetaAlert("meta_" + status.getStatusString() + "_" + index, status, alerts));
+    }
+    return metaAlerts;
+  }
+
+  // Builds a meta alert map with the given guid and status and the METAALERT_TYPE source type.
+  // The nested alerts are stored under ALERT_FIELD only when the Optional holds a list; otherwise
+  // the field is left out entirely.
+  protected Map<String, Object> buildMetaAlert(String guid, MetaAlertStatus status, Optional<List<Map<String, Object>>> alerts) {
+    Map<String, Object> metaAlert = new HashMap<>();
+    metaAlert.put(Constants.GUID, guid);
+    metaAlert.put("source:type", METAALERT_TYPE);
+    metaAlert.put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
+    alerts.ifPresent(nestedAlerts -> metaAlert.put(ALERT_FIELD, nestedAlerts));
+    return metaAlert;
   }
 
   protected void elasticsearchAdd(List<Map<String, Object>> inputData, String index, String docType)

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index fddf056..116ee4b 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -152,7 +152,7 @@ public class ElasticsearchUpdateIntegrationTest {
       Assert.assertEquals(message0, doc.getDocument());
       {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME)));
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(1, columns.size());
@@ -190,7 +190,7 @@ public class ElasticsearchUpdateIntegrationTest {
       Assert.assertEquals(message0, doc.getDocument());
       {
         //ensure hbase is up to date
-        Get g = new Get(guid.getBytes());
+        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME)));
         Result r = table.get(g);
         NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
         Assert.assertEquals(2, columns.size());

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
index cab0dfc..a91def2 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
@@ -21,20 +21,12 @@ import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.KeyUtil;
 import org.apache.metron.enrichment.lookup.LookupKey;
 
 import java.io.*;
 
 public class EnrichmentKey implements LookupKey {
-  private static final int SEED = 0xDEADBEEF;
-  private static final int HASH_PREFIX_SIZE=16;
-  ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
-    @Override
-    protected HashFunction initialValue() {
-      return Hashing.murmur3_128(SEED);
-    }
-  };
-
   public String indicator;
   public String type;
 
@@ -63,21 +55,14 @@ public class EnrichmentKey implements LookupKey {
     } catch (IOException e) {
       throw new RuntimeException("Unable to convert type and indicator to bytes", e);
     }
-    Hasher hasher = hFunction.get().newHasher();
-    hasher.putBytes(Bytes.toBytes(indicator));
-    byte[] prefix = hasher.hash().asBytes();
-    byte[] val = new byte[indicatorBytes.length + prefix.length];
-    int offset = 0;
-    System.arraycopy(prefix, 0, val, offset, prefix.length);
-    offset += prefix.length;
-    System.arraycopy(indicatorBytes, 0, val, offset, indicatorBytes.length);
-    return val;
+    byte[] prefix = KeyUtil.INSTANCE.getPrefix(Bytes.toBytes(indicator));
+    return KeyUtil.INSTANCE.merge(prefix, indicatorBytes);
   }
 
   @Override
   public void fromBytes(byte[] row) {
     ByteArrayInputStream baos = new ByteArrayInputStream(row);
-    baos.skip(HASH_PREFIX_SIZE);
+    baos.skip(KeyUtil.HASH_PREFIX_SIZE);
     DataInputStream w = new DataInputStream(baos);
     try {
       type = w.readUTF();

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 86c8d37..d8c5beb 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -152,9 +152,12 @@ in parallel.  This enables a flexible strategy for specifying your backing store
 For instance, currently the REST API supports the update functionality and may be configured with a list of
 IndexDao implementations to use to support the updates.
 
+Updates with the IndexDao.update method replace the current object with the new object.  For partial updates,
+use IndexDao.patch instead.
+
 ### The `HBaseDao`
 
-Updates will be written to HBase. The key structure is the GUID and
+Updates will be written to HBase. The key structure includes the GUID and sensor type and
 for each new version, a new column is created with value as the message.
 
 The HBase table and column family are configured via fields in the global configuration.
@@ -169,17 +172,23 @@ The HBase column family to use for message updates.
 
 The goal of meta alerts is to be able to group together a set of alerts while being able to transparently perform actions
 like searches, as if meta alerts were normal alerts.  `org.apache.metron.indexing.dao.MetaAlertDao` extends `IndexDao` and
-enables a couple extra features: creation of a meta alert and the ability to get all meta alerts associated with an alert.
+enables several features: 
+* the ability to get all meta alerts associated with an alert
+* creation of a meta alert
+* adding alerts to a meta alert
+* removing alerts from a meta alert
+* changing a meta alert's status
 
 The implementation of this is to denormalize the relationship between alerts and meta alerts, and store alerts as a nested field within a meta alert.
 The use of nested fields is to avoid the limitations of parent-child relationships (one-to-many) and merely linking by IDs
-(which causes issues with pagination as a result of being unable to join indices).
+(which causes issues with pagination as a result of being unable to join indices).  Each alert also stores
+the list of meta alerts that contain it, so that the source alert and its copies nested in meta alerts stay in sync.
 
 The search functionality of `IndexDao` is wrapped by the `MetaAlertDao` in order to provide both regular and meta alerts side-by-side with sorting.
 The updating capabilities are similarly wrapped, in order to ensure updates are carried through both the alerts and associated meta alerts.
 Both of these functions are handled under the hood.
 
-In addition, an API endpoint is added for the meta alert specific features of creation and going from meta alert to alert.
+In addition, API endpoints have been added to expose the features listed above.
 The denormalization handles the case of going from meta alert to alert automatically.
 
 # Notes on Performance Tuning

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml
index 35dfb2b..5e64118 100644
--- a/metron-platform/metron-indexing/pom.xml
+++ b/metron-platform/metron-indexing/pom.xml
@@ -167,6 +167,13 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>