You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/12/13 19:29:33 UTC
[metron] branch master updated: METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID (nickwallen) closes apache/metron#1269
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new eb2aee1 METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID (nickwallen) closes apache/metron#1269
eb2aee1 is described below
commit eb2aee1820cae811c6491d68aed32ccc055922e4
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Dec 13 14:29:16 2018 -0500
METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID (nickwallen) closes apache/metron#1269
---
.../e2e/alerts-list/alerts-list.e2e-spec.ts | 19 +-
.../configure-table/configure-table.e2e-spec.ts | 17 +-
.../table-view/table-view.component.html | 2 +-
.../alerts-list/table-view/table-view.component.ts | 9 +-
.../alerts-list/tree-view/tree-view.component.ts | 2 +-
.../app/service/elasticsearch-localstorage-impl.ts | 2 +-
.../bulk/ElasticsearchBulkDocumentWriter.java | 17 +-
.../dao/ElasticsearchMetaAlertUpdateDao.java | 16 +-
.../dao/ElasticsearchRequestSubmitter.java | 3 +-
.../dao/ElasticsearchRetrieveLatestDao.java | 127 ++++-----
.../bulk/ElasticsearchBulkDocumentWriterTest.java | 35 ++-
.../dao/ElasticsearchRequestSubmitterTest.java | 12 +-
...ticsearchBulkDocumentWriterIntegrationTest.java | 159 ++++++++++++
.../apache/metron/indexing/dao/MultiIndexDao.java | 115 ++++++---
.../indexing/dao/metaalert/MetaAlertConstants.java | 14 +
.../metron/indexing/dao/update/Document.java | 124 ++++++---
.../metron/indexing/dao/update/UpdateDao.java | 34 ++-
.../dao/metaalert/MetaAlertIntegrationTest.java | 284 +++++++++++----------
18 files changed, 670 insertions(+), 321 deletions(-)
diff --git a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
index e3709ab..70f52de 100644
--- a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
+++ b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts
@@ -24,9 +24,9 @@ import { loadTestData, deleteTestData } from '../utils/e2e_util';
describe('Test spec for all ui elements & list view', function() {
let page: MetronAlertsPage;
let loginPage: LoginPage;
- let columnNames = [ '', 'Score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country',
+ let columnNames = [ '', 'Score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country',
'ip_dst_addr', 'host', 'alert_status', '', '', ''];
- let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+ let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
'ip_dst_addr', 'host', 'alert_status' ];
beforeAll(async function() : Promise<any> {
@@ -136,16 +136,17 @@ describe('Test spec for all ui elements & list view', function() {
});
it('should select columns from table configuration', async function() : Promise<any> {
- let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
- 'ip_dst_addr', 'host', 'alert_status', 'guid' ];
-
await page.clickConfigureTable();
- expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'for default selected column names');
- await page.toggleSelectCol('id');
+ expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'expect default selected column names');
+
+ // remove the 'guid' column and add the 'id' column
await page.toggleSelectCol('guid');
- expect(await page.getSelectedColumnNames()).toEqual(newColNamesColumnConfig, 'for guid added to selected column names');
- await page.saveConfigureColumns();
+ await page.toggleSelectCol('id');
+ let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+ 'ip_dst_addr', 'host', 'alert_status', 'id' ];
+ expect(await page.getSelectedColumnNames()).toEqual(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns');
+ await page.saveConfigureColumns();
});
it('should have all time-range controls', async function() : Promise<any> {
diff --git a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
index c3636f5..39504a9 100644
--- a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
+++ b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts
@@ -24,7 +24,7 @@ import {loadTestData, deleteTestData} from '../../utils/e2e_util';
describe('Test spec for table column configuration', function() {
let page: MetronAlertsPage;
let loginPage: LoginPage;
- let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+ let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
'ip_dst_addr', 'host', 'alert_status' ];
beforeAll(async function() : Promise<any> {
@@ -45,17 +45,18 @@ describe('Test spec for table column configuration', function() {
});
it('should select columns from table configuration', async function() : Promise<any> {
- let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
- 'ip_dst_addr', 'host', 'alert_status', 'guid' ];
-
await page.clearLocalStorage();
await page.navigateTo();
+ await page.clickConfigureTable();
+ expect(await page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names');
- await page.clickConfigureTable();
- expect(await page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names');
- await page.toggleSelectCol('id');
+ // remove the 'guid' column and add the 'id' column
await page.toggleSelectCol('guid');
- expect(await page.getSelectedColumnNames()).toEqualBcoz(newColNamesColumnConfig, 'for guid added to selected column names');
+ await page.toggleSelectCol('id');
+
+ let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country',
+ 'ip_dst_addr', 'host', 'alert_status', 'id' ];
+ expect(await page.getSelectedColumnNames()).toEqualBcoz(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns');
await page.saveConfigureColumns();
});
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
index 027f57a..718a41f 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html
@@ -61,7 +61,7 @@
<span appAlertSeverity [severity]="getScore(alert.source)"> <a> {{ hasScore(alert.source) ? getScore(alert.source) : '-' }} </a> </span>
</td>
<td [attr.colspan]="alertsColumnsToDisplay.length - 1">
- <a (click)="addFilter('guid', alert.id)" [attr.title]="alert.id" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.id | centerEllipses:20:cell }}</a>
+ <a (click)="addFilter('guid', alert.source['guid'])" [attr.title]="alert.source['guid']" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.source['guid'] | centerEllipses:20:cell }}</a>
<span> ({{ alert.source.metron_alert.length }})</span>
</td>
<td>
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
index fd47b67..2190beb 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts
@@ -141,14 +141,14 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
onSort(sortEvent: SortEvent) {
let sortOrder = (sortEvent.sortOrder === Sort.ASC ? 'asc' : 'desc');
- let sortBy = sortEvent.sortBy === 'id' ? 'guid' : sortEvent.sortBy;
+ let sortBy = sortEvent.sortBy === 'id' ? '_uid' : sortEvent.sortBy;
this.queryBuilder.setSort(sortBy, sortOrder);
this.onRefreshData.emit(true);
}
getValue(alert: Alert, column: ColumnMetadata, formatData: boolean) {
if (column.name === 'id') {
- return this.formatValue(column, alert[column.name]);
+ return this.formatValue(column, alert['id']);
}
return this.getValueFromSource(alert.source, column, formatData);
@@ -158,9 +158,6 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
let returnValue = '';
try {
switch (column.name) {
- case 'id':
- returnValue = alertSource['guid'];
- break;
case 'alert_status':
returnValue = alertSource['alert_status'] ? alertSource['alert_status'] : 'NEW';
break;
@@ -218,7 +215,7 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy {
}
addFilter(field: string, value: string) {
- field = (field === 'id') ? 'guid' : field;
+ field = (field === 'id') ? '_id' : field;
this.onAddFilter.emit(new Filter(field, value));
}
diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
index ab1d4eb..19aaa6e 100644
--- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
+++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts
@@ -323,7 +323,7 @@ export class TreeViewComponent extends TableViewComponent implements OnInit, OnC
}
sortTreeSubGroup($event, treeGroup: TreeGroupData) {
- let sortBy = $event.sortBy === 'id' ? 'guid' : $event.sortBy;
+ let sortBy = $event.sortBy === 'id' ? '_uid' : $event.sortBy;
let sortOrder = $event.sortOrder === Sort.ASC ? 'asc' : 'desc';
let sortField = new SortField(sortBy, sortOrder);
diff --git a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
index 6fd4107..2c91d4c 100644
--- a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
+++ b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts
@@ -45,7 +45,7 @@ export class ElasticSearchLocalstorageImpl extends DataSource {
sourceType: 'source:type';
private defaultColumnMetadata = [
- new ColumnMetadata('id', 'string'),
+ new ColumnMetadata('guid', 'string'),
new ColumnMetadata('timestamp', 'date'),
new ColumnMetadata('source:type', 'string'),
new ColumnMetadata('ip_src_addr', 'ip'),
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
index 9e6e568..7aea2fc 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
/**
@@ -120,10 +119,23 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk
if(document.getTimestamp() == null) {
throw new IllegalArgumentException("Document must contain the timestamp");
}
+
+ // if updating an existing document, the doc ID should be defined.
+ // if creating a new document, set the doc ID to null to allow Elasticsearch to generate one.
+ String docId = document.getDocumentID().orElse(null);
+ if(LOG.isDebugEnabled() && document.getDocumentID().isPresent()) {
+ LOG.debug("Updating existing document with known doc ID; docID={}, guid={}, sensorType={}",
+ docId, document.getGuid(), document.getSensorType());
+ } else if(LOG.isDebugEnabled()) {
+ LOG.debug("Creating a new document, doc ID not yet known; guid={}, sensorType={}",
+ document.getGuid(), document.getSensorType());
+ }
+
return new IndexRequest()
.source(document.getDocument())
.type(document.getSensorType() + "_doc")
- .id(document.getGuid())
+ .index(index)
+ .id(docId)
.index(index)
.timestamp(document.getTimestamp().toString());
}
@@ -149,6 +161,7 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk
} else {
// request succeeded
D success = getDocument(response.getItemId());
+ success.setDocumentID(response.getResponse().getId());
results.addSuccess(success);
}
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index 2e9c855..519e803 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -44,11 +44,13 @@ import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUp
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
@@ -148,8 +150,11 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda
try {
// We need to update an alert itself. Only that portion of the update can be delegated.
// We still need to get meta alerts potentially associated with it and update.
- Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
- .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), MetaAlertConstants.METAALERT_TYPE, update.getTimestamp()))
+ SearchResponse response = getMetaAlertsForAlert(update.getGuid());
+ Collection<Document> metaAlerts = response
+ .getResults()
+ .stream()
+ .map(result -> toDocument(result, update.getTimestamp()))
.collect(Collectors.toList());
// Each meta alert needs to be updated with the new alert
for (Document metaAlert : metaAlerts) {
@@ -172,6 +177,13 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda
}
}
+ private Document toDocument(SearchResult result, Long timestamp) {
+ Document document = Document.fromJSON(result.getSource());
+ document.setTimestamp(timestamp);
+ document.setDocumentID(result.getId());
+ return document;
+ }
+
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
return getUpdateDao().addCommentToAlert(request);
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index c63532e..dca74bc 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -60,7 +60,8 @@ public class ElasticsearchRequestSubmitter {
org.elasticsearch.action.search.SearchResponse esResponse;
try {
esResponse = client.getHighLevelClient().search(request);
- LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
+ LOG.debug("Got Elasticsearch response with {} hit(s); response={}",
+ esResponse.getHits().getTotalHits(), esResponse.toString());
} catch (Exception e) {
String msg = String.format(
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index 0c91007..95d27db 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -18,8 +18,18 @@
package org.apache.metron.elasticsearch.dao;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,29 +39,23 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Function;
-import org.apache.metron.elasticsearch.client.ElasticsearchClient;
-import org.apache.metron.indexing.dao.RetrieveLatestDao;
-import org.apache.metron.indexing.dao.search.GetRequest;
-import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
+import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
- private ElasticsearchClient transportClient;
+ private ElasticsearchClient client;
+ private ElasticsearchRequestSubmitter submitter;
- public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) {
- this.transportClient = transportClient;
+ public ElasticsearchRetrieveLatestDao(ElasticsearchClient client) {
+ this.client = client;
+ this.submitter = new ElasticsearchRequestSubmitter(client);
}
@Override
public Document getLatest(String guid, String sensorType) throws IOException {
- Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+ Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(hit));
return doc.orElse(null);
}
@@ -63,21 +67,7 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
guids.add(getRequest.getGuid());
sensorTypes.add(getRequest.getSensorType());
}
- List<Document> documents = searchByGuids(
- guids,
- sensorTypes,
- hit -> {
- Long ts = 0L;
- String doc = hit.getSourceAsString();
- String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
- try {
- return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
- }
- }
-
- );
+ List<Document> documents = searchByGuids(guids, sensorTypes, hit -> toDocument(hit));
return documents;
}
@@ -102,54 +92,47 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
if (guids == null || guids.isEmpty()) {
return Collections.emptyList();
}
- QueryBuilder query = null;
- IdsQueryBuilder idsQuery;
- if (sensorTypes != null) {
- String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc")
- .toArray(String[]::new);
- idsQuery = QueryBuilders.idsQuery(types);
- } else {
- idsQuery = QueryBuilders.idsQuery();
- }
- for (String guid : guids) {
- query = idsQuery.addIds(guid);
+ // should match any of the guids
+ // the 'guid' field must be of type 'keyword' or this term query will not match
+ BoolQueryBuilder guidQuery = boolQuery().must(termsQuery(Constants.GUID, guids));
+
+ // should match any of the sensor types
+ BoolQueryBuilder sensorQuery = boolQuery();
+ sensorTypes.forEach(sensorType -> sensorQuery.should(typeQuery(sensorType + "_doc")));
+
+ // must have a match for both guid and sensor
+ BoolQueryBuilder query = boolQuery()
+ .must(guidQuery)
+ .must(sensorQuery);
+
+ // submit the search
+ SearchResponse response;
+ try {
+ SearchSourceBuilder source = new SearchSourceBuilder()
+ .query(query)
+ .size(guids.size());
+ SearchRequest request = new SearchRequest().source(source);
+ response = submitter.submitSearch(request);
+
+ } catch(InvalidSearchException e) {
+ throw new IOException(e);
}
- SearchRequest request = new SearchRequest();
- SearchSourceBuilder builder = new SearchSourceBuilder();
- builder.query(query);
- builder.size(guids.size());
- request.source(builder);
-
- org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request);
- SearchHits hits = response.getHits();
+
+ // transform the search hits to results using the callback
List<T> results = new ArrayList<>();
- for (SearchHit hit : hits) {
+ for(SearchHit hit: response.getHits()) {
Optional<T> result = callback.apply(hit);
- if (result.isPresent()) {
- results.add(result.get());
- }
+ result.ifPresent(r -> results.add(r));
}
+
return results;
}
- private Optional<Document> toDocument(final String guid, SearchHit hit) {
- Long ts = 0L;
- String doc = hit.getSourceAsString();
- String sourceType = toSourceType(hit.getType());
- try {
- return Optional.of(new Document(doc, guid, sourceType, ts));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
- }
- }
+ private Optional<Document> toDocument(SearchHit hit) {
+ Document document = Document.fromJSON(hit.getSource());
+ document.setDocumentID(hit.getId());
- /**
- * Returns the source type based on a given doc type.
- * @param docType The document type.
- * @return The source type.
- */
- private String toSourceType(String docType) {
- return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+ return Optional.of(document);
}
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
index b313811..c6389d7 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.bulk;
import org.apache.metron.common.Constants;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@@ -130,15 +131,21 @@ public class ElasticsearchBulkDocumentWriterTest {
}
private void setupElasticsearchToFail() throws IOException {
+ final String errorMessage = "error message";
+ final Exception cause = new Exception("test exception");
+ final boolean isFailed = true;
+ final int itemID = 0;
+
// define the item failure
BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
- when(failure.getCause()).thenReturn(new Exception("test exception"));
- when(failure.getMessage()).thenReturn("error message");
+ when(failure.getCause()).thenReturn(cause);
+ when(failure.getMessage()).thenReturn(errorMessage);
// define the item level response
BulkItemResponse itemResponse = mock(BulkItemResponse.class);
- when(itemResponse.isFailed()).thenReturn(true);
- when(itemResponse.getItemId()).thenReturn(0);
+ when(itemResponse.isFailed()).thenReturn(isFailed);
+ when(itemResponse.getItemId()).thenReturn(itemID);
+
when(itemResponse.getFailure()).thenReturn(failure);
when(itemResponse.getFailureMessage()).thenReturn("error message");
List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
@@ -146,16 +153,32 @@ public class ElasticsearchBulkDocumentWriterTest {
// define the bulk response to indicate failure
BulkResponse response = mock(BulkResponse.class);
when(response.iterator()).thenReturn(itemsResponses.iterator());
- when(response.hasFailures()).thenReturn(true);
+ when(response.hasFailures()).thenReturn(isFailed);
// have the client return the mock response
when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
}
private void setupElasticsearchToSucceed() throws IOException {
+ final String documentId = UUID.randomUUID().toString();
+ final boolean isFailed = false;
+ final int itemID = 0;
+
+ // the write response will contain what is used as the document ID
+ DocWriteResponse writeResponse = mock(DocWriteResponse.class);
+ when(writeResponse.getId()).thenReturn(documentId);
+
+ // define the item level response
+ BulkItemResponse itemResponse = mock(BulkItemResponse.class);
+ when(itemResponse.isFailed()).thenReturn(isFailed);
+ when(itemResponse.getItemId()).thenReturn(itemID);
+ when(itemResponse.getResponse()).thenReturn(writeResponse);
+ List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
+
// define the bulk response to indicate success
BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(false);
+ when(response.iterator()).thenReturn(itemsResponses.iterator());
+ when(response.hasFailures()).thenReturn(isFailed);
// have the client return the mock response
when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 7a84588..917df4d 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -27,6 +27,7 @@ import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.Index;
import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.junit.Test;
@@ -55,15 +56,19 @@ public class ElasticsearchRequestSubmitterTest {
@Test
public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException {
-
// mocks
SearchResponse response = mock(SearchResponse.class);
SearchRequest request = new SearchRequest();
+ // response will indicate 1 search hit
+ SearchHits hits = mock(SearchHits.class);
+ when(hits.getTotalHits()).thenReturn(1L);
+
// response will have status of OK and no failed shards
when(response.status()).thenReturn(RestStatus.OK);
when(response.getFailedShards()).thenReturn(0);
when(response.getTotalShards()).thenReturn(2);
+ when(response.getHits()).thenReturn(hits);
// search should succeed
ElasticsearchRequestSubmitter submitter = setup(response);
@@ -99,9 +104,14 @@ public class ElasticsearchRequestSubmitterTest {
// response will have status of OK
when(response.status()).thenReturn(RestStatus.OK);
+ // response will indicate 1 search hit
+ SearchHits hits = mock(SearchHits.class);
+ when(hits.getTotalHits()).thenReturn(1L);
+
// the response will report shard failures
when(response.getFailedShards()).thenReturn(1);
when(response.getTotalShards()).thenReturn(2);
+ when(response.getHits()).thenReturn(hits);
// the response will return the failures
ShardSearchFailure[] failures = { fail };
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java
new file mode 100644
index 0000000..df4aeb0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
+import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
+import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.Response;
+import org.hamcrest.CoreMatchers;
+import org.json.simple.JSONObject;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test that writes documents with the {@link ElasticsearchBulkDocumentWriter}
+ * and reads them back with the {@link ElasticsearchRetrieveLatestDao}, using an
+ * {@link ElasticSearchComponent} that is started once for the whole test class.
+ */
+public class ElasticsearchBulkDocumentWriterIntegrationTest {
+
+  @ClassRule
+  public static TemporaryFolder indexDir = new TemporaryFolder();
+  private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
+  private static ElasticSearchComponent elasticsearch;
+  private ElasticsearchClient client;
+  private ElasticsearchBulkDocumentWriter<Document> writer;
+  private ElasticsearchRetrieveLatestDao retrieveDao;
+
+  /**
+   * Starts the Elasticsearch component, storing its index data in the temporary folder.
+   */
+  @BeforeClass
+  public static void setupElasticsearch() throws Exception {
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setGlobalConfigSupplier(() -> globals());
+
+    elasticsearch = new ElasticSearchComponent.Builder()
+        .withHttpPort(9211)
+        .withIndexDir(indexDir.getRoot())
+        .withAccessConfig(accessConfig)
+        .build();
+    elasticsearch.start();
+  }
+
+  /**
+   * Stops the Elasticsearch component, if it was created.
+   */
+  @AfterClass
+  public static void tearDownElasticsearch() {
+    if(elasticsearch != null) {
+      elasticsearch.stop();
+    }
+  }
+
+  /**
+   * Creates a fresh client, DAO and writer for each test and installs the bro index template.
+   */
+  @Before
+  public void setup() throws Exception {
+    client = ElasticsearchClientFactory.create(globals());
+    retrieveDao = new ElasticsearchRetrieveLatestDao(client);
+    // WAIT_UNTIL refresh policy, so that written documents can be retrieved right after write()
+    writer = new ElasticsearchBulkDocumentWriter<>(client)
+        .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+
+    // add bro template
+    JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class);
+    String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true);
+    HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON);
+    Response response = client
+        .getLowLevelClient()
+        .performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity);
+    assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200));
+  }
+
+  /**
+   * Closes the client created for the test, if any.
+   */
+  @After
+  public void tearDown() throws IOException {
+    if(client != null) {
+      client.close();
+    }
+  }
+
+  /**
+   * Writes a batch of documents and ensures that each one can be retrieved by its GUID,
+   * matches what was written, and carries a document ID that differs from the GUID.
+   */
+  @Test
+  public void testWrite() throws Exception {
+    // create some documents to write
+    List<Document> documents = new ArrayList<>();
+    for(int i=0; i<10; i++) {
+      Document document = Document.fromJSON(createMessage());
+      documents.add(document);
+    }
+
+    // write the documents
+    for(Document doc: documents) {
+      writer.addDocument(doc, "bro_index");
+    }
+    writer.write();
+
+    // ensure the documents were written
+    for(Document expected: documents) {
+      Document actual = retrieveDao.getLatest(expected.getGuid(), expected.getSensorType());
+      assertNotNull("No document found", actual);
+      assertEquals(expected.getGuid(), actual.getGuid());
+      assertEquals(expected.getSensorType(), actual.getSensorType());
+      assertEquals(expected.getDocument(), actual.getDocument());
+      assertTrue(actual.getDocumentID().isPresent());
+
+      // the document ID and GUID should not be the same, since the document ID was auto-generated
+      assertNotEquals(actual.getGuid(), actual.getDocumentID().get());
+    }
+  }
+
+  /**
+   * Builds the global configuration used both by the Elasticsearch component and the client.
+   */
+  private static Map<String, Object> globals() {
+    Map<String, Object> globals = new HashMap<>();
+    globals.put("es.clustername", "metron");
+    globals.put("es.ip", "localhost");
+    globals.put("es.port", "9200");
+    globals.put("es.date.format", "yyyy.MM.dd.HH");
+    return globals;
+  }
+
+  /**
+   * Creates a bro message with a random GUID, the current time as its timestamp,
+   * and a fixed source address.
+   */
+  private JSONObject createMessage() {
+    JSONObject message = new JSONObject();
+    message.put(Constants.GUID, UUID.randomUUID().toString());
+    message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+    message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+    message.put("source:type", "bro");
+    return message;
+  }
+}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index c3e2108..09f7df9 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -20,14 +20,7 @@ package org.apache.metron.indexing.dao;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
@@ -38,11 +31,25 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class MultiIndexDao implements IndexDao {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private List<IndexDao> indices;
- public MultiIndexDao( IndexDao... composedDao) {
+ public MultiIndexDao(IndexDao... composedDao) {
indices = new ArrayList<>();
Collections.addAll(indices, composedDao);
}
@@ -117,18 +124,30 @@ public class MultiIndexDao implements IndexDao {
*/
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
- List<DocumentContainer> output =
- indices.parallelStream().map(dao -> {
- try {
- return new DocumentContainer(dao.addCommentToAlert(request, latest));
- } catch (Throwable e) {
- return new DocumentContainer(e);
- }
- }).collect(Collectors.toList());
-
+ List<DocumentContainer> output = indices
+ .parallelStream()
+ .map(dao -> addCommentToAlert(dao, request, latest))
+ .collect(Collectors.toList());
return getLatestDocument(output);
}
+  /**
+   * Adds a comment to an alert using a single {@link IndexDao} and captures the outcome.
+   *
+   * <p>Nothing is thrown to the caller; a failure is captured in the returned container
+   * and logged as an error.
+   *
+   * @param indexDao The DAO to add the comment with.
+   * @param request The request describing the comment and the alert it belongs to.
+   * @param latest The latest version of the alert.
+   * @return A container holding either the document returned by the DAO or the error that occurred.
+   */
+  private DocumentContainer addCommentToAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
+    DocumentContainer container;
+    try {
+      Document document = indexDao.addCommentToAlert(request, latest);
+      container = new DocumentContainer(document);
+      // the identifiers are taken from the request rather than the returned document, so that
+      // a DAO returning null is not turned into a failure by the logging itself
+      LOG.debug("Added comment to alert; indexDao={}, guid={}, sensorType={}, document={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
+
+    } catch (Throwable e) {
+      container = new DocumentContainer(e);
+      LOG.error("Unable to add comment to alert; indexDao={}, error={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
+    }
+
+    return container;
+  }
+ }
+
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
Document latest = getLatest(request.getGuid(), request.getSensorType());
@@ -145,18 +164,30 @@ public class MultiIndexDao implements IndexDao {
*/
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
- List<DocumentContainer> output =
- indices.parallelStream().map(dao -> {
- try {
- return new DocumentContainer(dao.removeCommentFromAlert(request, latest));
- } catch (Throwable e) {
- return new DocumentContainer(e);
- }
- }).collect(Collectors.toList());
-
+ List<DocumentContainer> output = indices
+ .parallelStream()
+ .map(dao -> removeCommentFromAlert(dao, request, latest))
+ .collect(Collectors.toList());
return getLatestDocument(output);
}
+  /**
+   * Removes a comment from an alert using a single {@link IndexDao} and captures the outcome.
+   *
+   * <p>Nothing is thrown to the caller; a failure is captured in the returned container
+   * and logged as an error.
+   *
+   * @param indexDao The DAO to remove the comment with.
+   * @param request The request describing the comment and the alert it belongs to.
+   * @param latest The latest version of the alert.
+   * @return A container holding either the document returned by the DAO or the error that occurred.
+   */
+  private DocumentContainer removeCommentFromAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
+    DocumentContainer container;
+    try {
+      Document document = indexDao.removeCommentFromAlert(request, latest);
+      container = new DocumentContainer(document);
+      // the identifiers are taken from the request rather than the returned document, so that
+      // a DAO returning null is not turned into a failure by the logging itself
+      LOG.debug("Removed comment from alert; indexDao={}, guid={}, sensorType={}, document={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), request.getGuid(), request.getSensorType(), document);
+
+    } catch (Throwable e) {
+      container = new DocumentContainer(e);
+      LOG.error("Unable to remove comment from alert; indexDao={}, error={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
+    }
+
+    return container;
+  }
+ }
+
protected static class DocumentContainer {
private Optional<Document> d = Optional.empty();
private Optional<Throwable> t = Optional.empty();
@@ -226,18 +257,30 @@ public class MultiIndexDao implements IndexDao {
@Override
public Document getLatest(final String guid, String sensorType) throws IOException {
- List<DocumentContainer> output =
- indices.parallelStream().map(dao -> {
- try {
- return new DocumentContainer(dao.getLatest(guid, sensorType));
- } catch (Throwable e) {
- return new DocumentContainer(e);
- }
- }).collect(Collectors.toList());
-
+ List<DocumentContainer> output = indices
+ .parallelStream()
+ .map(dao -> getLatest(dao, guid, sensorType))
+ .collect(Collectors.toList());
return getLatestDocument(output);
}
+  /**
+   * Looks up the latest version of a document using a single {@link IndexDao}.
+   *
+   * <p>Nothing is thrown to the caller; a failure is captured in the returned container
+   * and logged as an error.
+   *
+   * @param indexDao The DAO to query.
+   * @param guid The GUID of the document to find.
+   * @param sensorType The sensor type of the document to find.
+   * @return A container holding either the document returned by the DAO or the error that occurred.
+   */
+  private DocumentContainer getLatest(IndexDao indexDao, String guid, String sensorType) {
+    try {
+      Document found = indexDao.getLatest(guid, sensorType);
+      DocumentContainer success = new DocumentContainer(found);
+      LOG.debug("Found latest document; indexDao={}, guid={}, sensorType={}, document={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), guid, sensorType, found);
+      return success;
+
+    } catch (Throwable cause) {
+      DocumentContainer failure = new DocumentContainer(cause);
+      LOG.error("Unable to find latest document; indexDao={}, error={}",
+          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(cause));
+      return failure;
+    }
+  }
+ }
+
@Override
public Iterable<Document> getAllLatest(
List<GetRequest> getRequests) throws IOException {
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
index daa5424..b473200 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java
@@ -20,10 +20,24 @@ package org.apache.metron.indexing.dao.metaalert;
public class MetaAlertConstants {
public static String METAALERT_TYPE = "metaalert";
+
+ /**
+ * The name of the field in an alert that contains a list
+ * of GUIDs of all meta-alerts the alert is associated with.
+ *
+ * <p>Only standard, non-metaalerts will have this field.
+ */
public static String METAALERT_FIELD = "metaalerts";
public static String METAALERT_DOC = METAALERT_TYPE + "_doc";
public static String THREAT_FIELD_DEFAULT = "threat:triage:score";
public static String THREAT_SORT_DEFAULT = "sum";
+
+ /**
+ * The name of the field in a meta-alert that contains a list of
+ * all alerts associated with the meta-alert.
+ *
+ * <p>Only meta-alerts will have this field.
+ */
public static String ALERT_FIELD = "metron_alert";
public static String STATUS_FIELD = "status";
public static String GROUPS_FIELD = "groups";
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
index 3686b19..0a028e5 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
@@ -21,19 +21,39 @@ package org.apache.metron.indexing.dao.update;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.metron.common.Constants.Fields.TIMESTAMP;
+import static org.apache.metron.common.Constants.GUID;
+import static org.apache.metron.common.Constants.SENSOR_TYPE;
import org.apache.metron.common.utils.JSONUtils;
public class Document {
+
Long timestamp;
Map<String, Object> document;
String guid;
String sensorType;
+ String documentID;
+
+  /**
+   * Creates a {@link Document} from the fields of a JSON message.
+   *
+   * <p>The GUID and sensor type are read from the message itself. When no timestamp
+   * can be read from the message, the timestamp defaults to 0.
+   *
+   * @param json The fields of the message.
+   * @return A document wrapping the given fields; no document ID is set.
+   */
+  public static Document fromJSON(Map<String, Object> json) {
+    return new Document(json, getGUID(json), getSensorType(json), getTimestamp(json).orElse(0L));
+  }
public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp) {
+ this(document, guid, sensorType, timestamp, null);
+ }
+
+ public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp, String documentID) {
setDocument(document);
setGuid(guid);
setTimestamp(timestamp);
setSensorType(sensorType);
+ setDocumentID(documentID);
}
public Document(String document, String guid, String sensorType, Long timestamp) throws IOException {
@@ -41,7 +61,7 @@ public class Document {
}
public Document(String document, String guid, String sensorType) throws IOException {
- this( document, guid, sensorType, null);
+ this(document, guid, sensorType, null);
}
/**
@@ -49,8 +69,11 @@ public class Document {
* @param other The document to be copied.
*/
public Document(Document other) {
- this(new HashMap<>(other.getDocument()), other.getGuid(), other.getSensorType(),
- other.getTimestamp());
+ this(new HashMap<>(other.getDocument()),
+ other.getGuid(),
+ other.getSensorType(),
+ other.getTimestamp(),
+ other.getDocumentID().orElse(null));
}
private static Map<String, Object> convertDoc(String document) throws IOException {
@@ -89,46 +112,83 @@ public class Document {
this.guid = guid;
}
- @Override
- public String toString() {
- return "Document{" +
- "timestamp=" + timestamp +
- ", document=" + document +
- ", guid='" + guid + '\'' +
- ", sensorType='" + sensorType + '\'' +
- '}';
+ /**
+ * Returns the unique identifier that is used when persisting this document.
+ *
+ * <p>This value will be different than the Metron guid.
+ *
+ * <p>Only present when a document has been retrieved from a store
+ * that supports a document ID, like Elasticsearch. This value will
+ * not be present when retrieved from HBase.
+ */
+ public Optional<String> getDocumentID() {
+ return Optional.ofNullable(documentID);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
+  /**
+   * Sets the document ID from an optional value; an empty optional clears it.
+   *
+   * @param documentID The document ID, if one exists.
+   */
+  public void setDocumentID(Optional<String> documentID) {
+    setDocumentID(documentID.orElse(null));
+  }
+
+  /**
+   * Sets the document ID.
+   *
+   * @param documentID The document ID, or null if there is none.
+   */
+  public void setDocumentID(String documentID) {
+    this.documentID = documentID;
+  }
+
+ private static Optional<Long> getTimestamp(Map<String, Object> document) {
+ Object value = document.get(TIMESTAMP.getName());
+ if(value != null && value instanceof Long) {
+ return Optional.of(Long.class.cast(value));
}
- if (o == null || getClass() != o.getClass()) {
- return false;
+ return Optional.empty();
+ }
+
+ private static String getGUID(Map<String, Object> document) {
+ Object value = document.get(GUID);
+ if(value != null && value instanceof String) {
+ return String.class.cast(value);
}
- Document document1 = (Document) o;
+ throw new IllegalStateException(String.format("Missing '%s' field", GUID));
+ }
- if (timestamp != null ? !timestamp.equals(document1.timestamp) : document1.timestamp != null) {
- return false;
- }
- if (document != null ? !document.equals(document1.document) : document1.document != null) {
- return false;
+ private static String getSensorType(Map<String, Object> document) {
+ Object value = document.get(SENSOR_TYPE);
+ if(value != null && value instanceof String) {
+ return String.class.cast(value);
}
- if (guid != null ? !guid.equals(document1.guid) : document1.guid != null) {
- return false;
+
+ value = document.get(SENSOR_TYPE.replace(".", ":"));
+ if(value != null && value instanceof String) {
+ return String.class.cast(value);
}
- return sensorType != null ? sensorType.equals(document1.sensorType)
- : document1.sensorType == null;
+
+ throw new IllegalStateException(String.format("Missing '%s' field", SENSOR_TYPE));
+ }
+
+  /**
+   * Two documents are equal when their timestamp, source, guid, sensor type
+   * and document ID are all equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Document)) {
+      return false;
+    }
+    Document that = (Document) o;
+    return Objects.equals(timestamp, that.timestamp)
+        && Objects.equals(document, that.document)
+        && Objects.equals(guid, that.guid)
+        && Objects.equals(sensorType, that.sensorType)
+        && Objects.equals(documentID, that.documentID);
+  }
@Override
public int hashCode() {
- int result = timestamp != null ? timestamp.hashCode() : 0;
- result = 31 * result + (document != null ? document.hashCode() : 0);
- result = 31 * result + (guid != null ? guid.hashCode() : 0);
- result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
- return result;
+ return Objects.hash(timestamp, document, guid, sensorType, documentID);
+ }
+
+ @Override
+ public String toString() {
+ return "Document{" +
+ "timestamp=" + timestamp +
+ ", document=" + document +
+ ", guid='" + guid + '\'' +
+ ", sensorType='" + sensorType + '\'' +
+ ", documentID=" + documentID +
+ '}';
}
}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
index 82f0a49..ef1d298 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.util.Map;
import java.util.Optional;
+import static java.lang.String.format;
+
public interface UpdateDao {
/**
@@ -55,7 +57,6 @@ public interface UpdateDao {
Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException;
-
/**
* Update a document in an index given a JSON Patch (see RFC 6902 at
* https://tools.ietf.org/html/rfc6902)
@@ -73,23 +74,28 @@ public interface UpdateDao {
}
default Document getPatchedDocument(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
- Optional<Long> timestamp
+ Optional<Long> optionalTimestamp
) throws OriginalNotFoundException, IOException {
- Map<String, Object> latest = request.getSource();
- if (latest == null) {
- Document latestDoc = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType());
- if (latestDoc != null && latestDoc.getDocument() != null) {
- latest = latestDoc.getDocument();
+ String guid = request.getGuid();
+ String sensorType = request.getSensorType();
+ String documentID = null;
+ Long timestamp = optionalTimestamp.orElse(System.currentTimeMillis());
+
+ Map<String, Object> originalSource = request.getSource();
+ if (originalSource == null) {
+ // no document source provided, lookup the latest
+ Document toPatch = retrieveLatestDao.getLatest(guid, sensorType);
+ if(toPatch != null && toPatch.getDocument() != null) {
+ originalSource = toPatch.getDocument();
+ documentID = toPatch.getDocumentID().orElse(null);
+
} else {
- throw new OriginalNotFoundException(
- "Unable to patch an document that doesn't exist and isn't specified.");
+ String error = format("Document does not exist, but is required; guid=%s, sensorType=%s", guid, sensorType);
+ throw new OriginalNotFoundException(error);
}
}
- Map<String, Object> updated = JSONUtils.INSTANCE.applyPatch(request.getPatch(), latest);
- return new Document(updated,
- request.getGuid(),
- request.getSensorType(),
- timestamp.orElse(System.currentTimeMillis()));
+ Map<String, Object> patchedSource = JSONUtils.INSTANCE.applyPatch(request.getPatch(), originalSource);
+ return new Document(patchedSource, guid, sensorType, timestamp, documentID);
}
}
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
index f1355a6..5a18fc5 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
@@ -55,6 +55,7 @@ import org.apache.metron.indexing.dao.search.SortOrder;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.integration.utils.TestUtils;
import org.json.simple.parser.ParseException;
import org.junit.Assert;
import org.junit.Test;
@@ -231,8 +232,8 @@ public abstract class MetaAlertIntegrationTest {
SearchResponse result = metaDao.search(sr);
List<SearchResult> results = result.getResults();
Assert.assertEquals(2, results.size());
- Assert.assertEquals("meta_active_0", results.get((0)).getId());
- Assert.assertEquals("message_1", results.get((1)).getId());
+ Assert.assertEquals("meta_active_0", results.get((0)).getSource().get(Constants.GUID));
+ Assert.assertEquals("message_1", results.get((1)).getSource().get(Constants.GUID));
// Test ascending
SortField sfAsc = new SortField();
@@ -245,8 +246,8 @@ public abstract class MetaAlertIntegrationTest {
srAsc.setSort(Collections.singletonList(sfAsc));
result = metaDao.search(srAsc);
results = result.getResults();
- Assert.assertEquals("message_1", results.get((0)).getId());
- Assert.assertEquals("meta_active_0", results.get((1)).getId());
+ Assert.assertEquals("message_1", results.get((0)).getSource().get(Constants.GUID));
+ Assert.assertEquals("meta_active_0", results.get((1)).getSource().get(Constants.GUID));
Assert.assertEquals(2, results.size());
}
@@ -856,92 +857,72 @@ public abstract class MetaAlertIntegrationTest {
@Test
public abstract void shouldSearchByNestedAlert() throws Exception;
- @SuppressWarnings("unchecked")
+ /**
+ * If a meta-alert is active, any updates to alerts associated with a meta-alert
+ * should be reflected in both the original alert and the copy contained within
+ * the meta-alert.
+ */
@Test
public void shouldUpdateMetaAlertOnAlertUpdate() throws Exception {
- // Load alerts
- List<Map<String, Object>> alerts = buildAlerts(2);
- alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive"));
- addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
-
- // Load metaAlerts
- Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
- Optional.of(Collections.singletonList(alerts.get(0))));
- Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive",
- MetaAlertStatus.INACTIVE,
- Optional.of(Collections.singletonList(alerts.get(0))));
- // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
- addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(),
- METAALERT_TYPE);
-
- // Verify load was successful
- findCreatedDocs(Arrays.asList(
- new GetRequest("message_0", SENSOR_NAME),
- new GetRequest("message_1", SENSOR_NAME),
- new GetRequest("meta_active", METAALERT_TYPE),
- new GetRequest("meta_inactive", METAALERT_TYPE)));
-
+ final String expectedFieldValue = "metron";
{
- // Modify the first message and add a new field
- Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
- {
- put(NEW_FIELD, "metron");
- put(THREAT_FIELD_DEFAULT, 10.0d);
- }
- };
- String guid = "" + message0.get(Constants.GUID);
- metaDao.update(new Document(message0, guid, SENSOR_NAME, null),
- Optional.of(getTestIndexFullName()));
-
- {
- // Verify alerts are up-to-date
- findUpdatedDoc(message0, guid, SENSOR_NAME);
- long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
- if (cnt == 0) {
- Assert.fail("Alert not updated!");
- }
- }
-
- {
- // Verify meta alerts are up-to-date
- long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron");
- if (cnt == 0) {
- Assert.fail("Active metaalert was not updated!");
- }
- if (cnt != 1) {
- Assert.fail("Metaalerts not updated correctly!");
- }
- }
+ // create 2 'regular' alerts that will be associated with meta-alerts
+ List<Map<String, Object>> alerts = buildAlerts(2);
+ alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive"));
+ addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
+
+ // the active meta-alert should be updated when an associated alert is updated
+ Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE,
+ Optional.of(Collections.singletonList(alerts.get(0))));
+
+ // the inactive meta-alert should NOT be updated when an associated alert is updated
+ Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
+ Optional.of(Collections.singletonList(alerts.get(0))));
+
+ // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
+ addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+ // Verify load was successful
+ findCreatedDocs(Arrays.asList(
+ new GetRequest("message_0", SENSOR_NAME),
+ new GetRequest("message_1", SENSOR_NAME),
+ new GetRequest("meta_active", METAALERT_TYPE),
+ new GetRequest("meta_inactive", METAALERT_TYPE)));
}
- //modify the same message and modify the new field
{
- Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
- {
- put(NEW_FIELD, "metron2");
- }
- };
- String guid = "" + message0.get(Constants.GUID);
- metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty());
-
- {
- // Verify index is up-to-date
- findUpdatedDoc(message0, guid, SENSOR_NAME);
- long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD));
- if (cnt == 0) {
- Assert.fail("Alert not updated!");
- }
- }
- {
- // Verify meta alerts are up-to-date
- long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron2");
- if (cnt == 0) {
- Assert.fail("Active metaalert was not updated!");
- }
- if (cnt != 1) {
- Assert.fail("Metaalerts not updated correctly!");
- }
- }
+ // modify the 'normal' alert by adding a field
+ Document message0 = metaDao.getLatest("message_0", SENSOR_NAME);
+ message0.getDocument().put(NEW_FIELD, expectedFieldValue);
+ message0.getDocument().put(THREAT_FIELD_DEFAULT, 10.0d);
+ metaDao.update(message0, Optional.of(getTestIndexFullName()));
}
+
+ // ensure the original 'normal' alert was itself updated
+ assertEventually(() -> {
+ Document message0 = metaDao.getLatest("message_0", SENSOR_NAME);
+ Assert.assertNotNull(message0);
+ Assert.assertEquals(expectedFieldValue, message0.getDocument().get(NEW_FIELD));
+ });
+
+ // the 'active' meta-alert, which contains a copy of the updated alert should also be updated
+ assertEventually(() -> {
+ Document active = metaDao.getLatest("meta_active", METAALERT_TYPE);
+ Object value = active.getDocument().get(ALERT_FIELD);
+ List<Map<String, Object>> children = List.class.cast(value);
+ Assert.assertNotNull(children);
+ Assert.assertEquals(1, children.size());
+ Assert.assertEquals(expectedFieldValue, children.get(0).get(NEW_FIELD));
+ });
+
+ // the 'inactive' meta-alert, which contains a copy of the updated alert should NOT be updated
+ assertEventually(() -> {
+ Document inactive = metaDao.getLatest("meta_inactive", METAALERT_TYPE);
+ Object value = inactive.getDocument().get(ALERT_FIELD);
+ List<Map<String, Object>> children = List.class.cast(value);
+ Assert.assertNotNull(children);
+ Assert.assertEquals(1, children.size());
+ Assert.assertFalse(children.get(0).containsKey(NEW_FIELD));
+ });
}
@Test
@@ -957,7 +938,7 @@ public abstract class MetaAlertIntegrationTest {
}
@Test
- public void shouldPatchAllowedMetaAlerts() throws Exception {
+ public void shouldPatchMetaAlertFields() throws Exception {
// Load alerts
List<Map<String, Object>> alerts = buildAlerts(2);
alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
@@ -969,63 +950,108 @@ public abstract class MetaAlertIntegrationTest {
// Load metaAlerts
Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
- Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+ Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
// We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
- // Verify load was successful
+ // ensure the test data was loaded
findCreatedDocs(Arrays.asList(
- new GetRequest("message_0", SENSOR_NAME),
- new GetRequest("message_1", SENSOR_NAME),
- new GetRequest("meta_alert", METAALERT_TYPE)));
+ new GetRequest("message_0", SENSOR_NAME),
+ new GetRequest("message_1", SENSOR_NAME),
+ new GetRequest("meta_alert", METAALERT_TYPE)));
- Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert);
- expectedMetaAlert.put(NAME_FIELD, "New Meta Alert");
- {
- // Verify a patch to a field other than "status" or "alert" can be patched
- String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
- PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class);
+ // patch the name field
+ String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+ PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class);
+ metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+
+ // ensure the alert was patched
+ assertEventually(() -> {
+ Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+ Assert.assertEquals("New Meta Alert", updated.getDocument().get(NAME_FIELD));
+ });
+ }
+
+ @Test
+ public void shouldThrowExceptionIfPatchAlertField() throws Exception {
+ setupTypings();
+
+ // add 2 alerts to an active meta-alert
+ List<Map<String, Object>> alerts = buildAlerts(2);
+ alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+ alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+ addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
+
+ // create an active meta-alert
+ Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+ Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+ addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+ // ensure the test data was loaded
+ findCreatedDocs(Arrays.asList(
+ new GetRequest("message_0", SENSOR_NAME),
+ new GetRequest("message_1", SENSOR_NAME),
+ new GetRequest("meta_alert", METAALERT_TYPE)));
+
+ // attempt to patch the alert field
+ try {
+ String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+ PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class);
metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+ Assert.fail("A patch on the alert field should throw an exception");
- findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+ } catch (IllegalArgumentException iae) {
+ Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. "
+ + "Please use the add/remove alert or update status functions instead.",
+ iae.getMessage());
}
- {
- // Verify a patch to an alert field should throw an exception
- try {
- String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
- PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class);
- metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
-
- Assert.fail("A patch on the alert field should throw an exception");
- } catch (IllegalArgumentException iae) {
- Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. "
- + "Please use the add/remove alert or update status functions instead.",
- iae.getMessage());
- }
+ // ensure the alert field was NOT changed
+ assertEventually(() -> {
+ Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+ Assert.assertEquals(metaAlert.get(ALERT_FIELD), updated.getDocument().get(ALERT_FIELD));
+ });
+ }
- // Verify the metaAlert was not updated
- findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
- }
+ @Test
+ public void shouldThrowExceptionIfPatchStatusField() throws Exception {
+ setupTypings();
- {
- // Verify a patch to a status field should throw an exception
- try {
- String statusPatch = statusPatchRequest
- .replace(META_INDEX_FLAG, getMetaAlertIndex());
- PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class);
- metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
-
- Assert.fail("A patch on the status field should throw an exception");
- } catch (IllegalArgumentException iae) {
- Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. "
- + "Please use the add/remove alert or update status functions instead.",
- iae.getMessage());
- }
+ // add 2 alerts to an active meta-alert
+ List<Map<String, Object>> alerts = buildAlerts(2);
+ alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+ alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active"));
+ addRecords(alerts, getTestIndexFullName(), SENSOR_NAME);
- // Verify the metaAlert was not updated
- findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE);
+ // create an active meta-alert
+ Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
+ Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
+ addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE);
+
+ // ensure the test data was loaded
+ findCreatedDocs(Arrays.asList(
+ new GetRequest("message_0", SENSOR_NAME),
+ new GetRequest("message_1", SENSOR_NAME),
+ new GetRequest("meta_alert", METAALERT_TYPE)));
+
+ // verify that a patch to the status field throws an exception
+ try {
+ String statusPatch = statusPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex());
+ PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class);
+ metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis()));
+ Assert.fail("A patch on the status field should throw an exception");
+
+ } catch (IllegalArgumentException iae) {
+ Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. "
+ + "Please use the add/remove alert or update status functions instead.",
+ iae.getMessage());
}
+
+ // ensure the status field was NOT changed
+ assertEventually(() -> {
+ Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE);
+ Assert.assertEquals(metaAlert.get(STATUS_FIELD), updated.getDocument().get(STATUS_FIELD));
+ });
}
protected void findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)