You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2017/11/16 01:35:48 UTC
[3/3] metron git commit: METRON-1289 Alert fields are lost when a
MetaAlert is created (merrimanr) closes apache/metron#824
METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd896fbe
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd896fbe
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd896fbe
Branch: refs/heads/master
Commit: fd896fbebe9d5e77eb11d1ce953ab2b55cc84387
Parents: c4c930f
Author: merrimanr <me...@gmail.com>
Authored: Wed Nov 15 19:35:18 2017 -0600
Committer: merrimanr <me...@apache.org>
Committed: Wed Nov 15 19:35:18 2017 -0600
----------------------------------------------------------------------
metron-interface/metron-rest/README.md | 36 +-
.../apache/metron/rest/config/IndexConfig.java | 3 +-
.../rest/controller/MetaAlertController.java | 48 +-
.../metron/rest/service/MetaAlertService.java | 10 +
.../rest/service/impl/MetaAlertServiceImpl.java | 31 +
.../rest/service/impl/SearchServiceImpl.java | 18 +-
.../MetaAlertControllerIntegrationTest.java | 120 +-
.../UpdateControllerIntegrationTest.java | 5 +-
.../org/apache/metron/common/utils/KeyUtil.java | 50 +
.../hbase/HBaseEnrichmentConverterTest.java | 21 +
.../elasticsearch/dao/ElasticsearchDao.java | 115 +-
.../dao/ElasticsearchMetaAlertDao.java | 717 +++++-----
.../elasticsearch/dao/MetaAlertStatus.java | 34 -
.../dao/ElasticsearchMetaAlertDaoTest.java | 304 +---
.../ElasticsearchMetaAlertIntegrationTest.java | 1301 ++++++++++--------
.../ElasticsearchUpdateIntegrationTest.java | 4 +-
.../enrichment/converter/EnrichmentKey.java | 23 +-
metron-platform/metron-indexing/README.md | 17 +-
metron-platform/metron-indexing/pom.xml | 7 +
.../apache/metron/indexing/dao/HBaseDao.java | 128 +-
.../apache/metron/indexing/dao/IndexDao.java | 38 +-
.../metron/indexing/dao/MetaAlertDao.java | 91 +-
.../metron/indexing/dao/MultiIndexDao.java | 54 +
.../metaalert/MetaAlertAddRemoveRequest.java | 41 +
.../dao/metaalert/MetaAlertCreateRequest.java | 14 +-
.../indexing/dao/metaalert/MetaAlertStatus.java | 34 +
.../metron/indexing/dao/search/GetRequest.java | 35 +-
.../apache/metron/indexing/dao/InMemoryDao.java | 25 +-
.../indexing/dao/InMemoryMetaAlertDao.java | 96 +-
.../indexing/dao/SearchIntegrationTest.java | 32 +-
.../integration/HBaseDaoIntegrationTest.java | 164 +++
31 files changed, 2219 insertions(+), 1397 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index b79b44d..724239b 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -218,6 +218,9 @@ Request and Response objects are JSON formatted. The JSON schemas are available
| [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)|
| [ `GET /api/v1/metaalert/searchByAlert`](#get-apiv1metaalertsearchbyalert)|
| [ `GET /api/v1/metaalert/create`](#get-apiv1metaalertcreate)|
+| [ `POST /api/v1/metaalert/add/alert`](#post-apiv1metaalertaddalert)|
+| [ `POST /api/v1/metaalert/remove/alert`](#post-apiv1metaalertremovealert)|
+| [ `POST /api/v1/metaalert/update/status/{guid}/{status}`](#post-apiv1metaalertupdatestatusguidstatus)|
| [ `GET /api/v1/search/search`](#get-apiv1searchsearch)|
| [ `POST /api/v1/search/search`](#get-apiv1searchsearch)|
| [ `POST /api/v1/search/group`](#get-apiv1searchgroup)|
@@ -415,19 +418,40 @@ Request and Response objects are JSON formatted. The JSON schemas are available
* 404 - Either Kafka topic is missing or contains no messages
### `POST /api/v1/metaalert/searchByAlert`
- * Description: Searches meta alerts to find any containing an alert for the provided GUID
+ * Description: Get all meta alerts that contain an alert.
* Input:
* guid - GUID of the alert
* Returns:
- * 200 - Returns the meta alerts associated with this alert
- * 404 - The child alert isn't found
+ * 200 - Search results
### `POST /api/v1/metaalert/create`
- * Description: Creates a meta alert containing the provide alerts
+ * Description: Creates a new meta alert from a list of existing alerts. The meta alert status will initially be set to 'ACTIVE' and summary statistics will be computed from the list of alerts. A list of groups included in the request are also added to the meta alert.
* Input:
- * request - Meta Alert Create Request
+ * request - Meta alert create request which includes a list of alert get requests and a list of custom groups used to annotate a meta alert.
* Returns:
- * 200 - The meta alert was created
+ * 200 - The GUID of the new meta alert
+
+### `POST /api/v1/metaalert/add/alert`
+ * Description: Adds an alert to an existing meta alert. An alert will not be added if it is already contained in a meta alert.
+ * Input:
+ * request - Meta alert add request which includes a meta alert GUID and list of alert get requests
+ * Returns:
+ * 200 - Returns 'true' if the alert was added and 'false' if the meta alert did not change.
+
+### `POST /api/v1/metaalert/remove/alert`
+ * Description: Removes an alert from an existing meta alert. If the alert to be removed is not in a meta alert, 'false' will be returned.
+ * Input:
+ * request - Meta alert remove request which includes a meta alert GUID and list of alert get requests
+ * Returns:
+ * 200 - Returns 'true' if the alert was removed and 'false' if the meta alert did not change.
+
+### `POST /api/v1/metaalert/update/status/{guid}/{status}`
+ * Description: Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.
+ * Input:
+ * guid - Meta alert GUID
+ * status - Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'
+ * Returns:
+ * 200 - Returns 'true' if the status changed and 'false' if it did not.
### `POST /api/v1/search/search`
* Description: Searches the indexing store. GUIDs must be quoted to ensure correct results.
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 8eabb2e..4ce9644 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.config;
import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
+import java.util.Optional;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.indexing.dao.AccessConfig;
@@ -81,7 +82,7 @@ public class IndexConfig {
// Create the meta alert dao and wrap it around the index dao.
MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
- ret.init(indexDao, metaDaoSort);
+ ret.init(indexDao, Optional.ofNullable(metaDaoSort));
return ret;
}
catch(RuntimeException re) {
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
index e9cff8b..d42403a 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java
@@ -21,6 +21,8 @@ package org.apache.metron.rest.controller;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -29,6 +31,7 @@ import org.apache.metron.rest.service.MetaAlertService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -41,24 +44,59 @@ public class MetaAlertController {
@Autowired
private MetaAlertService metaAlertService;
- @ApiOperation(value = "Get all meta alerts for alert")
+ @ApiOperation(value = "Get all meta alerts that contain an alert.")
@ApiResponse(message = "Search results", code = 200)
@RequestMapping(value = "/searchByAlert", method = RequestMethod.POST)
ResponseEntity<SearchResponse> searchByAlert(
- @ApiParam(name = "guid", value = "GUID", required = true)
+ @ApiParam(name = "guid", value = "Alert GUID", required = true)
@RequestBody final String guid
) throws RestException {
return new ResponseEntity<>(metaAlertService.getAllMetaAlertsForAlert(guid), HttpStatus.OK);
}
- @ApiOperation(value = "Create a meta alert")
- @ApiResponse(message = "Created meta alert", code = 200)
+ @ApiOperation(value = "Creates a new meta alert from a list of existing alerts. "
+ + "The meta alert status will initially be set to 'ACTIVE' and summary statistics "
+ + "will be computed from the list of alerts. A list of groups included in the request are also added to the meta alert.")
+ @ApiResponse(message = "The GUID of the new meta alert", code = 200)
@RequestMapping(value = "/create", method = RequestMethod.POST)
ResponseEntity<MetaAlertCreateResponse> create(
- @ApiParam(name = "request", value = "Meta Alert Create Request", required = true)
+ @ApiParam(name = "createRequest", value = "Meta alert create request which includes a list of alert "
+ + "get requests and a list of custom groups used to annotate a meta alert", required = true)
@RequestBody final MetaAlertCreateRequest createRequest
) throws RestException {
return new ResponseEntity<>(metaAlertService.create(createRequest), HttpStatus.OK);
}
+
+ @ApiOperation(value = "Adds an alert to an existing meta alert. An alert will not be added if it is already contained in a meta alert.")
+ @ApiResponse(message = "Returns 'true' if the alert was added and 'false' if the meta alert did not change.", code = 200)
+ @RequestMapping(value = "/add/alert", method = RequestMethod.POST)
+ ResponseEntity<Boolean> addAlertsToMetaAlert(
+ @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert add request which includes a meta alert GUID and list of alert get requests", required = true)
+ @RequestBody final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+ ) throws RestException {
+ return new ResponseEntity<>(metaAlertService.addAlertsToMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+ }
+
+ @ApiOperation(value = "Removes an alert from an existing meta alert. If the alert to be removed is not in a meta alert, 'false' will be returned.")
+ @ApiResponse(message = "Returns 'true' if the alert was removed and 'false' if the meta alert did not change.", code = 200)
+ @RequestMapping(value = "/remove/alert", method = RequestMethod.POST)
+ ResponseEntity<Boolean> removeAlertsFromMetaAlert(
+ @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert remove request which includes a meta alert GUID and list of alert get requests", required = true)
+ @RequestBody final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest
+ ) throws RestException {
+ return new ResponseEntity<>(metaAlertService.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK);
+ }
+
+  @ApiOperation(value = "Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.")
+  @ApiResponse(message = "Returns 'true' if the status changed and 'false' if it did not.", code = 200)
+  @RequestMapping(value = "/update/status/{guid}/{status}", method = RequestMethod.POST)
+  ResponseEntity<Boolean> updateMetaAlertStatus(
+      final @ApiParam(name = "guid", value = "Meta alert GUID", required = true) @PathVariable String guid,
+      final @ApiParam(name = "status", value = "Meta alert status with a value of either 'ACTIVE' or 'INACTIVE'", required = true) @PathVariable String status
+  ) throws RestException {
+    // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM's default locale.
+    return new ResponseEntity<>(metaAlertService.updateMetaAlertStatus(guid,
+        MetaAlertStatus.valueOf(status.toUpperCase(java.util.Locale.ROOT))), HttpStatus.OK);
+  }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
index c339506..e8abaf3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java
@@ -18,8 +18,12 @@
package org.apache.metron.rest.service;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.rest.RestException;
@@ -28,4 +32,10 @@ public interface MetaAlertService {
MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException;
SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException;
+
+ boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+ boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException;
+
+ boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws RestException;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index f120c9e..aafab24 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -19,10 +19,13 @@
package org.apache.metron.rest.service.impl;
import java.io.IOException;
+import java.util.Collection;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
@@ -63,4 +66,32 @@ public class MetaAlertServiceImpl implements MetaAlertService {
throw new RestException(ise.getMessage(), ise);
}
}
+
+ @Override
+ public boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+ try {
+ return dao.addAlertsToMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts());
+ } catch (IOException ioe) {
+ throw new RestException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException {
+ try {
+ return dao.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts());
+ } catch (IOException ioe) {
+ throw new RestException(ioe.getMessage(), ioe);
+ }
+ }
+
+ @Override
+ public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+ throws RestException {
+ try {
+ return dao.updateMetaAlertStatus(metaAlertGuid, status);
+ } catch (IOException ioe) {
+ throw new RestException(ioe.getMessage(), ioe);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
index efd80a7..433eae3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
@@ -58,13 +58,10 @@ public class SearchServiceImpl implements SearchService {
@Override
public SearchResponse search(SearchRequest searchRequest) throws RestException {
try {
- // Pull the indices from the cache by default
if (searchRequest.getIndices() == null || searchRequest.getIndices().isEmpty()) {
- List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME))));
- // metaalerts should be included by default
+ List<String> indices = getDefaultIndices();
+ // metaalerts should be included by default in search requests
indices.add(METAALERT_TYPE);
- // errors should not be included by default
- indices.remove(ERROR_TYPE);
searchRequest.setIndices(indices);
}
return dao.search(searchRequest);
@@ -77,6 +74,9 @@ public class SearchServiceImpl implements SearchService {
@Override
public GroupResponse group(GroupRequest groupRequest) throws RestException {
try {
+ if (groupRequest.getIndices() == null || groupRequest.getIndices().isEmpty()) {
+ groupRequest.setIndices(getDefaultIndices());
+ }
return dao.group(groupRequest);
}
catch(InvalidSearchException ise) {
@@ -112,4 +112,12 @@ public class SearchServiceImpl implements SearchService {
throw new RestException(ioe.getMessage(), ioe);
}
}
+
+ private List<String> getDefaultIndices() throws RestException {
+ // Pull the indices from the cache by default
+ List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME))));
+ // errors should not be included by default
+ indices.remove(ERROR_TYPE);
+ return indices;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
index 983c207..b0dd774 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
@@ -28,11 +28,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.InMemoryMetaAlertDao;
import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.rest.service.MetaAlertService;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,10 +78,18 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
/**
{
- "guidToIndices" : {
- "bro_1":"bro_index_2017.01.01.01",
- "snort_2":"snort_index_2017.01.01.01"
+ "alerts" : [
+ {
+ "guid": "bro_1",
+ "sensorType": "bro",
+ "index": "bro_index_2017.01.01.01"
},
+ {
+ "guid": "snort_2",
+ "sensorType": "snort",
+ "index": "snort_index_2017.01.01.01"
+ }
+ ],
"groups" : ["group_one", "group_two"]
}
*/
@@ -88,6 +107,11 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
loadTestData(testData);
}
+ @After
+ public void cleanup() {
+ InMemoryMetaAlertDao.clear();
+ }
+
@Test
public void test() throws Exception {
// Testing searching by alert
@@ -171,4 +195,94 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
.andExpect(jsonPath("$.results[0].source.guid").value("meta_3"))
.andExpect(jsonPath("$.results[0].source.count").value(2.0));
}
+
+  @Test
+  public void shouldAddRemoveAlerts() throws Exception {
+    MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest();
+    metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two"));
+    metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+    }});
+    MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest);
+    // Adding alerts that are not yet part of the meta alert must report a change.
+    MetaAlertAddRemoveRequest addRequest = new MetaAlertAddRemoveRequest();
+    addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro", "bro_index_2017.01.01.01"));
+      add(new GetRequest("bro_3", "bro", "bro_index_2017.01.01.01"));
+    }});
+
+    ResultActions result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+    // bro_1 was part of the create request, so adding it again must report no change.
+    MetaAlertAddRemoveRequest addDuplicateRequest = new MetaAlertAddRemoveRequest();
+    addDuplicateRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    addDuplicateRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_1", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/add/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(addDuplicateRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+    // Removing the two alerts added above must report a change.
+    MetaAlertAddRemoveRequest removeRequest = new MetaAlertAddRemoveRequest();
+    removeRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro"));
+      add(new GetRequest("bro_3", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("true"));
+    // bro_2 was just removed, so removing it again from the same meta alert must report no change.
+    MetaAlertAddRemoveRequest removeMissingRequest = new MetaAlertAddRemoveRequest();
+    removeMissingRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid());
+    removeMissingRequest.setAlerts(new ArrayList<GetRequest>() {{
+      add(new GetRequest("bro_2", "bro"));
+    }});
+
+    result = this.mockMvc.perform(
+        post(metaalertUrl + "/remove/alert")
+            .with(httpBasic(user, password)).with(csrf())
+            .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))
+            .content(JSONUtils.INSTANCE.toJSON(removeMissingRequest, false)));
+    result.andExpect(status().isOk()).andExpect(content().string("false"));
+  }
+
+ @Test
+ public void shouldUpdateStatus() throws Exception {
+ MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest();
+ metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two"));
+ metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{
+ add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01"));
+ add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01"));
+ }});
+
+ MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest);
+
+ ResultActions result = this.mockMvc.perform(
+ post(metaalertUrl + "/update/status/" + metaAlertCreateResponse.getGuid() + "/inactive")
+ .with(httpBasic(user, password)).with(csrf())
+ .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+ result.andExpect(status().isOk()).andExpect(content().string("true"));
+
+ result = this.mockMvc.perform(
+ post(metaalertUrl + "/update/status/" + metaAlertCreateResponse.getGuid() + "/inactive")
+ .with(httpBasic(user, password)).with(csrf())
+ .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+ result.andExpect(status().isOk()).andExpect(content().string("false"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index 4708bc4..57a1b28 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.HBaseDao;
import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.rest.service.UpdateService;
@@ -161,7 +162,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
Assert.assertEquals(1,table.size());
{
//ensure hbase is up to date
- Get g = new Get(guid.getBytes());
+ Get g = new Get(new HBaseDao.Key(guid,"bro").toBytes());
Result r = table.get(g);
NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
Assert.assertEquals(1, columns.size());
@@ -183,7 +184,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
Assert.assertEquals(1,table.size());
{
//ensure hbase is up to date
- Get g = new Get(guid.getBytes());
+ Get g = new Get(new HBaseDao.Key(guid, "bro").toBytes());
Result r = table.get(g);
NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
Assert.assertEquals(2, columns.size());
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
new file mode 100644
index 0000000..595a839
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public enum KeyUtil {
+ INSTANCE;
+ private static final int SEED = 0xDEADBEEF;
+ public static final int HASH_PREFIX_SIZE=16;
+ ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() {
+ @Override
+ protected HashFunction initialValue() {
+ return Hashing.murmur3_128(SEED);
+ }
+ };
+
+ public byte[] getPrefix(byte[] key) {
+ Hasher hasher = hFunction.get().newHasher();
+ hasher.putBytes(key);
+ return hasher.hash().asBytes();
+ }
+
+ public byte[] merge(byte[] prefix, byte[] key) {
+ byte[] val = new byte[key.length + prefix.length];
+ int offset = 0;
+ System.arraycopy(prefix, 0, val, offset, prefix.length);
+ offset += prefix.length;
+ System.arraycopy(key, 0, val, offset, key.length);
+ return val;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
index a018e27..fff1d9b 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java
@@ -34,6 +34,15 @@ import java.util.HashMap;
public class HBaseEnrichmentConverterTest {
+ public static byte[] keyBytes = new byte[] {
+ 0x31,(byte)0xc2,0x49,0x05,0x6b,(byte)0xea,
+ 0x0e,0x59,(byte)0xe1,(byte)0xad,(byte)0xa0,0x24,
+ 0x55,(byte)0xa9,0x6b,0x63,0x00,0x06,
+ 0x64,0x6f,0x6d,0x61,0x69,0x6e,
+ 0x00,0x06,0x67,0x6f,0x6f,0x67,
+ 0x6c,0x65
+ };
+
EnrichmentKey key = new EnrichmentKey("domain", "google");
EnrichmentValue value = new EnrichmentValue(
new HashMap<String, Object>() {{
@@ -41,6 +50,18 @@ public class HBaseEnrichmentConverterTest {
put("grok", "baz");
}});
LookupKV<EnrichmentKey, EnrichmentValue> results = new LookupKV(key, value);
+
+  /**
+   * Guards the serialized key format by comparing the bytes produced for the fixture key with
+   * the fixed {@code keyBytes} array.
+   *
+   * <p>IF this test fails then you have broken the key serialization in that your change has
+   * caused a key to change serialization, so keys from previous releases will not be able to be
+   * found under your scheme. Please either provide a migration plan or undo this change.
+   * DO NOT CHANGE THIS TEST BLITHELY!
+   */
+  @Test
+  public void testKeySerializationRemainsConstant() {
+    byte[] raw = key.toBytes();
+    // JUnit expects (expected, actual); this order keeps the failure message labelled correctly.
+    Assert.assertArrayEquals(keyBytes, raw);
+  }
@Test
public void testKeySerialization() {
byte[] serialized = key.toBytes();
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index f114b4c..61d5472 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -25,9 +25,11 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,6 +40,7 @@ import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.Group;
import org.apache.metron.indexing.dao.search.GroupOrder;
import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -55,10 +58,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -256,40 +258,73 @@ public class ElasticsearchDao implements IndexDao {
return ret.orElse(null);
}
+  /**
+   * Fetches the documents identified by the given requests with a single search.
+   *
+   * <p>The GUIDs and sensor types of all requests are collected into two sets and handed to
+   * {@code searchByGuids}. Each hit becomes a {@link Document} whose sensor type is the hit's
+   * type name up to the first {@code "_doc"} and whose timestamp is 0.
+   *
+   * @param getRequests the documents to look up
+   * @return the documents built from the search hits
+   * @throws IOException declared by the interface; a hit that cannot be turned into a Document
+   *     surfaces as an {@link IllegalStateException} instead
+   */
+  @Override
+  public Iterable<Document> getAllLatest(
+      final List<GetRequest> getRequests) throws IOException {
+    Collection<String> requestedGuids = new HashSet<>();
+    Collection<String> requestedSensorTypes = new HashSet<>();
+    getRequests.forEach(request -> {
+      requestedGuids.add(request.getGuid());
+      requestedSensorTypes.add(request.getSensorType());
+    });
+    return searchByGuids(requestedGuids, requestedSensorTypes, hit -> {
+      String json = hit.getSourceAsString();
+      // The Elasticsearch type is "<sensor>_doc"; keep only the sensor part.
+      String sensorType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+      try {
+        return Optional.of(new Document(json, hit.getId(), sensorType, 0L));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Single-GUID convenience wrapper around {@code searchByGuids}.
+   *
+   * @param guid the GUID to search for
+   * @param sensorType the sensor type to restrict the search to, or null for no restriction
+   * @param callback transforms a hit into the desired result type
+   * @return the first transformed result, or empty if the search produced none
+   */
+  <T> Optional<T> searchByGuid(String guid, String sensorType,
+      Function<SearchHit, Optional<T>> callback) {
+    Collection<String> sensorTypes = null;
+    if (sensorType != null) {
+      sensorTypes = Collections.singleton(sensorType);
+    }
+    List<T> matches = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
+    if (matches.isEmpty()) {
+      return Optional.empty();
+    }
+    return Optional.of(matches.get(0));
+  }
+
/**
* Return the search hits matching any of the given GUIDs, restricted to the given sensor types when they are supplied.
* A callback can be specified to transform each hit into a type T.
* Hits for which the callback returns an empty Optional are left out of the returned list.
*/
- <T> Optional<T> searchByGuid(String guid, String sensorType,
+ <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
Function<SearchHit, Optional<T>> callback) {
QueryBuilder query;
- if (sensorType != null) {
- query = QueryBuilders.idsQuery(sensorType + "_doc").ids(guid);
+ if (sensorTypes != null) {
+ String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
+ query = QueryBuilders.idsQuery(types).ids(guids);
} else {
- query = QueryBuilders.idsQuery().ids(guid);
+ query = QueryBuilders.idsQuery().ids(guids);
}
SearchRequestBuilder request = client.prepareSearch()
.setQuery(query)
.setSource("message")
+ .setSize(guids.size())
;
org.elasticsearch.action.search.SearchResponse response = request.get();
SearchHits hits = response.getHits();
- long totalHits = hits.getTotalHits();
- if (totalHits > 1) {
- LOG.warn("Encountered {} results for guid {} in sensor {}. Returning first hit.",
- totalHits,
- guid,
- sensorType
- );
- }
+ List<T> results = new ArrayList<>();
for (SearchHit hit : hits) {
- Optional<T> ret = callback.apply(hit);
- if (ret.isPresent()) {
- return ret;
+ Optional<T> result = callback.apply(hit);
+ if (result.isPresent()) {
+ results.add(result.get());
}
}
- return Optional.empty();
+ return results;
}
@Override
@@ -297,18 +332,17 @@ public class ElasticsearchDao implements IndexDao {
String indexPostfix = ElasticsearchUtils
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
String sensorType = update.getSensorType();
- String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
- String existingIndex = calculateExistingIndex(update, index, indexPostfix);
+ String indexName = getIndexName(update, index, indexPostfix);
- UpdateRequest updateRequest = buildUpdateRequest(update, sensorType, indexName, existingIndex);
+ IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
try {
- UpdateResponse response = client.update(updateRequest).get();
+ IndexResponse response = client.index(indexRequest).get();
ShardInfo shardInfo = response.getShardInfo();
int failed = shardInfo.getFailed();
if (failed > 0) {
throw new IOException(
- "ElasticsearchDao upsert failed: " + Arrays.toString(shardInfo.getFailures()));
+ "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
}
} catch (Exception e) {
throw new IOException(e.getMessage(), e);
@@ -326,16 +360,14 @@ public class ElasticsearchDao implements IndexDao {
for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
Document update = updateEntry.getKey();
String sensorType = update.getSensorType();
- String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null);
- String existingIndex = calculateExistingIndex(update, updateEntry.getValue(), indexPostfix);
- UpdateRequest updateRequest = buildUpdateRequest(
+ String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
+ IndexRequest indexRequest = buildIndexRequest(
update,
sensorType,
- indexName,
- existingIndex
+ indexName
);
- bulkRequestBuilder.add(updateRequest);
+ bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
@@ -346,21 +378,20 @@ public class ElasticsearchDao implements IndexDao {
}
}
- protected String calculateExistingIndex(Document update, Optional<String> index,
- String indexPostFix) {
- String sensorType = update.getSensorType();
- String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostFix, null);
+  /**
+   * Resolves the index an update should be written to.
+   *
+   * <p>Resolution order: the explicitly supplied index; otherwise the index that already holds a
+   * document with the update's GUID and sensor type; otherwise the index name built from the
+   * sensor type and the postfix. Each fallback is evaluated lazily, so no Elasticsearch lookup
+   * is issued when the caller already supplied the index.
+   *
+   * @param update the document about to be written
+   * @param index the index requested by the caller, if any
+   * @param indexPostFix postfix used when a new index name has to be built
+   * @return the name of the index to write to
+   */
+  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+    return index.orElseGet(() -> getIndexName(update.getGuid(), update.getSensorType())
+        .orElseGet(() -> ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
+    );
+  }
- return index.orElse(
- searchByGuid(update.getGuid(),
- sensorType,
- hit -> Optional.ofNullable(hit.getIndex())
- ).orElse(indexName)
+ /**
+ * Looks up the index that holds the document with the given GUID.
+ *
+ * @param guid GUID of the document to find
+ * @param sensorType sensor type passed through to searchByGuid to narrow the lookup
+ * @return the index reported by the search hit, or empty if searchByGuid yields no result
+ */
+ protected Optional<String> getIndexName(String guid, String sensorType) {
+ return searchByGuid(guid,
+ sensorType,
+ hit -> Optional.ofNullable(hit.getIndex())
);
}
- protected UpdateRequest buildUpdateRequest(Document update, String sensorType, String indexName,
- String existingIndex) {
+ protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
String type = sensorType + "_doc";
Object ts = update.getTimestamp();
IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
@@ -370,9 +401,7 @@ public class ElasticsearchDao implements IndexDao {
indexRequest = indexRequest.timestamp(ts.toString());
}
- return new UpdateRequest(existingIndex, type, update.getGuid())
- .doc(update.getDocument())
- .upsert(indexRequest);
+ return indexRequest;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index eef134f..c24ba0c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,18 +18,21 @@
package org.apache.metron.elasticsearch.dao;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.apache.metron.common.Constants.GUID;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -37,7 +40,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
@@ -45,8 +47,10 @@ import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
@@ -55,31 +59,26 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.index.query.support.QueryInnerHitBuilder;
-import org.elasticsearch.search.SearchHit;
public class ElasticsearchMetaAlertDao implements MetaAlertDao {
- private static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+ public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
+ private static final String STATUS_PATH = "/status";
+ private static final String ALERT_PATH = "/alert";
+
private IndexDao indexDao;
private ElasticsearchDao elasticsearchDao;
private String index = METAALERTS_INDEX;
private String threatTriageField = THREAT_FIELD_DEFAULT;
private String threatSort = THREAT_SORT_DEFAULT;
+ private int pageSize = 500;
/**
* Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
@@ -96,7 +95,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
*/
public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField,
String threatSort) {
- init(indexDao, threatSort);
+ init(indexDao, Optional.of(threatSort));
this.index = index;
this.threatTriageField = triageLevelField;
}
@@ -105,8 +104,14 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
//uninitialized.
}
+ /**
+ * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao.
+ * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example).
+ * @param indexDao The DAO to wrap for our queries
+ * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc.
+ */
@Override
- public void init(IndexDao indexDao, String threatSort) {
+ public void init(IndexDao indexDao, Optional<String> threatSort) {
if (indexDao instanceof MultiIndexDao) {
this.indexDao = indexDao;
MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao;
@@ -124,8 +129,8 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
);
}
- if (threatSort != null) {
- this.threatSort = threatSort;
+ if (threatSort.isPresent()) {
+ this.threatSort = threatSort.get();
}
}
@@ -139,66 +144,63 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
if (guid == null || guid.trim().isEmpty()) {
throw new InvalidSearchException("Guid cannot be empty");
}
- org.elasticsearch.action.search.SearchResponse esResponse = getMetaAlertsForAlert(guid.trim());
- SearchResponse searchResponse = new SearchResponse();
- searchResponse.setTotal(esResponse.getHits().getTotalHits());
- searchResponse.setResults(
- Arrays.stream(esResponse.getHits().getHits()).map(searchHit -> {
- SearchResult searchResult = new SearchResult();
- searchResult.setId(searchHit.getId());
- searchResult.setSource(searchHit.getSource());
- searchResult.setScore(searchHit.getScore());
- searchResult.setIndex(searchHit.getIndex());
- return searchResult;
- }
- ).collect(Collectors.toList()));
- return searchResponse;
+ // Searches for all active meta alerts whose nested alert list contains an alert with the given guid
+ QueryBuilder qb = boolQuery()
+ .must(
+ nestedQuery(
+ ALERT_FIELD,
+ boolQuery()
+ .must(termQuery(ALERT_FIELD + "." + GUID, guid))
+ ).innerHit(new QueryInnerHitBuilder())
+ )
+ .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+ return queryAllResults(qb);
}
@Override
@SuppressWarnings("unchecked")
public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
throws InvalidCreateException, IOException {
- if (request.getGuidToIndices().isEmpty()) {
- throw new InvalidCreateException("MetaAlertCreateRequest must contain alert GUIDs");
+ List<GetRequest> alertRequests = request.getAlerts();
+ if (request.getAlerts().isEmpty()) {
+ throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
}
if (request.getGroups().isEmpty()) {
throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
}
// Retrieve the documents going into the meta alert and build it
- MultiGetResponse multiGetResponse = getDocumentsByGuid(request);
- Document createDoc = buildCreateDocument(multiGetResponse, request.getGroups());
- MetaScores metaScores = calculateMetaScores(createDoc);
- createDoc.getDocument().putAll(metaScores.getMetaScores());
- createDoc.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
+ Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+
+ Document metaAlert = buildCreateDocument(alerts, request.getGroups());
+ calculateMetaScores(metaAlert);
// Add source type to be consistent with other sources and allow filtering
- createDoc.getDocument().put("source:type", MetaAlertDao.METAALERT_TYPE);
+ metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);
// Start a list of updates / inserts we need to run
Map<Document, Optional<String>> updates = new HashMap<>();
- updates.put(createDoc, Optional.of(MetaAlertDao.METAALERTS_INDEX));
+ updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
try {
// We need to update the associated alerts with the new meta alerts, making sure existing
// links are maintained.
- List<String> metaAlertField;
- for (MultiGetItemResponse itemResponse : multiGetResponse) {
- metaAlertField = new ArrayList<>();
- GetResponse response = itemResponse.getResponse();
- if (response.isExists()) {
- List<String> alertField = (List<String>) response.getSourceAsMap()
- .get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
+ Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
+ GetRequest::getGuid, GetRequest::getIndex));
+ Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
+ GetRequest::getGuid, GetRequest::getSensorType));
+ for (Document alert: alerts) {
+ if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+ // Use the index in the request if it exists
+ Optional<String> index = guidToIndices.get(alert.getGuid());
+ if (!index.isPresent()) {
+ // Look up the index from Elasticsearch if one is not supplied in the request
+ index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
+ if (!index.isPresent()) {
+ throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
+ }
}
+ updates.put(alert, index);
}
- metaAlertField.add(createDoc.getGuid());
-
- Document alertUpdate = buildAlertUpdate(response.getId(),
- (String) response.getSource().get(SOURCE_TYPE), metaAlertField,
- (Long) response.getSourceAsMap().get("_timestamp"));
- updates.put(alertUpdate, Optional.of(itemResponse.getIndex()));
}
// Kick off any updates.
@@ -206,7 +208,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
createResponse.setCreated(true);
- createResponse.setGuid(createDoc.getGuid());
+ createResponse.setGuid(metaAlert.getGuid());
return createResponse;
} catch (IOException ioe) {
throw new InvalidCreateException("Unable to create meta alert", ioe);
@@ -214,6 +216,149 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
+  /**
+   * Adds the requested alerts to an existing meta alert and links each alert back to it.
+   *
+   * <p>Nothing is written unless at least one alert was missing from the meta alert. In that
+   * case {@code calculateMetaScores} is run on the meta alert, and the meta alert plus every
+   * alert that newly received the meta alert GUID are passed to {@code indexDaoUpdate}.
+   *
+   * @param metaAlertGuid GUID of the meta alert to extend
+   * @param alertRequests the alerts to add
+   * @return true if the meta alert was changed, false if it already contained every alert
+   * @throws IllegalStateException if the meta alert's status is not ACTIVE
+   * @throws IOException declared for the underlying lookups and updates
+   */
@Override
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    Object currentStatus = metaAlert.getDocument().get(STATUS_FIELD);
+    if (!MetaAlertStatus.ACTIVE.getStatusString().equals(currentStatus)) {
+      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
+    }
+
+    Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+    if (!addAlertsToMetaAlert(metaAlert, alerts)) {
+      return false;
+    }
+
+    // Scores are refreshed before the meta alert is queued for writing.
+    calculateMetaScores(metaAlert);
+    Map<Document, Optional<String>> pendingUpdates = new HashMap<>();
+    pendingUpdates.put(metaAlert, Optional.of(index));
+    for (Document alert : alerts) {
+      if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+        pendingUpdates.put(alert, Optional.empty());
+      }
+    }
+    indexDaoUpdate(pendingUpdates);
+    return true;
+  }
+
+  /**
+   * Appends to the meta alert's alert list every supplied alert whose GUID is not in it yet.
+   *
+   * <p>GUIDs are tracked while iterating, so an alert that occurs more than once in
+   * {@code alerts} is appended only once.
+   *
+   * @param metaAlert the meta alert document; its alert list is modified in place
+   * @param alerts the candidate alerts
+   * @return true if at least one alert was appended
+   */
+  @SuppressWarnings("unchecked")
+  protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
+    boolean alertAdded = false;
+    List<Map<String, Object>> currentAlerts =
+        (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    // A mutable set is requested explicitly because GUIDs are added to it below.
+    Set<String> knownAlertGuids = currentAlerts.stream()
+        .map(currentAlert -> (String) currentAlert.get(GUID))
+        .collect(Collectors.toCollection(java.util.HashSet::new));
+    for (Document alert : alerts) {
+      // Set.add returns false when the GUID is already present, either in the meta alert
+      // or earlier in this batch.
+      if (knownAlertGuids.add(alert.getGuid())) {
+        currentAlerts.add(alert.getDocument());
+        alertAdded = true;
+      }
+    }
+    return alertAdded;
+  }
+
+  /**
+   * Records the meta alert GUID in the alert's meta alert field unless it is already listed.
+   *
+   * @param metaAlertGuid GUID of the meta alert to reference
+   * @param alert the alert document; its meta alert field is replaced by an extended copy
+   * @return true if the GUID was added, false if the alert already referenced it
+   */
+  protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
+    List<String> existingLinks = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (existingLinks != null && existingLinks.contains(metaAlertGuid)) {
+      return false;
+    }
+    // Work on a copy so the list previously stored in the document is not mutated.
+    List<String> updatedLinks = new ArrayList<>();
+    if (existingLinks != null) {
+      updatedLinks.addAll(existingLinks);
+    }
+    updatedLinks.add(metaAlertGuid);
+    alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, updatedLinks);
+    return true;
+  }
+
+  /**
+   * Removes the requested alerts from a meta alert and drops the back-reference from each alert.
+   *
+   * <p>Nothing is written unless at least one alert was removed from the meta alert. In that
+   * case {@code calculateMetaScores} is run on the meta alert, and the meta alert plus every
+   * alert that lost the meta alert GUID are passed to {@code indexDaoUpdate}.
+   *
+   * @param metaAlertGuid GUID of the meta alert to shrink
+   * @param alertRequests the alerts to remove
+   * @return true if the meta alert was changed, false if it contained none of the alerts
+   * @throws IllegalStateException if the meta alert's status is not ACTIVE
+   * @throws IOException declared for the underlying lookups and updates
+   */
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    Object currentStatus = metaAlert.getDocument().get(STATUS_FIELD);
+    if (!MetaAlertStatus.ACTIVE.getStatusString().equals(currentStatus)) {
+      throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
+    }
+
+    Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
+    Collection<String> guidsToRemove = alertRequests.stream()
+        .map(GetRequest::getGuid)
+        .collect(Collectors.toList());
+    if (!removeAlertsFromMetaAlert(metaAlert, guidsToRemove)) {
+      return false;
+    }
+
+    // Scores are refreshed before the meta alert is queued for writing.
+    calculateMetaScores(metaAlert);
+    Map<Document, Optional<String>> pendingUpdates = new HashMap<>();
+    pendingUpdates.put(metaAlert, Optional.of(index));
+    for (Document alert : alerts) {
+      if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
+        pendingUpdates.put(alert, Optional.empty());
+      }
+    }
+    indexDaoUpdate(pendingUpdates);
+    return true;
+  }
+
+  /**
+   * Deletes from the meta alert's alert list every entry whose GUID is in {@code alertGuids}.
+   *
+   * @param metaAlert the meta alert document; its alert list is modified in place
+   * @param alertGuids GUIDs of the alerts to drop
+   * @return true if at least one entry was removed
+   */
+  protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
+    List<Map<String, Object>> memberAlerts =
+        (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
+    // removeIf reports whether anything was removed, which equals a before/after size check.
+    return memberAlerts.removeIf(member -> alertGuids.contains((String) member.get(GUID)));
+  }
+
+  /**
+   * Removes the meta alert GUID from the alert's meta alert field if it is listed there.
+   *
+   * @param metaAlertGuid GUID of the meta alert to unlink
+   * @param alert the alert document; its meta alert field is replaced by a reduced copy
+   * @return true if the GUID was removed, false if the alert did not reference it
+   */
+  protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
+    List<String> existingLinks = (List<String>) alert.getDocument()
+        .get(MetaAlertDao.METAALERT_FIELD);
+    if (existingLinks == null || !existingLinks.contains(metaAlertGuid)) {
+      return false;
+    }
+    // Work on a copy so the list previously stored in the document is not mutated.
+    List<String> remainingLinks = new ArrayList<>(existingLinks);
+    remainingLinks.remove(metaAlertGuid);
+    alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, remainingLinks);
+    return true;
+  }
+
+  /**
+   * Changes the status of a meta alert and keeps its child alerts' back-references in step.
+   *
+   * <p>If the status already matches, nothing is written. Otherwise the new status is stored on
+   * the meta alert, its child alerts are reloaded, and the meta alert GUID is added to each
+   * child (ACTIVE) or removed from it (INACTIVE). The meta alert and every changed child are
+   * then passed to {@code indexDaoUpdate}.
+   *
+   * @param metaAlertGuid GUID of the meta alert
+   * @param status the status to set
+   * @return true if the status changed, false if the meta alert already had that status
+   * @throws IOException declared for the underlying lookups and updates
+   */
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws IOException {
+    Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
+    String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
+    if (status.getStatusString().equals(currentStatus)) {
+      return false;
+    }
+
+    metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
+    Map<Document, Optional<String>> pendingUpdates = new HashMap<>();
+    pendingUpdates.put(metaAlert, Optional.of(index));
+
+    List<Map<String, Object>> childAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
+        .get(MetaAlertDao.ALERT_FIELD);
+    List<GetRequest> childRequests = childAlerts.stream()
+        .map(child -> new GetRequest((String) child.get(GUID), (String) child.get(SOURCE_TYPE)))
+        .collect(Collectors.toList());
+
+    for (Document alert : indexDao.getAllLatest(childRequests)) {
+      boolean linkChanged = false;
+      if (MetaAlertStatus.ACTIVE.equals(status)) {
+        // Activating: every child alert must reference the meta alert.
+        linkChanged = addMetaAlertToAlert(metaAlert.getGuid(), alert);
+      } else if (MetaAlertStatus.INACTIVE.equals(status)) {
+        // Deactivating: every child alert must stop referencing the meta alert.
+        linkChanged = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
+      }
+      if (linkChanged) {
+        pendingUpdates.put(alert, Optional.empty());
+      }
+    }
+    indexDaoUpdate(pendingUpdates);
+    return true;
+  }
+
+ @Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
// Wrap the query to also get any meta-alerts.
QueryBuilder qb = constantScoreQuery(boolQuery()
@@ -242,95 +387,171 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
+ /**
+ * Delegates directly to the wrapped IndexDao; no meta alert specific handling is applied here.
+ *
+ * @param getRequests the documents to look up
+ * @return whatever the wrapped IndexDao returns for these requests
+ * @throws IOException declared by the wrapped IndexDao's getAllLatest
+ */
@Override
+ public Iterable<Document> getAllLatest(
+ List<GetRequest> getRequests) throws IOException {
+ return indexDao.getAllLatest(getRequests);
+ }
+
+ @Override
public void update(Document update, Optional<String> index) throws IOException {
if (METAALERT_TYPE.equals(update.getSensorType())) {
// We've been passed an update to the meta alert.
- handleMetaUpdate(update);
+ throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
} else {
+ Map<Document, Optional<String>> updates = new HashMap<>();
+ updates.put(update, index);
// We need to update an alert itself. Only that portion of the update can be delegated.
// We still need to get meta alerts potentially associated with it and update.
- org.elasticsearch.action.search.SearchResponse response = getMetaAlertsForAlert(
- update.getGuid()
- );
-
- // Each hit, if any, is a metaalert that needs to be updated
- for (SearchHit hit : response.getHits()) {
- handleAlertUpdate(update, hit);
+ Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
+ .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L))
+ .collect(Collectors.toList());
+ // Each meta alert needs to be updated with the new alert
+ for (Document metaAlert : metaAlerts) {
+ replaceAlertInMetaAlert(metaAlert, update);
+ updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
}
// Run the alert's update
- indexDao.update(update, index);
+ indexDao.batchUpdate(updates);
}
}
+  /**
+   * Swaps the copy of an alert held inside a meta alert for the supplied version.
+   *
+   * <p>The entry with the alert's GUID is removed first; the new version is added only when
+   * such an entry existed.
+   *
+   * @param metaAlert the meta alert document; its alert list is modified in place
+   * @param alert the new version of the alert
+   * @return true if the meta alert contained the alert and it was replaced
+   */
+  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
+    if (!removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()))) {
+      return false;
+    }
+    addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
+    return true;
+  }
+
@Override
public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
}
/**
+   * Applies a patch after checking that it does not target the "alert" or "status" fields.
+   * Those fields must be changed through their dedicated methods.
+   *
+   * @param request The patch request
+   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
+   * @throws OriginalNotFoundException presumably when the document to patch cannot be found;
+   *     declared for getPatchedDocument -- TODO confirm
+   * @throws IOException declared for building or writing the patched document
+   * @throws IllegalArgumentException if isPatchAllowed rejects the request
+   */
+  @Override
+  public void patch(PatchRequest request, Optional<Long> timestamp)
+      throws OriginalNotFoundException, IOException {
+    if (!isPatchAllowed(request)) {
+      throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths. "
+          + "Please use the add/remove alert or update status functions instead.");
+    }
+    Document patched = getPatchedDocument(request, timestamp);
+    indexDao.update(patched, Optional.ofNullable(request.getIndex()));
+  }
+
+  /**
+   * Checks that no operation in the patch targets the status or alert fields.
+   *
+   * <p>An operation is rejected when its "path" is exactly {@code /status} or {@code /alert},
+   * or points below one of them (for example {@code /alert/0}). Such a sub-path would otherwise
+   * modify the alert list without going through the add/remove alert methods.
+   *
+   * @param request the patch request to inspect
+   * @return false if any operation targets a protected path, true otherwise
+   */
+  protected boolean isPatchAllowed(PatchRequest request) {
+    Iterator<?> patchIterator = request.getPatch().iterator();
+    while (patchIterator.hasNext()) {
+      JsonNode patch = (JsonNode) patchIterator.next();
+      String path = patch.path("path").asText();
+      if (isAtOrBelow(path, STATUS_PATH) || isAtOrBelow(path, ALERT_PATH)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Returns true if {@code path} equals {@code root} or addresses an element beneath it. */
+  private static boolean isAtOrBelow(String path, String root) {
+    return path.equals(root) || path.startsWith(root + "/");
+  }
+
+ /**
* Given an alert GUID, retrieve all associated meta alerts.
- * @param guid The GUID of the child alert
+ * @param alertGuid The GUID of the child alert
* @return The Elasticsearch response containing the meta alerts
*/
- protected org.elasticsearch.action.search.SearchResponse getMetaAlertsForAlert(String guid) {
+ protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
QueryBuilder qb = boolQuery()
.must(
nestedQuery(
ALERT_FIELD,
boolQuery()
- .must(termQuery(ALERT_FIELD + "." + Constants.GUID, guid))
+ .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid))
).innerHit(new QueryInnerHitBuilder())
)
.must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
- SearchRequest sr = new SearchRequest();
- ArrayList<String> indices = new ArrayList<>();
- indices.add(index);
- sr.setIndices(indices);
- return elasticsearchDao
+ return queryAllResults(qb);
+ }
+
+  /**
+   * Runs the query against the configured index and returns every matching result.
+   *
+   * <p>Elasticsearch queries default to 10 records returned. Some internal queries require that
+   * all results are returned. Rather than setting an arbitrarily high size, this method pages
+   * through the results {@code pageSize} at a time and returns them all in a single
+   * SearchResponse.
+   *
+   * <p>NOTE(review): from/size paging is presumably still bounded by the index's
+   * {@code index.max_result_window} setting -- confirm for very large result sets.
+   *
+   * @param qb The query to execute
+   * @return A SearchResponse holding the total hit count reported by the first page and the
+   *     results collected from all pages
+   */
+  protected SearchResponse queryAllResults(QueryBuilder qb) {
+    SearchRequestBuilder searchRequestBuilder = elasticsearchDao
.getClient()
.prepareSearch(index)
.addFields("*")
.setFetchSource(true)
.setQuery(qb)
+        .setSize(pageSize);
+    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
.execute()
.actionGet();
+    List<SearchResult> allResults = getSearchResults(esResponse);
+    long total = esResponse.getHits().getTotalHits();
+    if (total > pageSize) {
+      // Ceiling division: a total that is an exact multiple of pageSize no longer triggers
+      // an extra request that can only come back empty.
+      int pages = (int) ((total + pageSize - 1) / pageSize);
+      for (int i = 1; i < pages; i++) {
+        int from = i * pageSize;
+        searchRequestBuilder.setFrom(from);
+        esResponse = searchRequestBuilder
+            .execute()
+            .actionGet();
+        allResults.addAll(getSearchResults(esResponse));
+      }
+    }
+    SearchResponse searchResponse = new SearchResponse();
+    searchResponse.setTotal(total);
+    searchResponse.setResults(allResults);
+    return searchResponse;
}
/**
- * Return child documents after retrieving them from Elasticsearch.
- * @param request The request detailing which child alerts we need
- * @return The Elasticsearch response to our request for alerts
+ * Transforms a list of Elasticsearch SearchHits to a list of SearchResults
+ * @param searchResponse
+ * @return
*/
- protected MultiGetResponse getDocumentsByGuid(MetaAlertCreateRequest request) {
- MultiGetRequestBuilder multiGet = elasticsearchDao.getClient().prepareMultiGet();
- for (Entry<String, String> entry : request.getGuidToIndices().entrySet()) {
- multiGet.add(new Item(entry.getValue(), null, entry.getKey()));
- }
- return multiGet.get();
+ protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
+ return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
+ SearchResult searchResult = new SearchResult();
+ searchResult.setId(searchHit.getId());
+ searchResult.setSource(searchHit.getSource());
+ searchResult.setScore(searchHit.getScore());
+ searchResult.setIndex(searchHit.getIndex());
+ return searchResult;
+ }
+ ).collect(Collectors.toList());
}
/**
* Build the Document representing a meta alert to be created.
- * @param multiGetResponse The Elasticsearch results for the meta alerts child documents
+ * @param alerts The Elasticsearch results for the meta alerts child documents
* @param groups The groups used to create this meta alert
* @return A Document representing the new meta alert
*/
- protected Document buildCreateDocument(MultiGetResponse multiGetResponse, List<String> groups) {
+ protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
// Need to create a Document from the multiget. Scores will be calculated later
Map<String, Object> metaSource = new HashMap<>();
List<Map<String, Object>> alertList = new ArrayList<>();
- for (MultiGetItemResponse itemResponse : multiGetResponse) {
- GetResponse response = itemResponse.getResponse();
- if (response.isExists()) {
- alertList.add(response.getSource());
- }
+ for (Document alert: alerts) {
+ alertList.add(alert.getDocument());
}
metaSource.put(ALERT_FIELD, alertList);
// Add any meta fields
String guid = UUID.randomUUID().toString();
- metaSource.put(Constants.GUID, guid);
+ metaSource.put(GUID, guid);
metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
metaSource.put(GROUPS_FIELD, groups);
metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
@@ -339,29 +560,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
/**
- * Process an update to a meta alert itself.
- * @param update The update Document to be applied
- * @throws IOException If there's a problem running the update
- */
- protected void handleMetaUpdate(Document update) throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
-
- if (update.getDocument().containsKey(MetaAlertDao.STATUS_FIELD)) {
- // Update all associated alerts to maintain the meta alert link properly
- updates.putAll(buildStatusAlertUpdates(update));
- }
- if (update.getDocument().containsKey(MetaAlertDao.ALERT_FIELD)) {
- // If the alerts field changes (i.e. add/remove alert), update all affected alerts to
- // maintain the meta alert link properly.
- updates.putAll(buildAlertFieldUpdates(update));
- }
-
- // Run meta alert update.
- updates.put(update, Optional.of(index));
- indexDaoUpdate(updates);
- }
-
- /**
* Calls the single update variant if there's only one update, otherwise calls batch.
* @param updates The list of updates to run
* @throws IOException If there's an update error
@@ -375,203 +573,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
} // else we have no updates, so don't do anything
}
- protected Map<Document, Optional<String>> buildStatusAlertUpdates(Document update)
- throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
- List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
- for (Map<String, Object> alert : alerts) {
- // Retrieve the associated alert, so we can update the array
- List<String> metaAlertField = new ArrayList<>();
- @SuppressWarnings("unchecked")
- List<String> alertField = (List<String>) alert.get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
- }
- String status = (String) update.getDocument().get(MetaAlertDao.STATUS_FIELD);
-
- Document alertUpdate = null;
- String alertGuid = (String) alert.get(Constants.GUID);
- // If we're making it active add add the meta alert guid for every alert.
- if (MetaAlertStatus.ACTIVE.getStatusString().equals(status)
- && !metaAlertField.contains(update.getGuid())) {
- metaAlertField.add(update.getGuid());
- alertUpdate = buildAlertUpdate(
- alertGuid,
- (String) alert.get(SOURCE_TYPE),
- metaAlertField,
- (Long) alert.get("_timestamp")
- );
- }
-
- // If we're making it inactive, remove the meta alert guid from every alert.
- if (MetaAlertStatus.INACTIVE.getStatusString().equals(status)
- && metaAlertField.remove(update.getGuid())) {
- alertUpdate = buildAlertUpdate(
- alertGuid,
- (String) alert.get(SOURCE_TYPE),
- metaAlertField,
- (Long) alert.get("_timestamp")
- );
- }
-
- // Only run an alert update if we have an actual update.
- if (alertUpdate != null) {
- updates.put(alertUpdate, Optional.empty());
- }
- }
- return updates;
- }
-
- protected Map<Document, Optional<String>> buildAlertFieldUpdates(Document update)
- throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
- // If we've updated the alerts field (i.e add/remove), recalculate meta alert scores and
- // the metaalerts fields for updating the children alerts.
- MetaScores metaScores = calculateMetaScores(update);
- update.getDocument().putAll(metaScores.getMetaScores());
- update.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
-
- // Get the set of GUIDs that are in the new version.
- Set<String> updateGuids = new HashSet<>();
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> updateAlerts = (List<Map<String, Object>>) update.getDocument()
- .get(MetaAlertDao.ALERT_FIELD);
- for (Map<String, Object> alert : updateAlerts) {
- updateGuids.add((String) alert.get(Constants.GUID));
- }
-
- // Get the set of GUIDs from the old version
- List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update);
- Set<String> currentGuids = new HashSet<>();
- for (Map<String, Object> alert : alerts) {
- currentGuids.add((String) alert.get(Constants.GUID));
- }
-
- // Get both set differences, so we know what's been added and removed.
- Set<String> removedGuids = SetUtils.difference(currentGuids, updateGuids);
- Set<String> addedGuids = SetUtils.difference(updateGuids, currentGuids);
-
- Document alertUpdate;
-
- // Handle any removed GUIDs
- for (String guid : removedGuids) {
- // Retrieve the associated alert, so we can update the array
- Document alert = elasticsearchDao.getLatest(guid, null);
- List<String> metaAlertField = new ArrayList<>();
- @SuppressWarnings("unchecked")
- List<String> alertField = (List<String>) alert.getDocument()
- .get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
- }
- if (metaAlertField.remove(update.getGuid())) {
- alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
- alert.getTimestamp());
- updates.put(alertUpdate, Optional.empty());
- }
- }
-
- // Handle any added GUIDs
- for (String guid : addedGuids) {
- // Retrieve the associated alert, so we can update the array
- Document alert = elasticsearchDao.getLatest(guid, null);
- List<String> metaAlertField = new ArrayList<>();
- @SuppressWarnings("unchecked")
- List<String> alertField = (List<String>) alert.getDocument()
- .get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
- }
- metaAlertField.add(update.getGuid());
- alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField,
- alert.getTimestamp());
- updates.put(alertUpdate, Optional.empty());
- }
-
- return updates;
- }
-
- @SuppressWarnings("unchecked")
- protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
- Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
- if (latest == null) {
- return new ArrayList<>();
- }
- List<String> guids = new ArrayList<>();
- List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
- .get(MetaAlertDao.ALERT_FIELD);
- for (Map<String, Object> alert : latestAlerts) {
- guids.add((String) alert.get(Constants.GUID));
- }
-
- List<Map<String, Object>> alerts = new ArrayList<>();
- QueryBuilder query = QueryBuilders.idsQuery().ids(guids);
- SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
- .setQuery(query);
- org.elasticsearch.action.search.SearchResponse response = request.get();
- for (SearchHit hit : response.getHits().getHits()) {
- alerts.add(hit.sourceAsMap());
- }
- return alerts;
- }
-
- /**
- * Builds an update Document for updating the meta alerts list.
- * @param alertGuid The GUID of the alert to update
- * @param sensorType The sensor type to update
- * @param metaAlertField The new metaAlertList to use
- * @return The update Document
- */
- protected Document buildAlertUpdate(String alertGuid, String sensorType,
- List<String> metaAlertField, Long timestamp) {
- Document alertUpdate;
- Map<String, Object> document = new HashMap<>();
- document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
- alertUpdate = new Document(
- document,
- alertGuid,
- sensorType,
- timestamp
- );
- return alertUpdate;
- }
-
- /**
- * Takes care of upserting a child alert to a meta alert.
- * @param update The update Document to be applied
- * @param hit The meta alert to be updated
- * @throws IOException If there's an issue running the upsert
- */
- protected void handleAlertUpdate(Document update, SearchHit hit) throws IOException {
- XContentBuilder builder = buildUpdatedMetaAlert(update, hit);
-
- // Run the meta alert's update
- IndexRequest indexRequest = new IndexRequest(
- METAALERTS_INDEX,
- METAALERT_DOC,
- hit.getId()
- ).source(builder);
- UpdateRequest updateRequest = new UpdateRequest(
- METAALERTS_INDEX,
- METAALERT_DOC,
- hit.getId()
- ).doc(builder).upsert(indexRequest);
- try {
- UpdateResponse updateResponse = elasticsearchDao.getClient().update(updateRequest).get();
-
- ShardInfo shardInfo = updateResponse.getShardInfo();
- int failed = shardInfo.getFailed();
- if (failed > 0) {
- throw new IOException(
- "ElasticsearchMetaAlertDao upsert failed: "
- + Arrays.toString(shardInfo.getFailures())
- );
- }
- } catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
- }
-
@Override
public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices)
throws IOException {
@@ -595,80 +596,26 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
/**
* Calculate the meta alert scores for a Document.
- * @param document The Document containing scores
+ * @param metaAlert The meta alert Document whose nested alerts are scored; updated in place
* @return Set of score statistics
*/
@SuppressWarnings("unchecked")
- protected MetaScores calculateMetaScores(Document document) {
- List<Object> alertsRaw = ((List<Object>) document.getDocument().get(ALERT_FIELD));
- if (alertsRaw == null || alertsRaw.isEmpty()) {
- throw new IllegalArgumentException("No alerts to use in calculation for doc GUID: "
- + document.getDocument().get(Constants.GUID));
- }
-
- ArrayList<Double> scores = new ArrayList<>();
- for (Object alertRaw : alertsRaw) {
- Map<String, Object> alert = (Map<String, Object>) alertRaw;
- Double scoreNum = parseThreatField(alert.get(threatTriageField));
- if (scoreNum != null) {
- scores.add(scoreNum);
- }
- }
-
- return new MetaScores(scores);
- }
-
- /**
- * Builds the updated meta alert based on the update.
- * @param update The update Document for the meta alert
- * @param hit The meta alert to be updated
- * @return A builder for Elasticsearch to use
- * @throws IOException If we have an issue building the result
- */
- protected XContentBuilder buildUpdatedMetaAlert(Document update, SearchHit hit)
- throws IOException {
- // Make sure to get all the threat scores while we're going through the docs
- List<Double> scores = new ArrayList<>();
- // Start building the new version of the metaalert
- XContentBuilder builder = jsonBuilder().startObject();
-
- // Run through the nested alerts of the meta alert and either use the new or old versions
- builder.startArray(ALERT_FIELD);
- Map<String, Object> hitAlerts = hit.sourceAsMap();
-
- @SuppressWarnings("unchecked")
- List<Map<String, Object>> alertHits = (List<Map<String, Object>>) hitAlerts.get(ALERT_FIELD);
- for (Map<String, Object> alertHit : alertHits) {
- Map<String, Object> docMap = alertHit;
- // If we're at the update use it instead of the original
- if (alertHit.get(Constants.GUID).equals(update.getGuid())) {
- docMap = update.getDocument();
- }
- builder.map(docMap);
-
- // Handle either String or Number values in the threatTriageField
- Object threatRaw = docMap.get(threatTriageField);
- Double threat = parseThreatField(threatRaw);
-
- if (threat != null) {
- scores.add(threat);
- }
- }
- builder.endArray();
-
- // Add all the meta alert fields, and score calculation
- Map<String, Object> updatedMeta = new HashMap<>();
- updatedMeta.putAll(hit.getSource());
- updatedMeta.putAll(new MetaScores(scores).getMetaScores());
- for (Entry<String, Object> entry : updatedMeta.entrySet()) {
- // The alerts field is being added separately, so ignore the original
- if (!(entry.getKey().equals(ALERT_FIELD))) {
- builder.field(entry.getKey(), entry.getValue());
+ protected void calculateMetaScores(Document metaAlert) {
+ MetaScores metaScores = new MetaScores(new ArrayList<>());
+ List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD));
+ if (alertsRaw != null && !alertsRaw.isEmpty()) {
+ ArrayList<Double> scores = new ArrayList<>();
+ for (Object alertRaw : alertsRaw) {
+ Map<String, Object> alert = (Map<String, Object>) alertRaw;
+ Double scoreNum = parseThreatField(alert.get(threatTriageField));
+ if (scoreNum != null) {
+ scores.add(scoreNum);
+ }
}
+ metaScores = new MetaScores(scores);
}
- builder.endObject();
-
- return builder;
+ metaAlert.getDocument().putAll(metaScores.getMetaScores());
+ metaAlert.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort));
}
private Double parseThreatField(Object threatRaw) {
@@ -680,4 +627,12 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
return threat;
}
+
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ public void setPageSize(int pageSize) {
+ this.pageSize = pageSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
deleted file mode 100644
index 6c8e858..0000000
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.elasticsearch.dao;
-
-public enum MetaAlertStatus {
- ACTIVE("active"),
- INACTIVE("inactive");
-
- private String statusString;
-
- MetaAlertStatus(String statusString) {
- this.statusString = statusString;
- }
-
- public String getStatusString() {
- return statusString;
- }
-}