You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@metron.apache.org by GitBox <gi...@apache.org> on 2018/12/13 19:29:42 UTC

[GitHub] asfgit closed pull request #1269: METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID

asfgit closed pull request #1269: METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID
URL: https://github.com/apache/metron/pull/1269
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork) pull request once it has
been merged, and it would not otherwise be shown, the diff is
reproduced below for the sake of provenance:

diff --git a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
index e3709ab31f..70f52def55 100644
--- a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
+++ b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
@@ -24,9 +24,9 @@ import { loadTestData, deleteTestData } from '../utils/e2e_util';
 describe('Test spec for all ui elements & list view', function() {
   let page: MetronAlertsPage;
   let loginPage: LoginPage;
-  let columnNames = [ '', 'Score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country',
+  let columnNames = [ '', 'Score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country',
                       'ip_dst_addr', 'host', 'alert_status', '', '', ''];
-  let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+  let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
                                 'ip_dst_addr', 'host', 'alert_status' ];
 
   beforeAll(async function() : Promise<any> {
@@ -136,16 +136,17 @@ describe('Test spec for all ui elements & list view', function() {
   });
 
   it('should select columns from table configuration', async function() : Promise<any> {
-    let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
-      'ip_dst_addr', 'host', 'alert_status', 'guid' ];
-
     await page.clickConfigureTable();
-    expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'for default selected column names');
-    await page.toggleSelectCol('id');
+    expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'expect default selected column names');
+
+    // remove the 'guid' column and add the 'id' column
     await page.toggleSelectCol('guid');
-    expect(await page.getSelectedColumnNames()).toEqual(newColNamesColumnConfig, 'for guid added to selected column names');
-    await page.saveConfigureColumns();
+    await page.toggleSelectCol('id');
 
+    let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+      'ip_dst_addr', 'host', 'alert_status', 'id' ];
+    expect(await page.getSelectedColumnNames()).toEqual(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns');
+    await page.saveConfigureColumns();
   });
 
   it('should have all time-range controls', async function() : Promise<any> {
diff --git a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
index c3636f5cbe..39504a9aa1 100644
--- a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
+++ b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
@@ -24,7 +24,7 @@ import {loadTestData, deleteTestData} from '../../utils/e2e_util';
 describe('Test spec for table column configuration', function() {
   let page: MetronAlertsPage;
   let loginPage: LoginPage;
-  let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+  let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
     'ip_dst_addr', 'host', 'alert_status' ];
 
   beforeAll(async function() : Promise<any> {
@@ -45,17 +45,18 @@ describe('Test spec for table column configuration', function() {
   });
 
   it('should select columns from table configuration', async function() : Promise<any> {
-    let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
-      'ip_dst_addr', 'host', 'alert_status', 'guid' ];
-
     await page.clearLocalStorage();
     await page.navigateTo();
+    await page.clickConfigureTable();
+    expect(await page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names');
 
-    await  page.clickConfigureTable();
-    expect(await  page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names');
-    await page.toggleSelectCol('id');
+    // remove the 'guid' column and add the 'id' column
     await page.toggleSelectCol('guid');
-    expect(await page.getSelectedColumnNames()).toEqualBcoz(newColNamesColumnConfig, 'for guid added to selected column names');
+    await page.toggleSelectCol('id');
+
+    let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+      'ip_dst_addr', 'host', 'alert_status', 'id' ];
+    expect(await page.getSelectedColumnNames()).toEqualBcoz(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns');
     await page.saveConfigureColumns();
   });
 
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
index 027f57a9cf..718a41f48b 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
@@ -61,7 +61,7 @@
             <span appAlertSeverity [severity]="getScore(alert.source)"> <a> {{ hasScore(alert.source) ? getScore(alert.source) : '-' }} </a> </span>
           </td>
           <td [attr.colspan]="alertsColumnsToDisplay.length - 1">
-            <a (click)="addFilter('guid', alert.id)" [attr.title]="alert.id" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.id | centerEllipses:20:cell }}</a>
+            <a (click)="addFilter('guid', alert.source['guid'])" [attr.title]="alert.source['guid']" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.source['guid'] | centerEllipses:20:cell }}</a>
               <span> ({{ alert.source.metron_alert.length }})</span>
           </td>
           <td>
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
index fd47b67cc2..2190bebfdd 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
@@ -141,14 +141,14 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
 
   onSort(sortEvent: SortEvent) {
     let sortOrder = (sortEvent.sortOrder === Sort.ASC ? 'asc' : 'desc');
-    let sortBy = sortEvent.sortBy === 'id' ? 'guid' : sortEvent.sortBy;
+    let sortBy = sortEvent.sortBy === 'id' ? '_uid' : sortEvent.sortBy;
     this.queryBuilder.setSort(sortBy, sortOrder);
     this.onRefreshData.emit(true);
   }
 
   getValue(alert: Alert, column: ColumnMetadata, formatData: boolean) {
     if (column.name === 'id') {
-      return this.formatValue(column, alert[column.name]);
+      return this.formatValue(column, alert['id']);
     }
 
     return this.getValueFromSource(alert.source, column, formatData);
@@ -158,9 +158,6 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
     let returnValue = '';
     try {
       switch (column.name) {
-        case 'id':
-          returnValue = alertSource['guid'];
-          break;
         case 'alert_status':
           returnValue = alertSource['alert_status'] ? alertSource['alert_status'] : 'NEW';
           break;
@@ -218,7 +215,7 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
   }
 
   addFilter(field: string, value: string) {
-    field = (field === 'id') ? 'guid' : field;
+    field = (field === 'id') ? '_id' : field;
     this.onAddFilter.emit(new Filter(field, value));
   }
 
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
index ab1d4eba1b..19aaa6e31d 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
@@ -323,7 +323,7 @@ export class TreeViewComponent extends TableViewComponent implements OnInit, OnC
   }
 
   sortTreeSubGroup($event, treeGroup: TreeGroupData) {
-    let sortBy = $event.sortBy === 'id' ? 'guid' : $event.sortBy;
+    let sortBy = $event.sortBy === 'id' ? '_uid' : $event.sortBy;
     let sortOrder = $event.sortOrder === Sort.ASC ? 'asc' : 'desc';
     let sortField = new SortField(sortBy, sortOrder);
 
diff --git a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
index 6fd4107856..2c91d4cfd5 100644
--- a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
+++ b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
@@ -45,7 +45,7 @@ export class ElasticSearchLocalstorageImpl extends DataSource {
   sourceType: 'source:type';
 
   private defaultColumnMetadata = [
-    new ColumnMetadata('id', 'string'),
+    new ColumnMetadata('guid', 'string'),
     new ColumnMetadata('timestamp', 'date'),
     new ColumnMetadata('source:type', 'string'),
     new ColumnMetadata('ip_src_addr', 'ip'),
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
index 9e6e568ea7..7aea2fc950 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -32,7 +32,6 @@
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -120,10 +119,23 @@ private IndexRequest createRequest(D document, String index) {
         if(document.getTimestamp() == null) {
             throw new IllegalArgumentException("Document must contain the timestamp");
         }
+
+        // if updating an existing document, the doc ID should be defined.
+        // if creating a new document, set the doc ID to null to allow Elasticsearch to generate one.
+        String docId = document.getDocumentID().orElse(null);
+        if(LOG.isDebugEnabled() && document.getDocumentID().isPresent()) {
+            LOG.debug("Updating existing document with known doc ID; docID={}, guid={}, sensorType={}",
+                    docId, document.getGuid(), document.getSensorType());
+        } else if(LOG.isDebugEnabled()) {
+            LOG.debug("Creating a new document, doc ID not yet known; guid={}, sensorType={}",
+                    document.getGuid(), document.getSensorType());
+        }
+
         return new IndexRequest()
                 .source(document.getDocument())
                 .type(document.getSensorType() + "_doc")
-                .id(document.getGuid())
+                .index(index)
+                .id(docId)
                 .index(index)
                 .timestamp(document.getTimestamp().toString());
     }
@@ -149,6 +161,7 @@ private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> docum
                 } else {
                     // request succeeded
                     D success = getDocument(response.getItemId());
+                    success.setDocumentID(response.getResponse().getId());
                     results.addSuccess(success);
                 }
             }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index 2e9c855694..519e803140 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -44,11 +44,13 @@
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
 
 public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
 
@@ -148,8 +150,11 @@ public Document update(Document update, Optional<String> index) throws IOExcepti
       try {
         // We need to update an alert itself.  Only that portion of the update can be delegated.
         // We still need to get meta alerts potentially associated with it and update.
-        Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
-                .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), MetaAlertConstants.METAALERT_TYPE, update.getTimestamp()))
+        SearchResponse response = getMetaAlertsForAlert(update.getGuid());
+        Collection<Document> metaAlerts = response
+                .getResults()
+                .stream()
+                .map(result -> toDocument(result, update.getTimestamp()))
                 .collect(Collectors.toList());
         // Each meta alert needs to be updated with the new alert
         for (Document metaAlert : metaAlerts) {
@@ -172,6 +177,13 @@ public Document update(Document update, Optional<String> index) throws IOExcepti
     }
   }
 
+  private Document toDocument(SearchResult result, Long timestamp) {
+    Document document = Document.fromJSON(result.getSource());
+    document.setTimestamp(timestamp);
+    document.setDocumentID(result.getId());
+    return document;
+  }
+
   @Override
   public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
     return getUpdateDao().addCommentToAlert(request);
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index c63532e8e5..dca74bca59 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -60,7 +60,8 @@ public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchEx
     org.elasticsearch.action.search.SearchResponse esResponse;
     try {
       esResponse = client.getHighLevelClient().search(request);
-      LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
+      LOG.debug("Got Elasticsearch response with {} hit(s); response={}",
+              esResponse.getHits().getTotalHits(), esResponse.toString());
 
     } catch (Exception e) {
       String msg = String.format(
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index 0c91007943..95d27db31c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -18,8 +18,18 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,29 +39,23 @@
 import java.util.Optional;
 import java.util.function.Function;
 
-import org.apache.metron.elasticsearch.client.ElasticsearchClient;
-import org.apache.metron.indexing.dao.RetrieveLatestDao;
-import org.apache.metron.indexing.dao.search.GetRequest;
-import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
+import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
 
 public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
-  private ElasticsearchClient transportClient;
+  private ElasticsearchClient client;
+  private ElasticsearchRequestSubmitter submitter;
 
-  public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) {
-    this.transportClient = transportClient;
+  public ElasticsearchRetrieveLatestDao(ElasticsearchClient client) {
+    this.client = client;
+    this.submitter = new ElasticsearchRequestSubmitter(client);
   }
 
   @Override
   public Document getLatest(String guid, String sensorType) throws IOException {
-    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(hit));
     return doc.orElse(null);
   }
 
@@ -63,21 +67,7 @@ public Document getLatest(String guid, String sensorType) throws IOException {
       guids.add(getRequest.getGuid());
       sensorTypes.add(getRequest.getSensorType());
     }
-    List<Document> documents = searchByGuids(
-        guids,
-        sensorTypes,
-        hit -> {
-          Long ts = 0L;
-          String doc = hit.getSourceAsString();
-          String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-          try {
-            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-          }
-        }
-
-    );
+    List<Document> documents = searchByGuids(guids, sensorTypes, hit -> toDocument(hit));
     return documents;
   }
 
@@ -102,54 +92,47 @@ public Document getLatest(String guid, String sensorType) throws IOException {
     if (guids == null || guids.isEmpty()) {
       return Collections.emptyList();
     }
-    QueryBuilder query = null;
-    IdsQueryBuilder idsQuery;
-    if (sensorTypes != null) {
-      String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc")
-          .toArray(String[]::new);
-      idsQuery = QueryBuilders.idsQuery(types);
-    } else {
-      idsQuery = QueryBuilders.idsQuery();
-    }
 
-    for (String guid : guids) {
-      query = idsQuery.addIds(guid);
+    // should match any of the guids
+    // the 'guid' field must be of type 'keyword' or this term query will not match
+    BoolQueryBuilder guidQuery = boolQuery().must(termsQuery(Constants.GUID, guids));
+
+    // should match any of the sensor types
+    BoolQueryBuilder sensorQuery = boolQuery();
+    sensorTypes.forEach(sensorType -> sensorQuery.should(typeQuery(sensorType + "_doc")));
+
+    // must have a match for both guid and sensor
+    BoolQueryBuilder query = boolQuery()
+            .must(guidQuery)
+            .must(sensorQuery);
+
+    // submit the search
+    SearchResponse response;
+    try {
+      SearchSourceBuilder source = new SearchSourceBuilder()
+              .query(query)
+              .size(guids.size());
+      SearchRequest request = new SearchRequest().source(source);
+      response = submitter.submitSearch(request);
+
+    } catch(InvalidSearchException e) {
+      throw new IOException(e);
     }
-    SearchRequest request = new SearchRequest();
-    SearchSourceBuilder builder = new SearchSourceBuilder();
-    builder.query(query);
-    builder.size(guids.size());
-    request.source(builder);
-
-    org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request);
-    SearchHits hits = response.getHits();
+
+    // transform the search hits to results using the callback
     List<T> results = new ArrayList<>();
-    for (SearchHit hit : hits) {
+    for(SearchHit hit: response.getHits()) {
       Optional<T> result = callback.apply(hit);
-      if (result.isPresent()) {
-        results.add(result.get());
-      }
+      result.ifPresent(r -> results.add(r));
     }
+
     return results;
   }
 
-  private Optional<Document> toDocument(final String guid, SearchHit hit) {
-    Long ts = 0L;
-    String doc = hit.getSourceAsString();
-    String sourceType = toSourceType(hit.getType());
-    try {
-      return Optional.of(new Document(doc, guid, sourceType, ts));
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-    }
-  }
+  private Optional<Document> toDocument(SearchHit hit) {
+    Document document = Document.fromJSON(hit.getSource());
+    document.setDocumentID(hit.getId());
 
-  /**
-   * Returns the source type based on a given doc type.
-   * @param docType The document type.
-   * @return The source type.
-   */
-  private String toSourceType(String docType) {
-    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+    return Optional.of(document);
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
index b313811fcf..c6389d7acc 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
@@ -20,6 +20,7 @@
 import org.apache.metron.common.Constants;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -130,15 +131,21 @@ public void testSizeWhenWriteFails() throws IOException {
     }
 
     private void setupElasticsearchToFail() throws IOException {
+        final String errorMessage = "error message";
+        final Exception cause = new Exception("test exception");
+        final boolean isFailed = true;
+        final int itemID = 0;
+
         // define the item failure
         BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
-        when(failure.getCause()).thenReturn(new Exception("test exception"));
-        when(failure.getMessage()).thenReturn("error message");
+        when(failure.getCause()).thenReturn(cause);
+        when(failure.getMessage()).thenReturn(errorMessage);
 
         // define the item level response
         BulkItemResponse itemResponse = mock(BulkItemResponse.class);
-        when(itemResponse.isFailed()).thenReturn(true);
-        when(itemResponse.getItemId()).thenReturn(0);
+        when(itemResponse.isFailed()).thenReturn(isFailed);
+        when(itemResponse.getItemId()).thenReturn(itemID);
+
         when(itemResponse.getFailure()).thenReturn(failure);
         when(itemResponse.getFailureMessage()).thenReturn("error message");
         List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
@@ -146,16 +153,32 @@ private void setupElasticsearchToFail() throws IOException {
         // define the bulk response to indicate failure
         BulkResponse response = mock(BulkResponse.class);
         when(response.iterator()).thenReturn(itemsResponses.iterator());
-        when(response.hasFailures()).thenReturn(true);
+        when(response.hasFailures()).thenReturn(isFailed);
 
         // have the client return the mock response
         when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
     }
 
     private void setupElasticsearchToSucceed() throws IOException {
+        final String documentId = UUID.randomUUID().toString();
+        final boolean isFailed = false;
+        final int itemID = 0;
+
+        // the write response will contain what is used as the document ID
+        DocWriteResponse writeResponse = mock(DocWriteResponse.class);
+        when(writeResponse.getId()).thenReturn(documentId);
+
+        // define the item level response
+        BulkItemResponse itemResponse = mock(BulkItemResponse.class);
+        when(itemResponse.isFailed()).thenReturn(isFailed);
+        when(itemResponse.getItemId()).thenReturn(itemID);
+        when(itemResponse.getResponse()).thenReturn(writeResponse);
+        List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
+
         // define the bulk response to indicate success
         BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(false);
+        when(response.iterator()).thenReturn(itemsResponses.iterator());
+        when(response.hasFailures()).thenReturn(isFailed);
 
         // have the client return the mock response
         when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 7a84588c0a..917df4df9a 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
 
@@ -55,15 +56,19 @@ public ElasticsearchRequestSubmitter setup(SearchResponse response) throws IOExc
 
   @Test
   public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException {
-
     // mocks
     SearchResponse response = mock(SearchResponse.class);
     SearchRequest request = new SearchRequest();
 
+    // response will indicate 1 search hit
+    SearchHits hits = mock(SearchHits.class);
+    when(hits.getTotalHits()).thenReturn(1L);
+
     // response will have status of OK and no failed shards
     when(response.status()).thenReturn(RestStatus.OK);
     when(response.getFailedShards()).thenReturn(0);
     when(response.getTotalShards()).thenReturn(2);
+    when(response.getHits()).thenReturn(hits);
 
     // search should succeed
     ElasticsearchRequestSubmitter submitter = setup(response);
@@ -99,9 +104,14 @@ public void searchShouldHandleShardFailure() throws InvalidSearchException, IOEx
     // response will have status of OK
     when(response.status()).thenReturn(RestStatus.OK);
 
+    // response will indicate 1 search hit
+    SearchHits hits = mock(SearchHits.class);
+    when(hits.getTotalHits()).thenReturn(1L);
+
     // the response will report shard failures
     when(response.getFailedShards()).thenReturn(1);
     when(response.getTotalShards()).thenReturn(2);
+    when(response.getHits()).thenReturn(hits);
 
     // the response will return the failures
     ShardSearchFailure[] failures = { fail };
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java
new file mode 100644
index 0000000000..df4aeb0089
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
+import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
+import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.Response;
+import org.hamcrest.CoreMatchers;
+import org.json.simple.JSONObject;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class ElasticsearchBulkDocumentWriterIntegrationTest {
+
+    @ClassRule
+    public static TemporaryFolder indexDir = new TemporaryFolder();
+    private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
+    private static ElasticSearchComponent elasticsearch;
+    private ElasticsearchClient client;
+    private ElasticsearchBulkDocumentWriter<Document> writer;
+    private ElasticsearchRetrieveLatestDao retrieveDao;
+
+    @BeforeClass
+    public static void setupElasticsearch() throws Exception {
+        AccessConfig accessConfig = new AccessConfig();
+        accessConfig.setGlobalConfigSupplier(() -> globals());
+
+        elasticsearch = new ElasticSearchComponent.Builder()
+                .withHttpPort(9211)
+                .withIndexDir(indexDir.getRoot())
+                .withAccessConfig(accessConfig)
+                .build();
+        elasticsearch.start();
+    }
+
+    @AfterClass
+    public static void tearDownElasticsearch() {
+        if(elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+        client = ElasticsearchClientFactory.create(globals());
+        retrieveDao = new ElasticsearchRetrieveLatestDao(client);
+        writer = new ElasticsearchBulkDocumentWriter<>(client)
+                .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+
+        // add bro template
+        JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class);
+        String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true);
+        HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON);
+        Response response = client
+                .getLowLevelClient()
+                .performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity);
+        assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200));
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        if(client != null) {
+            client.close();
+        }
+    }
+
+    /** Writes a batch of documents and verifies each can be retrieved with an auto-generated document ID. */
+    @Test
+    public void testWrite() throws Exception {
+        // create some documents to write
+        List<Document> documents = new ArrayList<>();
+        for(int i=0; i<10; i++) {
+            Document document = Document.fromJSON(createMessage());
+            documents.add(document);
+        }
+
+        // write the documents
+        for(Document doc: documents) {
+            writer.addDocument(doc, "bro_index");
+        }
+        writer.write();
+
+        // ensure the documents were written
+        for(Document expected: documents) {
+            Document actual = retrieveDao.getLatest(expected.getGuid(), expected.getSensorType());
+            assertNotNull("No document found", actual);
+            assertEquals(expected.getGuid(), actual.getGuid());
+            assertEquals(expected.getSensorType(), actual.getSensorType());
+            assertEquals(expected.getDocument(), actual.getDocument());
+            assertTrue(actual.getDocumentID().isPresent());
+            // the document ID and GUID should not be the same, since the document ID was auto-generated
+            assertNotEquals(actual.getDocumentID().get(), actual.getGuid());
+        }
+    }
+
+    private static Map<String, Object> globals() {
+        Map<String, Object> globals = new HashMap<>();
+        globals.put("es.clustername", "metron");
+        globals.put("es.ip", "localhost");
+        globals.put("es.port", "9200");
+        globals.put("es.date.format", "yyyy.MM.dd.HH");
+        return globals;
+    }
+
+    private JSONObject createMessage() {
+        JSONObject message = new JSONObject();
+        message.put(Constants.GUID, UUID.randomUUID().toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+        message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+        message.put("source:type", "bro");
+        return message;
+    }
+}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index c3e2108e99..09f7df9ba7 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -20,14 +20,7 @@
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
@@ -38,11 +31,25 @@
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MultiIndexDao implements IndexDao {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private List<IndexDao> indices;
 
-  public MultiIndexDao( IndexDao... composedDao) {
+  public MultiIndexDao(IndexDao... composedDao) {
     indices = new ArrayList<>();
     Collections.addAll(indices, composedDao);
   }
@@ -117,18 +124,30 @@ public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOExce
    */
   @Override
   public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
-    List<DocumentContainer> output =
-            indices.parallelStream().map(dao -> {
-              try {
-                return new DocumentContainer(dao.addCommentToAlert(request, latest));
-              } catch (Throwable e) {
-                return new DocumentContainer(e);
-              }
-            }).collect(Collectors.toList());
-
+    List<DocumentContainer> output = indices
+            .parallelStream()
+            .map(dao -> addCommentToAlert(dao, request, latest))
+            .collect(Collectors.toList());
     return getLatestDocument(output);
   }
 
+  /** Adds the comment through a single DAO; any failure is captured in the returned container, not thrown. */
+  private DocumentContainer addCommentToAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
+    DocumentContainer container;
+    try {
+      Document document = indexDao.addCommentToAlert(request, latest);
+      container = new DocumentContainer(document);
+      // log guid/sensorType from the request; 'document' may be null and must not be dereferenced here
+      LOG.debug("Added comment to alert; indexDao={}, guid={}, sensorType={}, document={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
+    } catch (Throwable e) {
+      container = new DocumentContainer(e);
+      LOG.error("Unable to add comment to alert; indexDao={}, error={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
+    }
+    return container;
+  }
+
   @Override
   public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
     Document latest = getLatest(request.getGuid(), request.getSensorType());
@@ -145,18 +164,30 @@ public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws I
    */
   @Override
   public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
-    List<DocumentContainer> output =
-            indices.parallelStream().map(dao -> {
-              try {
-                return new DocumentContainer(dao.removeCommentFromAlert(request, latest));
-              } catch (Throwable e) {
-                return new DocumentContainer(e);
-              }
-            }).collect(Collectors.toList());
-
+    List<DocumentContainer> output = indices
+            .parallelStream()
+            .map(dao -> removeCommentFromAlert(dao, request, latest))
+            .collect(Collectors.toList());
     return getLatestDocument(output);
   }
 
+  /** Removes the comment through a single DAO; any failure is captured in the returned container, not thrown. */
+  private DocumentContainer removeCommentFromAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
+    DocumentContainer container;
+    try {
+      Document document = indexDao.removeCommentFromAlert(request, latest);
+      container = new DocumentContainer(document);
+      // log guid/sensorType from the request; 'document' may be null and must not be dereferenced here
+      LOG.debug("Removed comment from alert; indexDao={}, guid={}, sensorType={}, document={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
+    } catch (Throwable e) {
+      container = new DocumentContainer(e);
+      LOG.error("Unable to remove comment from alert; indexDao={}, error={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
+    }
+    return container;
+  }
+
   protected static class DocumentContainer {
     private Optional<Document> d = Optional.empty();
     private Optional<Throwable> t = Optional.empty();
@@ -226,18 +257,30 @@ public void init(AccessConfig config) {
 
   @Override
   public Document getLatest(final String guid, String sensorType) throws IOException {
-    List<DocumentContainer> output =
-            indices.parallelStream().map(dao -> {
-      try {
-        return new DocumentContainer(dao.getLatest(guid, sensorType));
-      } catch (Throwable e) {
-        return new DocumentContainer(e);
-      }
-    }).collect(Collectors.toList());
-
+    List<DocumentContainer> output = indices
+            .parallelStream()
+            .map(dao -> getLatest(dao, guid, sensorType))
+            .collect(Collectors.toList());
     return getLatestDocument(output);
   }
 
+  private DocumentContainer getLatest(IndexDao indexDao, String guid, String sensorType) {
+    DocumentContainer container;
+    try {
+      Document document = indexDao.getLatest(guid, sensorType);
+      container = new DocumentContainer(document);
+      LOG.debug("Found latest document; indexDao={}, guid={}, sensorType={}, document={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), guid, sensorType, document);
+
+    } catch (Throwable e) {
+      container = new DocumentContainer(e);
+      LOG.error("Unable to find latest document; indexDao={}, error={}",
+              ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
+    }
+
+    return container;
+  }
+
   @Override
   public Iterable<Document> getAllLatest(
       List<GetRequest> getRequests) throws IOException {
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
index daa54246d7..b47320090e 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
@@ -20,10 +20,24 @@
 
 public class MetaAlertConstants {
   public static String METAALERT_TYPE = "metaalert";
+
+  /**
+   * The name of the field in an alert that contains a list
+   * of GUIDs of all meta-alerts the alert is associated with.
+   *
+   * <p>Only standard, non-metaalerts will have this field.
+   */
   public static String METAALERT_FIELD = "metaalerts";
   public static String METAALERT_DOC = METAALERT_TYPE + "_doc";
   public static String THREAT_FIELD_DEFAULT = "threat:triage:score";
   public static String THREAT_SORT_DEFAULT = "sum";
+
+  /**
+   * The name of the field in a meta-alert that contains a list of
+   * all alerts associated with the meta-alert.
+   *
+   * <p>Only meta-alerts will have this field.
+   */
   public static String ALERT_FIELD = "metron_alert";
   public static String STATUS_FIELD = "status";
   public static String GROUPS_FIELD = "groups";
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
index 3686b19d23..0a028e54bd 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
@@ -21,19 +21,39 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.metron.common.Constants.Fields.TIMESTAMP;
+import static org.apache.metron.common.Constants.GUID;
+import static org.apache.metron.common.Constants.SENSOR_TYPE;
 import org.apache.metron.common.utils.JSONUtils;
 
 public class Document {
+
   Long timestamp;
   Map<String, Object> document;
   String guid;
   String sensorType;
+  String documentID;
+
+  public static Document fromJSON(Map<String, Object> json) {
+    String guid = getGUID(json);
+    Long timestamp = getTimestamp(json).orElse(0L);
+    String sensorType = getSensorType(json);
+    return new Document(json, guid, sensorType, timestamp);
+  }
 
   public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp) {
+    this(document, guid, sensorType, timestamp, null);
+  }
+
+  public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp, String documentID) {
     setDocument(document);
     setGuid(guid);
     setTimestamp(timestamp);
     setSensorType(sensorType);
+    setDocumentID(documentID);
   }
 
   public Document(String document, String guid, String sensorType, Long timestamp) throws IOException {
@@ -41,7 +61,7 @@ public Document(String document, String guid, String sensorType, Long timestamp)
   }
 
   public Document(String document, String guid, String sensorType) throws IOException {
-    this( document, guid, sensorType, null);
+    this(document, guid, sensorType, null);
   }
 
   /**
@@ -49,8 +69,11 @@ public Document(String document, String guid, String sensorType) throws IOExcept
    * @param other The document to be copied.
    */
   public Document(Document other) {
-    this(new HashMap<>(other.getDocument()), other.getGuid(), other.getSensorType(),
-        other.getTimestamp());
+    this(new HashMap<>(other.getDocument()),
+            other.getGuid(),
+            other.getSensorType(),
+            other.getTimestamp(),
+            other.getDocumentID().orElse(null));
   }
 
   private static Map<String, Object> convertDoc(String document) throws IOException {
@@ -89,46 +112,83 @@ public void setGuid(String guid) {
     this.guid = guid;
   }
 
-  @Override
-  public String toString() {
-    return "Document{" +
-        "timestamp=" + timestamp +
-        ", document=" + document +
-        ", guid='" + guid + '\'' +
-        ", sensorType='" + sensorType + '\'' +
-        '}';
+  /**
+   * Returns the unique identifier that is used when persisting this document.
+   *
+   * <p>This value will be different than the Metron guid.
+   *
+   * <p>Only present when a document has been retrieved from a store
+   * that supports a document ID, like Elasticsearch.  This value will
+   * not be present when retrieved from HBase.
+   */
+  public Optional<String> getDocumentID() {
+    return Optional.ofNullable(documentID);
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
+  public void setDocumentID(Optional<String> documentID) {
+    this.documentID = documentID.orElse(null);
+  }
+
+  public void setDocumentID(String documentID) {
+    this.documentID = documentID;
+  }
+
+  private static Optional<Long> getTimestamp(Map<String, Object> document) {
+    Object value = document.get(TIMESTAMP.getName());
+    if(value != null && value instanceof Long) {
+      return Optional.of(Long.class.cast(value));
     }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
+    return Optional.empty();
+  }
+
+  private static String getGUID(Map<String, Object> document) {
+    Object value = document.get(GUID);
+    if(value != null && value instanceof String) {
+      return String.class.cast(value);
     }
 
-    Document document1 = (Document) o;
+    throw new IllegalStateException(String.format("Missing '%s' field", GUID));
+  }
 
-    if (timestamp != null ? !timestamp.equals(document1.timestamp) : document1.timestamp != null) {
-      return false;
-    }
-    if (document != null ? !document.equals(document1.document) : document1.document != null) {
-      return false;
+  private static String getSensorType(Map<String, Object> document) {
+    Object value = document.get(SENSOR_TYPE);
+    if(value != null && value instanceof String) {
+      return String.class.cast(value);
     }
-    if (guid != null ? !guid.equals(document1.guid) : document1.guid != null) {
-      return false;
+
+    value = document.get(SENSOR_TYPE.replace(".", ":"));
+    if(value != null && value instanceof String) {
+      return String.class.cast(value);
     }
-    return sensorType != null ? sensorType.equals(document1.sensorType)
-        : document1.sensorType == null;
+
+    throw new IllegalStateException(String.format("Missing '%s' field", SENSOR_TYPE));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof Document)) return false;
+    Document document1 = (Document) o;
+    return Objects.equals(timestamp, document1.timestamp) &&
+            Objects.equals(document, document1.document) &&
+            Objects.equals(guid, document1.guid) &&
+            Objects.equals(sensorType, document1.sensorType) &&
+            Objects.equals(documentID, document1.documentID);
   }
 
   @Override
   public int hashCode() {
-    int result = timestamp != null ? timestamp.hashCode() : 0;
-    result = 31 * result + (document != null ? document.hashCode() : 0);
-    result = 31 * result + (guid != null ? guid.hashCode() : 0);
-    result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
-    return result;
+    return Objects.hash(timestamp, document, guid, sensorType, documentID);
+  }
+
+  @Override
+  public String toString() {
+    return "Document{" +
+            "timestamp=" + timestamp +
+            ", document=" + document +
+            ", guid='" + guid + '\'' +
+            ", sensorType='" + sensorType + '\'' +
+            ", documentID=" + documentID +
+            '}';
   }
 }
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
index 82f0a4958b..ef1d298861 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
@@ -24,6 +24,8 @@
 import java.util.Map;
 import java.util.Optional;
 
+import static java.lang.String.format;
+
 public interface UpdateDao {
 
   /**
@@ -55,7 +57,6 @@
 
   Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException;
 
-
   /**
    * Update a document in an index given a JSON Patch (see RFC 6902 at
    * https://tools.ietf.org/html/rfc6902)
@@ -73,23 +74,28 @@ default Document patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request
   }
 
   default Document getPatchedDocument(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
-      Optional<Long> timestamp
+      Optional<Long> optionalTimestamp
   ) throws OriginalNotFoundException, IOException {
-    Map<String, Object> latest = request.getSource();
-    if (latest == null) {
-      Document latestDoc = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType());
-      if (latestDoc != null && latestDoc.getDocument() != null) {
-        latest = latestDoc.getDocument();
+    String guid = request.getGuid();
+    String sensorType = request.getSensorType();
+    String documentID = null;
+    Long timestamp = optionalTimestamp.orElse(System.currentTimeMillis());
+
+    Map<String, Object> originalSource = request.getSource();
+    if (originalSource == null) {
+      // no document source provided, lookup the latest
+      Document toPatch = retrieveLatestDao.getLatest(guid, sensorType);
+      if(toPatch != null && toPatch.getDocument() != null) {
+        originalSource = toPatch.getDocument();
+        documentID = toPatch.getDocumentID().orElse(null);
+
       } else {
-        throw new OriginalNotFoundException(
-            "Unable to patch an document that doesn't exist and isn't specified.");
+        String error = format("Document does not exist, but is required; guid=%s, sensorType=%s", guid, sensorType);
+        throw new OriginalNotFoundException(error);
       }
     }
 
-    Map<String, Object> updated = JSONUtils.INSTANCE.applyPatch(request.getPatch(), latest);
-    return new Document(updated,
-        request.getGuid(),
-        request.getSensorType(),
-        timestamp.orElse(System.currentTimeMillis()));
+    Map<String, Object> patchedSource = JSONUtils.INSTANCE.applyPatch(request.getPatch(), originalSource);
+    return new Document(patchedSource, guid, sensorType, timestamp, documentID);
   }
 }
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
index f1355a6a47..5a18fc50b5 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
@@ -55,6 +55,7 @@
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.integration.utils.TestUtils;
 import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -231,8 +232,8 @@ public void shouldSortByThreatTriageScore() throws Exception {
     SearchResponse result = metaDao.search(sr);
     List<SearchResult> results = result.getResults();
     Assert.assertEquals(2, results.size());
-    Assert.assertEquals("meta_active_0", results.get((0)).getId());
-    Assert.assertEquals("message_1", results.get((1)).getId());
+    Assert.assertEquals("meta_active_0", results.get((0)).getSource().get(Constants.GUID));
+    Assert.assertEquals("message_1", results.get((1)).getSource().get(Constants.GUID));
 
     // Test ascending
     SortField sfAsc = new SortField();
@@ -245,8 +246,8 @@ public void shouldSortByThreatTriageScore() throws Exception {
     srAsc.setSort(Collections.singletonList(sfAsc));
     result = metaDao.search(srAsc);
     results = result.getResults();
-    Assert.assertEquals("message_1", results.get((0)).getId());
-    Assert.assertEquals("meta_active_0", results.get((1)).getId());
+    Assert.assertEquals("message_1", results.get((0)).getSource().get(Constants.GUID));
+    Assert.assertEquals("meta_active_0", results.get((1)).getSource().get(Constants.GUID));
     Assert.assertEquals(2, results.size());
   }
 
@@ -856,92 +857,72 @@ public void shouldHidesAlertsOnGroup() throws Exception {
   @Test
   public abstract void shouldSearchByNestedAlert() throws Exception;
 
-  @SuppressWarnings("unchecked")
+  /**
+   * If a meta-alert is active, any updates to alerts associated with a meta-alert
+   * should be reflected in both the original alert and the copy contained within
+   * the meta-alert.
+   */
   @Test
   public void shouldUpdateMetaAlertOnAlertUpdate() throws Exception {
-    // Load alerts
-    List<Map<String, Object>> alerts = buildAlerts(2);
-    alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive"));
-    addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
-
-    // Load metaAlerts
-    Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
-        Optional.of(Collections.singletonList(alerts.get(0))));
-    Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive",
-        MetaAlertStatus.INACTIVE,
-        Optional.of(Collections.singletonList(alerts.get(0))));
-    // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(),
-        METAALERT_TYPE);
-
-    // Verify load was successful
-    findCreatedDocs(Arrays.asList(
-        new GetRequest("message_0", SENSOR_NAME),
-        new GetRequest("message_1", SENSOR_NAME),
-        new GetRequest("meta_active", METAALERT_TYPE),
-        new GetRequest("meta_inactive", METAALERT_TYPE)));
-
+    final String expectedFieldValue = "metron";
     {
-      // Modify the first message and add a new field
-      Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
-        {
-          put(NEW_FIELD, "metron");
-          put(THREAT_FIELD_DEFAULT, 10.0d);
-        }
-      };
-      String guid = "" + message0.get(Constants.GUID);
-      metaDao.update(new Document(message0, guid, SENSOR_NAME, null),
-          Optional.of(getTestIndexFullName()));
-
-      {
-        // Verify alerts are up-to-date
-        findUpdatedDoc(message0, guid, SENSOR_NAME);
-        long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
-        if (cnt == 0) {
-          Assert.fail("Alert not updated!");
-        }
-      }
-
-      {
-        // Verify meta alerts are up-to-date
-        long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron");
-        if (cnt == 0) {
-          Assert.fail("Active metaalert was not updated!");
-        }
-        if (cnt != 1) {
-          Assert.fail("Metaalerts not updated correctly!");
-        }
-      }
+      // create 2 'regular' alerts that will be associated with meta-alerts
+      List<Map<String, Object>> alerts = buildAlerts(2);
+      alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive"));
+      addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
+
+      // the active meta-alert should be updated when an associated alert is updated
+      Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
+              Optional.of(Collections.singletonList(alerts.get(0))));
+
+      // the inactive meta-alert should NOT be updated when an associated alert is updated
+      Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+              Optional.of(Collections.singletonList(alerts.get(0))));
+
+      // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
+      addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+      // Verify load was successful
+      findCreatedDocs(Arrays.asList(
+              new GetRequest("message_0", SENSOR_NAME),
+              new GetRequest("message_1", SENSOR_NAME),
+              new GetRequest("meta_active", METAALERT_TYPE),
+              new GetRequest("meta_inactive", METAALERT_TYPE)));
     }
-    //modify the same message and modify the new field
     {
-      Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
-        {
-          put(NEW_FIELD, "metron2");
-        }
-      };
-      String guid = "" + message0.get(Constants.GUID);
-      metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty());
-
-      {
-        // Verify index is up-to-date
-        findUpdatedDoc(message0, guid, SENSOR_NAME);
-        long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
-        if (cnt == 0) {
-          Assert.fail("Alert not updated!");
-        }
-      }
-      {
-        // Verify meta alerts are up-to-date
-        long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron2");
-        if (cnt == 0) {
-          Assert.fail("Active metaalert was not updated!");
-        }
-        if (cnt != 1) {
-          Assert.fail("Metaalerts not updated correctly!");
-        }
-      }
+      // modify the 'normal' alert by adding a field
+      Document message0 = metaDao.getLatest("message_0", SENSOR_NAME);
+      message0.getDocument().put(NEW_FIELD, expectedFieldValue);
+      message0.getDocument().put(THREAT_FIELD_DEFAULT, 10.0d);
+      metaDao.update(message0, Optional.of(getTestIndexFullName()));
     }
+
+    // ensure the original 'normal' alert was itself updated
+    assertEventually(() -> {
+      Document message0 = metaDao.getLatest("message_0", SENSOR_NAME);
+      Assert.assertNotNull(message0);
+      Assert.assertEquals(expectedFieldValue, message0.getDocument().get(NEW_FIELD));
+    });
+
+    // the 'active' meta-alert, which contains a copy of the updated alert should also be updated
+    assertEventually(() -> {
+      Document active = metaDao.getLatest("meta_active", METAALERT_TYPE);
+      Object value = active.getDocument().get(ALERT_FIELD);
+      List<Map<String, Object>> children = List.class.cast(value);
+      Assert.assertNotNull(children);
+      Assert.assertEquals(1, children.size());
+      Assert.assertEquals(expectedFieldValue, children.get(0).get(NEW_FIELD));
+    });
+
+    // the 'inactive' meta-alert, which contains a copy of the updated alert should NOT be updated
+    assertEventually(() -> {
+      Document inactive = metaDao.getLatest("meta_inactive", METAALERT_TYPE);
+      Object value = inactive.getDocument().get(ALERT_FIELD);
+      List<Map<String, Object>> children = List.class.cast(value);
+      Assert.assertNotNull(children);
+      Assert.assertEquals(1, children.size());
+      Assert.assertFalse(children.get(0).containsKey(NEW_FIELD));
+    });
   }
 
   @Test
@@ -957,7 +938,7 @@ public void shouldThrowExceptionOnMetaAlertUpdate() throws Exception {
   }
 
   @Test
-  public void shouldPatchAllowedMetaAlerts() throws Exception {
+  public void shouldPatchMetaAlertFields() throws Exception {
     // Load alerts
     List<Map<String, Object>> alerts = buildAlerts(2);
     alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
@@ -969,63 +950,108 @@ public void shouldPatchAllowedMetaAlerts() throws Exception {
 
     // Load metaAlerts
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
-        Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+            Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
     addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
 
-    // Verify load was successful
+    // ensure the test data was loaded
     findCreatedDocs(Arrays.asList(
-        new GetRequest("message_0", SENSOR_NAME),
-        new GetRequest("message_1", SENSOR_NAME),
-        new GetRequest("meta_alert", METAALERT_TYPE)));
+            new GetRequest("message_0", SENSOR_NAME),
+            new GetRequest("message_1", SENSOR_NAME),
+            new GetRequest("meta_alert", METAALERT_TYPE)));
 
-    Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
-    expectedMetaAlert.put(NAME_FIELD, "New Meta Alert");
-    {
-      // Verify a patch to a field other than "status" or "alert" can be patched
-      String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
-      PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class);
+    // patch the name field
+    String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+    PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class);
+    metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+
+    // ensure the alert was patched
+    assertEventually(() -> {
+      Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+      Assert.assertEquals("New Meta Alert", updated.getDocument().get(NAME_FIELD));
+    });
+  }
+
+  @Test
+  public void shouldThrowExceptionIfPatchAlertField() throws Exception {
+    setupTypings();
+
+    // add 2 alerts to an active meta-alert
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
+
+    // create an active meta-alert
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+            Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+    addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+    // ensure the test data was loaded
+    findCreatedDocs(Arrays.asList(
+            new GetRequest("message_0", SENSOR_NAME),
+            new GetRequest("message_1", SENSOR_NAME),
+            new GetRequest("meta_alert", METAALERT_TYPE)));
+
+    // attempt to patch the alert field
+    try {
+      String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+      PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class);
       metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+      Assert.fail("A patch on the alert field should throw an exception");
 
-      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    } catch (IllegalArgumentException iae) {
+      Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
+                      + "Please use the add/remove alert or update status functions instead.",
+              iae.getMessage());
     }
 
-    {
-      // Verify a patch to an alert field should throw an exception
-      try {
-        String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
-        PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class);
-        metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
-
-        Assert.fail("A patch on the alert field should throw an exception");
-      } catch (IllegalArgumentException iae) {
-        Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
-                + "Please use the add/remove alert or update status functions instead.",
-            iae.getMessage());
-      }
+    // ensure the alert field was NOT changed
+    assertEventually(() -> {
+      Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+      Assert.assertEquals(metaAlert.get(ALERT_FIELD), updated.getDocument().get(ALERT_FIELD));
+    });
+  }
 
-      // Verify the metaAlert was not updated
-      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
-    }
+  @Test
+  public void shouldThrowExceptionIfPatchStatusField() throws Exception {
+    setupTypings();
 
-    {
-      // Verify a patch to a status field should throw an exception
-      try {
-        String statusPatch = statusPatchRequest
-            .replace(META_INDEX_FLAG, getMetaAlertIndex());
-        PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class);
-        metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
-
-        Assert.fail("A patch on the status field should throw an exception");
-      } catch (IllegalArgumentException iae) {
-        Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
-                + "Please use the add/remove alert or update status functions instead.",
-            iae.getMessage());
-      }
+    // add 2 alerts to an active meta-alert
+    List<Map<String, Object>> alerts = buildAlerts(2);
+    alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+    addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
 
-      // Verify the metaAlert was not updated
-      findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+    // create an active meta-alert
+    Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+            Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+    addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+    // ensure the test data was loaded
+    findCreatedDocs(Arrays.asList(
+            new GetRequest("message_0", SENSOR_NAME),
+            new GetRequest("message_1", SENSOR_NAME),
+            new GetRequest("meta_alert", METAALERT_TYPE)));
+
+    // Verify a patch to a status field should throw an exception
+    try {
+      String statusPatch = statusPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+      PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class);
+      metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+      Assert.fail("A patch on the status field should throw an exception");
+
+    } catch (IllegalArgumentException iae) {
+      Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths.  "
+                      + "Please use the add/remove alert or update status functions instead.",
+              iae.getMessage());
     }
+
+    // ensure the status field was NOT changed
+    assertEventually(() -> {
+      Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+      Assert.assertEquals(metaAlert.get(STATUS_FIELD), updated.getDocument().get(STATUS_FIELD));
+    });
   }
 
   protected void findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services