You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/11 01:32:28 UTC
[12/50] [abbrv] metron git commit: METRON-1421 Create a SolrMetaAlertDao (justinleet) closes apache/metron#970
METRON-1421 Create a SolrMetaAlertDao (justinleet) closes apache/metron#970
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/49f851e0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/49f851e0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/49f851e0
Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 49f851e0b8c2ffa1cdd7c8f169bed3dfa07cf35c
Parents: eb33666
Author: justinleet <ju...@gmail.com>
Authored: Wed May 23 10:32:34 2018 -0400
Committer: leet <le...@apache.org>
Committed: Wed May 23 10:32:34 2018 -0400
----------------------------------------------------------------------
metron-analytics/metron-profiler/.gitignore | 1 +
.../apache/metron/rest/config/IndexConfig.java | 12 +-
.../rest/service/impl/MetaAlertServiceImpl.java | 5 +-
.../rest/service/impl/SearchServiceImpl.java | 2 +-
.../rest/service/impl/UpdateServiceImpl.java | 2 +-
.../MetaAlertControllerIntegrationTest.java | 6 +-
.../UpdateControllerIntegrationTest.java | 4 +-
.../elasticsearch/dao/ElasticsearchDao.java | 65 +-
.../dao/ElasticsearchMetaAlertDao.java | 641 ++---------
...ElasticsearchMetaAlertRetrieveLatestDao.java | 44 +
.../dao/ElasticsearchMetaAlertSearchDao.java | 110 ++
.../dao/ElasticsearchMetaAlertUpdateDao.java | 219 ++++
.../dao/ElasticsearchRetrieveLatestDao.java | 151 +++
.../dao/ElasticsearchSearchDao.java | 102 --
.../dao/ElasticsearchUpdateDao.java | 10 +-
.../elasticsearch/utils/ElasticsearchUtils.java | 64 ++
.../elasticsearch/dao/ElasticsearchDaoTest.java | 70 +-
.../dao/ElasticsearchMetaAlertDaoTest.java | 164 +--
.../ElasticsearchMetaAlertIntegrationTest.java | 986 ++---------------
.../ElasticsearchSearchIntegrationTest.java | 64 +-
.../ElasticsearchUpdateIntegrationTest.java | 84 +-
.../components/ElasticSearchComponent.java | 26 +-
metron-platform/metron-indexing/README.md | 2 +-
metron-platform/metron-indexing/pom.xml | 8 +-
.../metron/indexing/dao/AccessConfig.java | 10 +
.../apache/metron/indexing/dao/IndexDao.java | 141 +--
.../metron/indexing/dao/MetaAlertDao.java | 154 ---
.../metron/indexing/dao/RetrieveLatestDao.java | 67 ++
.../metaalert/DeferredMetaAlertIndexDao.java | 42 +
.../metaalert/MetaAlertAddRemoveRequest.java | 1 -
.../indexing/dao/metaalert/MetaAlertConfig.java | 74 ++
.../dao/metaalert/MetaAlertConstants.java | 30 +
.../indexing/dao/metaalert/MetaAlertDao.java | 77 ++
.../metaalert/MetaAlertRetrieveLatestDao.java | 25 +
.../dao/metaalert/MetaAlertSearchDao.java | 35 +
.../dao/metaalert/MetaAlertUpdateDao.java | 146 +++
.../indexing/dao/metaalert/MetaScores.java | 52 +-
.../AbstractLuceneMetaAlertUpdateDao.java | 334 ++++++
.../metron/indexing/dao/search/SearchDao.java | 22 +-
.../indexing/dao/search/SearchResponse.java | 10 +-
.../metron/indexing/dao/update/PatchUtil.java | 50 +
.../metron/indexing/dao/update/UpdateDao.java | 47 +
.../metron/indexing/util/IndexingCacheUtil.java | 35 +
.../indexing/dao/InMemoryMetaAlertDao.java | 69 +-
.../indexing/dao/SearchIntegrationTest.java | 60 +-
.../indexing/dao/UpdateIntegrationTest.java | 87 +-
.../dao/metaalert/MetaAlertIntegrationTest.java | 1012 ++++++++++++++++++
.../indexing/dao/metaalert/MetaScoresTest.java | 75 ++
.../AbstractLuceneMetaAlertUpdateDaoTest.java | 854 +++++++++++++++
.../integration/IndexingIntegrationTest.java | 4 +-
metron-platform/metron-pcap-backend/.gitignore | 1 +
metron-platform/metron-solr/pom.xml | 4 +-
.../src/main/config/schema/bro/schema.xml | 3 +
.../src/main/config/schema/metaalert/schema.xml | 39 +-
.../src/main/config/schema/snort/schema.xml | 3 +
.../src/main/config/schema/yaf/schema.xml | 3 +
.../org/apache/metron/solr/dao/SolrDao.java | 37 +-
.../metron/solr/dao/SolrMetaAlertDao.java | 285 +++--
.../dao/SolrMetaAlertRetrieveLatestDao.java | 77 ++
.../metron/solr/dao/SolrMetaAlertSearchDao.java | 211 ++++
.../metron/solr/dao/SolrMetaAlertUpdateDao.java | 216 ++++
.../metron/solr/dao/SolrRetrieveLatestDao.java | 81 ++
.../apache/metron/solr/dao/SolrSearchDao.java | 127 +--
.../apache/metron/solr/dao/SolrUpdateDao.java | 51 +-
.../apache/metron/solr/dao/SolrUtilities.java | 92 ++
.../org/apache/metron/solr/dao/SolrDaoTest.java | 61 +-
.../metron/solr/dao/SolrMetaAlertDaoTest.java | 137 +++
.../metron/solr/dao/SolrSearchDaoTest.java | 176 ++-
.../metron/solr/dao/SolrUpdateDaoTest.java | 19 +-
.../metron/solr/dao/SolrUtilitiesTest.java | 48 +
.../SolrIndexingIntegrationTest.java | 5 +-
.../SolrMetaAlertIntegrationTest.java | 397 +++++++
.../integration/SolrSearchIntegrationTest.java | 59 +-
.../integration/SolrUpdateIntegrationTest.java | 87 +-
.../integration/components/SolrComponent.java | 94 +-
.../schema/SchemaValidationIntegrationTest.java | 9 +-
.../metron/solr/writer/SolrWriterTest.java | 23 +-
.../resources/config/test/conf/managed-schema | 84 +-
78 files changed, 6051 insertions(+), 2733 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-analytics/metron-profiler/.gitignore
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/.gitignore b/metron-analytics/metron-profiler/.gitignore
new file mode 100644
index 0000000..df1a13b
--- /dev/null
+++ b/metron-analytics/metron-profiler/.gitignore
@@ -0,0 +1 @@
+/logs
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 635d1de..c432c6c 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -20,12 +20,14 @@ package org.apache.metron.rest.config;
import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL;
import java.util.Optional;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.IndexDaoFactory;
-import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
+import org.apache.metron.indexing.util.IndexingCacheUtil;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.GlobalConfigService;
@@ -34,10 +36,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
@Configuration
public class IndexConfig {
@@ -45,6 +43,9 @@ public class IndexConfig {
private GlobalConfigService globalConfigService;
@Autowired
+ private ConfigurationsCache cache;
+
+ @Autowired
private Environment environment;
@Autowired
@@ -72,6 +73,7 @@ public class IndexConfig {
throw new IllegalStateException("Unable to retrieve the global config.", e);
}
});
+ config.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache));
config.setTableProvider(TableProvider.create(hbaseProviderImpl, () -> new HTableProvider()));
config.setKerberosEnabled(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false));
if (indexDaoImpl == null) {
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index aafab24..3f9b3e4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -19,16 +19,14 @@
package org.apache.metron.rest.service.impl;
import java.io.IOException;
-import java.util.Collection;
import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.MetaAlertService;
@@ -48,7 +46,6 @@ public class MetaAlertServiceImpl implements MetaAlertService {
this.environment = environment;
}
-
@Override
public MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException {
try {
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
index 21d158f..82b9c11 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java
@@ -18,7 +18,7 @@
package org.apache.metron.rest.service.impl;
import static org.apache.metron.common.Constants.ERROR_TYPE;
-import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE;
+import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE;
import static org.apache.metron.rest.MetronRestConstants.INDEX_WRITER_NAME;
import static org.apache.metron.rest.MetronRestConstants.SEARCH_FACET_FIELDS_SPRING_PROPERTY;
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
index 76ac75d..6a42248 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java
@@ -44,7 +44,7 @@ public class UpdateServiceImpl implements UpdateService {
@Override
public void patch(PatchRequest request) throws RestException, OriginalNotFoundException {
try {
- dao.patch(request, Optional.of(System.currentTimeMillis()));
+ dao.patch(dao, request, Optional.of(System.currentTimeMillis()));
} catch (Exception e) {
throw new RestException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
index 3e69e37..9200fd1 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java
@@ -30,13 +30,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.indexing.dao.InMemoryMetaAlertDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
@@ -75,6 +72,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
private String metaalertUrl = "/api/v1/metaalert";
private String user = "user";
private String password = "password";
+ private String metaAlertIndex = "metaalert_index";
/**
{
@@ -111,7 +109,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest {
ImmutableMap<String, String> testData = ImmutableMap.of(
"bro_index_2017.01.01.01", SearchIntegrationTest.broData,
"snort_index_2017.01.01.01", SearchIntegrationTest.snortData,
- MetaAlertDao.METAALERTS_INDEX, metaAlertData
+ metaAlertIndex, metaAlertData
);
loadTestData(testData);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
index e8d00d3..e437325 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.indexing.dao.HBaseDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.rest.service.UpdateService;
import org.junit.Assert;
@@ -72,6 +71,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
private String searchUrl = "/api/v1/search";
private String user = "user";
private String password = "password";
+ private String metaAlertIndex = "metaalert_index";
/**
{
@@ -121,7 +121,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest {
ImmutableMap<String, String> testData = ImmutableMap.of(
"bro_index_2017.01.01.01", SearchIntegrationTest.broData,
"snort_index_2017.01.01.01", SearchIntegrationTest.snortData,
- MetaAlertDao.METAALERTS_INDEX, MetaAlertControllerIntegrationTest.metaAlertData
+ metaAlertIndex, MetaAlertControllerIntegrationTest.metaAlertData
);
loadTestData(testData);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index a09086a..246de6a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -24,8 +24,8 @@ import java.util.Map;
import java.util.Optional;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.ColumnMetadataDao;
import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
@@ -34,6 +34,9 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@ public class ElasticsearchDao implements IndexDao {
private transient TransportClient client;
private ElasticsearchSearchDao searchDao;
private ElasticsearchUpdateDao updateDao;
+ private ElasticsearchRetrieveLatestDao retrieveLatestDao;
/**
* Retrieves column metadata about search indices.
@@ -63,12 +67,14 @@ public class ElasticsearchDao implements IndexDao {
AccessConfig config,
ElasticsearchSearchDao searchDao,
ElasticsearchUpdateDao updateDao,
+ ElasticsearchRetrieveLatestDao retrieveLatestDao,
ElasticsearchColumnMetadataDao columnMetadataDao,
ElasticsearchRequestSubmitter requestSubmitter
- ) {
+ ) {
this.client = client;
this.searchDao = searchDao;
this.updateDao = updateDao;
+ this.retrieveLatestDao = retrieveLatestDao;
this.columnMetadataDao = columnMetadataDao;
this.requestSubmitter = requestSubmitter;
this.accessConfig = config;
@@ -78,32 +84,25 @@ public class ElasticsearchDao implements IndexDao {
//uninitialized.
}
- public ElasticsearchDao columnMetadataDao(ElasticsearchColumnMetadataDao columnMetadataDao) {
- this.columnMetadataDao = columnMetadataDao;
- return this;
- }
-
- public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
- this.accessConfig = accessConfig;
- return this;
- }
-
@Override
public synchronized void init(AccessConfig config) {
- if(this.client == null) {
- this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get());
+ if (this.client == null) {
+ this.client = ElasticsearchUtils
+ .getClient(config.getGlobalConfigSupplier().get());
this.accessConfig = config;
this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
- this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter);
- this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, searchDao);
+ this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao,
+ requestSubmitter);
+ this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client);
+ this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao);
}
- if(columnMetadataDao == null) {
+ if (columnMetadataDao == null) {
throw new IllegalArgumentException("No ColumnMetadataDao available");
}
- if(requestSubmitter == null) {
+ if (requestSubmitter == null) {
throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available");
}
}
@@ -119,14 +118,14 @@ public class ElasticsearchDao implements IndexDao {
}
@Override
- public Document getLatest(final String guid, final String sensorType) throws IOException {
- return searchDao.getLatest(guid, sensorType);
+ public Document getLatest(final String guid, final String sensorType) {
+ return retrieveLatestDao.getLatest(guid, sensorType);
}
@Override
public Iterable<Document> getAllLatest(
- final List<GetRequest> getRequests) throws IOException {
- return searchDao.getAllLatest(getRequests);
+ final List<GetRequest> getRequests) {
+ return retrieveLatestDao.getAllLatest(getRequests);
}
@Override
@@ -140,19 +139,37 @@ public class ElasticsearchDao implements IndexDao {
}
@Override
+ public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, Optional<Long> timestamp)
+ throws OriginalNotFoundException, IOException {
+ updateDao.patch(retrieveLatestDao, request, timestamp);
+ }
+
+ @Override
+ public void replace(ReplaceRequest request, Optional<Long> timestamp) throws IOException {
+ updateDao.replace(request, timestamp);
+ }
+
+ @Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
return this.columnMetadataDao.getColumnMetadata(indices);
}
+ @Override
+ public Optional<Map<String, Object>> getLatestResult(GetRequest request) throws IOException {
+ return retrieveLatestDao.getLatestResult(request);
+ }
+
protected Optional<String> getIndexName(String guid, String sensorType) {
return updateDao.getIndexName(guid, sensorType);
}
- protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException {
+ protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
+ throws InvalidSearchException {
return searchDao.search(request, queryBuilder);
}
- protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) throws InvalidSearchException {
+ protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder)
+ throws InvalidSearchException {
return searchDao.group(groupRequest, queryBuilder);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index 2311a2b..faec939 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,29 +18,21 @@
package org.apache.metron.elasticsearch.dao;
-import static org.apache.metron.common.Constants.GUID;
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
-import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
-import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
-import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
-import org.apache.lucene.search.join.ScoreMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
-import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
@@ -49,59 +41,36 @@ import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
-import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.index.query.InnerHitBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
public class ElasticsearchMetaAlertDao implements MetaAlertDao {
- public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
- private static final String STATUS_PATH = "/status";
- private static final String ALERT_PATH = "/alert";
+ public static final String THREAT_TRIAGE_FIELD = MetaAlertConstants.THREAT_FIELD_DEFAULT
+ .replace('.', ':');
+ public static final String METAALERTS_INDEX = "metaalert_index";
+
+ public static final String SOURCE_TYPE_FIELD = Constants.SENSOR_TYPE.replace('.', ':');
+ protected String metaAlertsIndex = METAALERTS_INDEX;
+ protected String threatTriageField = THREAT_TRIAGE_FIELD;
+ protected String threatSort = MetaAlertConstants.THREAT_SORT_DEFAULT;
- private IndexDao indexDao;
private ElasticsearchDao elasticsearchDao;
- private String index = METAALERTS_INDEX;
- private String threatTriageField = THREAT_FIELD_DEFAULT;
+ private IndexDao indexDao;
+ private ElasticsearchMetaAlertSearchDao metaAlertSearchDao;
+ private ElasticsearchMetaAlertUpdateDao metaAlertUpdateDao;
+ private ElasticsearchMetaAlertRetrieveLatestDao metaAlertRetrieveLatestDao;
- /**
- * Defines which summary aggregation is used to represent the overall threat triage score for
- * the metaalert. The summary aggregation is applied to the threat triage score of all child alerts.
- *
- * This overall score is primarily used for sorting; hence it is called the 'threatSort'. This
- * can be either max, min, average, count, median, or sum.
- */
- private String threatSort = THREAT_SORT_DEFAULT;
- private int pageSize = 500;
+ protected int pageSize = 500;
/**
* Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
* @param indexDao The Dao to wrap
*/
public ElasticsearchMetaAlertDao(IndexDao indexDao) {
- this(indexDao, METAALERTS_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT);
+ this(indexDao, METAALERTS_INDEX, MetaAlertConstants.THREAT_FIELD_DEFAULT,
+ MetaAlertConstants.THREAT_SORT_DEFAULT);
}
/**
@@ -112,10 +81,13 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
* as the overall threat triage score for the metaalert. This
* can be either max, min, average, count, median, or sum.
*/
- public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, String threatSort) {
+ public ElasticsearchMetaAlertDao(IndexDao indexDao, String metaAlertsIndex,
+ String triageLevelField,
+ String threatSort) {
init(indexDao, Optional.of(threatSort));
- this.index = index;
this.threatTriageField = triageLevelField;
+ this.threatSort = threatSort;
+ this.metaAlertsIndex = metaAlertsIndex;
}
public ElasticsearchMetaAlertDao() {
@@ -123,8 +95,10 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
/**
- * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao.
- * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example).
+ * Initializes this implementation by setting the supplied IndexDao and also setting a separate
+ * ElasticsearchDao.
+ * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for
+ * example).
* @param indexDao The DAO to wrap for our queries
* @param threatSort The summary aggregation of the child threat triage scores used
* as the overall threat triage score for the metaalert. This
@@ -152,6 +126,24 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
if (threatSort.isPresent()) {
this.threatSort = threatSort.get();
}
+
+ MetaAlertConfig config = new MetaAlertConfig(
+ metaAlertsIndex,
+ threatTriageField,
+ this.threatSort,
+ ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD
+ );
+
+ this.metaAlertSearchDao = new ElasticsearchMetaAlertSearchDao(
+ elasticsearchDao,
+ config,
+ pageSize);
+ this.metaAlertRetrieveLatestDao = new ElasticsearchMetaAlertRetrieveLatestDao(indexDao);
+ this.metaAlertUpdateDao = new ElasticsearchMetaAlertUpdateDao(
+ elasticsearchDao,
+ metaAlertRetrieveLatestDao,
+ config,
+ pageSize);
}
@Override
@@ -160,551 +152,74 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao {
}
@Override
- public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
- if (guid == null || guid.trim().isEmpty()) {
- throw new InvalidSearchException("Guid cannot be empty");
- }
- // Searches for all alerts containing the meta alert guid in it's "metalerts" array
- QueryBuilder qb = boolQuery()
- .must(
- nestedQuery(
- ALERT_FIELD,
- boolQuery()
- .must(termQuery(ALERT_FIELD + "." + GUID, guid)),
- ScoreMode.None
- ).innerHit(new InnerHitBuilder())
- )
- .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
- return queryAllResults(qb);
+ public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
+ return indexDao.getColumnMetadata(indices);
}
@Override
- @SuppressWarnings("unchecked")
- public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
- throws InvalidCreateException, IOException {
- List<GetRequest> alertRequests = request.getAlerts();
- if (request.getAlerts().isEmpty()) {
- throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
- }
- if (request.getGroups().isEmpty()) {
- throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
- }
-
- // Retrieve the documents going into the meta alert and build it
- Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
-
- Document metaAlert = buildCreateDocument(alerts, request.getGroups());
- calculateMetaScores(metaAlert);
- // Add source type to be consistent with other sources and allow filtering
- metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);
-
- // Start a list of updates / inserts we need to run
- Map<Document, Optional<String>> updates = new HashMap<>();
- updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
-
- try {
- // We need to update the associated alerts with the new meta alerts, making sure existing
- // links are maintained.
- Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
- GetRequest::getGuid, GetRequest::getIndex));
- Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
- GetRequest::getGuid, GetRequest::getSensorType));
- for (Document alert: alerts) {
- if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
- // Use the index in the request if it exists
- Optional<String> index = guidToIndices.get(alert.getGuid());
- if (!index.isPresent()) {
- // Look up the index from Elasticsearch if one is not supplied in the request
- index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
- if (!index.isPresent()) {
- throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
- }
- }
- updates.put(alert, index);
- }
- }
-
- // Kick off any updates.
- indexDaoUpdate(updates);
-
- MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
- createResponse.setCreated(true);
- createResponse.setGuid(metaAlert.getGuid());
- return createResponse;
- } catch (IOException ioe) {
- throw new InvalidCreateException("Unable to create meta alert", ioe);
- }
+ public Document getLatest(String guid, String sensorType) throws IOException {
+ return indexDao.getLatest(guid, sensorType);
}
@Override
- public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
- throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
- Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
- if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
- Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
- boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts);
- if (metaAlertUpdated) {
- calculateMetaScores(metaAlert);
- updates.put(metaAlert, Optional.of(index));
- for(Document alert: alerts) {
- if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
- updates.put(alert, Optional.empty());
- }
- }
- indexDaoUpdate(updates);
- }
- return metaAlertUpdated;
- } else {
- throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
- }
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+ return indexDao.getAllLatest(getRequests);
}
- protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
- boolean alertAdded = false;
- List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
- Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert ->
- (String) currentAlert.get(GUID)).collect(Collectors.toSet());
- for (Document alert: alerts) {
- String alertGuid = alert.getGuid();
- // Only add an alert if it isn't already in the meta alert
- if (!currentAlertGuids.contains(alertGuid)) {
- currentAlerts.add(alert.getDocument());
- alertAdded = true;
- }
- }
- return alertAdded;
+ @Override
+ public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+ return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
}
- protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
- List<String> metaAlertField = new ArrayList<>();
- List<String> alertField = (List<String>) alert.getDocument()
- .get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
- }
- boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid);
- if (metaAlertAdded) {
- metaAlertField.add(metaAlertGuid);
- alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
- }
- return metaAlertAdded;
+ @Override
+ public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
+ throws InvalidCreateException, IOException {
+ return metaAlertUpdateDao.createMetaAlert(request);
}
@Override
- public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+ public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
- Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
- if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
- Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
- Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(
- Collectors.toList());
- boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids);
- if (metaAlertUpdated) {
- calculateMetaScores(metaAlert);
- updates.put(metaAlert, Optional.of(index));
- for(Document alert: alerts) {
- if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
- updates.put(alert, Optional.empty());
- }
- }
- indexDaoUpdate(updates);
- }
- return metaAlertUpdated;
- } else {
- throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
- }
-
+ return metaAlertUpdateDao.addAlertsToMetaAlert(metaAlertGuid, alertRequests);
}
- protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
- List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
- int previousSize = currentAlerts.size();
- // Only remove an alert if it is in the meta alert
- currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID)));
- return currentAlerts.size() != previousSize;
- }
-
- protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
- List<String> metaAlertField = new ArrayList<>();
- List<String> alertField = (List<String>) alert.getDocument()
- .get(MetaAlertDao.METAALERT_FIELD);
- if (alertField != null) {
- metaAlertField.addAll(alertField);
- }
- boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid);
- if (metaAlertRemoved) {
- alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
- }
- return metaAlertRemoved;
+ @Override
+ public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+ throws IOException {
+ return metaAlertUpdateDao.removeAlertsFromMetaAlert(metaAlertGuid, alertRequests);
}
@Override
public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
throws IOException {
- Map<Document, Optional<String>> updates = new HashMap<>();
- Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
- String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
- boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus);
- if (metaAlertUpdated) {
- metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());
- updates.put(metaAlert, Optional.of(index));
- List<GetRequest> getRequests = new ArrayList<>();
- List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
- .get(MetaAlertDao.ALERT_FIELD);
- currentAlerts.stream().forEach(currentAlert -> {
- getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE)));
- });
- Iterable<Document> alerts = indexDao.getAllLatest(getRequests);
- for (Document alert : alerts) {
- boolean metaAlertAdded = false;
- boolean metaAlertRemoved = false;
- // If we're making it active add add the meta alert guid for every alert.
- if (MetaAlertStatus.ACTIVE.equals(status)) {
- metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert);
- }
- // If we're making it inactive, remove the meta alert guid from every alert.
- if (MetaAlertStatus.INACTIVE.equals(status)) {
- metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
- }
- if (metaAlertAdded || metaAlertRemoved) {
- updates.put(alert, Optional.empty());
- }
- }
- }
- if (metaAlertUpdated) {
- indexDaoUpdate(updates);
- }
- return metaAlertUpdated;
+ return metaAlertUpdateDao.updateMetaAlertStatus(metaAlertGuid, status);
}
@Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
- // Wrap the query to also get any meta-alerts.
- QueryBuilder qb = constantScoreQuery(boolQuery()
- .must(boolQuery()
- .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
- .should(nestedQuery(
- ALERT_FIELD,
- new QueryStringQueryBuilder(searchRequest.getQuery()),
- ScoreMode.None
- )
- )
- )
- // Ensures that it's a meta alert with active status or that it's an alert (signified by
- // having no status field)
- .must(boolQuery()
- .should(termQuery(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()))
- .should(boolQuery().mustNot(existsQuery(MetaAlertDao.STATUS_FIELD)))
- )
- .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD))
- );
- return elasticsearchDao.search(searchRequest, qb);
+ return metaAlertSearchDao.search(searchRequest);
}
@Override
- public Document getLatest(String guid, String sensorType) throws IOException {
- return indexDao.getLatest(guid, sensorType);
- }
-
- @Override
- public Iterable<Document> getAllLatest(
- List<GetRequest> getRequests) throws IOException {
- return indexDao.getAllLatest(getRequests);
+ public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+ return metaAlertSearchDao.group(groupRequest);
}
@Override
public void update(Document update, Optional<String> index) throws IOException {
- if (METAALERT_TYPE.equals(update.getSensorType())) {
- // We've been passed an update to the meta alert.
- throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
- } else {
- Map<Document, Optional<String>> updates = new HashMap<>();
- updates.put(update, index);
- // We need to update an alert itself. Only that portion of the update can be delegated.
- // We still need to get meta alerts potentially associated with it and update.
- Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
- .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L))
- .collect(Collectors.toList());
- // Each meta alert needs to be updated with the new alert
- for (Document metaAlert : metaAlerts) {
- replaceAlertInMetaAlert(metaAlert, update);
- updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
- }
-
- // Run the alert's update
- indexDao.batchUpdate(updates);
- }
- }
-
- protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
- boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid()));
- if (metaAlertUpdated) {
- addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
- }
- return metaAlertUpdated;
+ metaAlertUpdateDao.update(update, index);
}
@Override
- public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
- throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
+ public void batchUpdate(Map<Document, Optional<String>> updates) {
+ metaAlertUpdateDao.batchUpdate(updates);
}
- /**
- * Does not allow patches on the "alerts" or "status" fields. These fields must be updated with their
- * dedicated methods.
- *
- * @param request The patch request
- * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
- * @throws OriginalNotFoundException
- * @throws IOException
- */
@Override
- public void patch(PatchRequest request, Optional<Long> timestamp)
+ public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
+ Optional<Long> timestamp)
throws OriginalNotFoundException, IOException {
- if (isPatchAllowed(request)) {
- Document d = getPatchedDocument(request, timestamp);
- indexDao.update(d, Optional.ofNullable(request.getIndex()));
- } else {
- throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths. "
- + "Please use the add/remove alert or update status functions instead.");
- }
- }
-
- protected boolean isPatchAllowed(PatchRequest request) {
- if(request.getPatch() != null && !request.getPatch().isEmpty()) {
- for(Map<String, Object> patch : request.getPatch()) {
- Object pathObj = patch.get("path");
- if(pathObj != null && pathObj instanceof String) {
- String path = (String)pathObj;
- if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) {
- return false;
- }
- }
- }
- }
- return true;
- }
-
- /**
- * Given an alert GUID, retrieve all associated meta alerts.
- * @param alertGuid The GUID of the child alert
- * @return The Elasticsearch response containing the meta alerts
- */
- protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
- QueryBuilder qb = boolQuery()
- .must(
- nestedQuery(
- ALERT_FIELD,
- boolQuery()
- .must(termQuery(ALERT_FIELD + "." + Constants.GUID, alertGuid)),
- ScoreMode.None
- ).innerHit(new InnerHitBuilder())
- )
- .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
- return queryAllResults(qb);
- }
-
- /**
- * Elasticsearch queries default to 10 records returned. Some internal queries require that all
- * results are returned. Rather than setting an arbitrarily high size, this method pages through results
- * and returns them all in a single SearchResponse.
- * @param qb
- * @return
- */
- protected SearchResponse queryAllResults(QueryBuilder qb) {
- SearchRequestBuilder searchRequestBuilder = elasticsearchDao
- .getClient()
- .prepareSearch(index)
- .addStoredField("*")
- .setFetchSource(true)
- .setQuery(qb)
- .setSize(pageSize);
- org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
- .execute()
- .actionGet();
- List<SearchResult> allResults = getSearchResults(esResponse);
- long total = esResponse.getHits().getTotalHits();
- if (total > pageSize) {
- int pages = (int) (total / pageSize) + 1;
- for (int i = 1; i < pages; i++) {
- int from = i * pageSize;
- searchRequestBuilder.setFrom(from);
- esResponse = searchRequestBuilder
- .execute()
- .actionGet();
- allResults.addAll(getSearchResults(esResponse));
- }
- }
- SearchResponse searchResponse = new SearchResponse();
- searchResponse.setTotal(total);
- searchResponse.setResults(allResults);
- return searchResponse;
- }
-
- /**
- * Transforms a list of Elasticsearch SearchHits to a list of SearchResults
- * @param searchResponse
- * @return
- */
- protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
- return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
- SearchResult searchResult = new SearchResult();
- searchResult.setId(searchHit.getId());
- searchResult.setSource(searchHit.getSource());
- searchResult.setScore(searchHit.getScore());
- searchResult.setIndex(searchHit.getIndex());
- return searchResult;
- }
- ).collect(Collectors.toList());
- }
-
- /**
- * Build the Document representing a meta alert to be created.
- * @param alerts The Elasticsearch results for the meta alerts child documents
- * @param groups The groups used to create this meta alert
- * @return A Document representing the new meta alert
- */
- protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
- // Need to create a Document from the multiget. Scores will be calculated later
- Map<String, Object> metaSource = new HashMap<>();
- List<Map<String, Object>> alertList = new ArrayList<>();
- for (Document alert: alerts) {
- alertList.add(alert.getDocument());
- }
- metaSource.put(ALERT_FIELD, alertList);
-
- // Add any meta fields
- String guid = UUID.randomUUID().toString();
- metaSource.put(GUID, guid);
- metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
- metaSource.put(GROUPS_FIELD, groups);
- metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
-
- return new Document(metaSource, guid, METAALERT_TYPE, System.currentTimeMillis());
- }
-
- /**
- * Calls the single update variant if there's only one update, otherwise calls batch.
- * @param updates The list of updates to run
- * @throws IOException If there's an update error
- */
- protected void indexDaoUpdate(Map<Document, Optional<String>> updates) throws IOException {
- if (updates.size() == 1) {
- Entry<Document, Optional<String>> singleUpdate = updates.entrySet().iterator().next();
- indexDao.update(singleUpdate.getKey(), singleUpdate.getValue());
- } else if (updates.size() > 1) {
- indexDao.batchUpdate(updates);
- } // else we have no updates, so don't do anything
- }
-
-
-
- @SuppressWarnings("unchecked")
- protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
- Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
- if (latest == null) {
- return new ArrayList<>();
- }
- List<String> guids = new ArrayList<>();
- List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
- .get(MetaAlertDao.ALERT_FIELD);
- for (Map<String, Object> alert : latestAlerts) {
- guids.add((String) alert.get(Constants.GUID));
- }
-
- List<Map<String, Object>> alerts = new ArrayList<>();
- QueryBuilder query = QueryBuilders.idsQuery().addIds(guids.toArray(new String[0]));
- SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
- .setQuery(query);
- org.elasticsearch.action.search.SearchResponse response = request.get();
- for (SearchHit hit : response.getHits().getHits()) {
- alerts.add(hit.sourceAsMap());
- }
- return alerts;
- }
-
- /**
- * Builds an update Document for updating the meta alerts list.
- * @param alertGuid The GUID of the alert to update
- * @param sensorType The sensor type to update
- * @param metaAlertField The new metaAlertList to use
- * @return The update Document
- */
- protected Document buildAlertUpdate(String alertGuid, String sensorType,
- List<String> metaAlertField, Long timestamp) {
- Document alertUpdate;
- Map<String, Object> document = new HashMap<>();
- document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
- alertUpdate = new Document(
- document,
- alertGuid,
- sensorType,
- timestamp
- );
- return alertUpdate;
- }
-
-
- @Override
- public Map<String, FieldType> getColumnMetadata(List<String> indices)
- throws IOException {
- return indexDao.getColumnMetadata(indices);
- }
-
- @Override
- public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
- // Wrap the query to hide any alerts already contained in meta alerts
- QueryBuilder qb = QueryBuilders.boolQuery()
- .must(new QueryStringQueryBuilder(groupRequest.getQuery()))
- .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD));
- return elasticsearchDao.group(groupRequest, qb);
- }
-
- /**
- * Calculate the meta alert scores for a Document.
- * @param metaAlert The Document containing scores
- * @return Set of score statistics
- */
- @SuppressWarnings("unchecked")
- protected void calculateMetaScores(Document metaAlert) {
- MetaScores metaScores = new MetaScores(new ArrayList<>());
- List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD));
- if (alertsRaw != null && !alertsRaw.isEmpty()) {
- ArrayList<Double> scores = new ArrayList<>();
- for (Object alertRaw : alertsRaw) {
- Map<String, Object> alert = (Map<String, Object>) alertRaw;
- Double scoreNum = parseThreatField(alert.get(threatTriageField));
- if (scoreNum != null) {
- scores.add(scoreNum);
- }
- }
- metaScores = new MetaScores(scores);
- }
-
- // add a summary (max, min, avg, ...) of all the threat scores from the child alerts
- metaAlert.getDocument().putAll(metaScores.getMetaScores());
-
- // add the overall threat score for the metaalert; one of the summary aggregations as defined by `threatSort`
- Object threatScore = metaScores.getMetaScores().get(threatSort);
-
- // add the threat score as a float; type needs to match the threat score field from each of the sensor indices
- metaAlert.getDocument().put(threatTriageField, ConversionUtils.convert(threatScore, Float.class));
- }
-
- private Double parseThreatField(Object threatRaw) {
- Double threat = null;
- if (threatRaw instanceof Number) {
- threat = ((Number) threatRaw).doubleValue();
- } else if (threatRaw instanceof String) {
- threat = Double.parseDouble((String) threatRaw);
- }
- return threat;
- }
-
- public int getPageSize() {
- return pageSize;
+ metaAlertUpdateDao.patch(retrieveLatestDao, request, timestamp);
}
public void setPageSize(int pageSize) {
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
new file mode 100644
index 0000000..8aa55d6
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+
+public class ElasticsearchMetaAlertRetrieveLatestDao implements MetaAlertRetrieveLatestDao {
+ private RetrieveLatestDao retrieveLatestDao;
+
+ public ElasticsearchMetaAlertRetrieveLatestDao(RetrieveLatestDao retrieveLatestDao) {
+ this.retrieveLatestDao = retrieveLatestDao;
+ }
+
+ @Override
+ public Document getLatest(String guid, String sensorType) throws IOException {
+ return retrieveLatestDao.getLatest(guid, sensorType);
+ }
+
+ @Override
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
+ return retrieveLatestDao.getAllLatest(getRequests);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
new file mode 100644
index 0000000..00fc9d0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import static org.apache.metron.common.Constants.GUID;
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.queryAllResults;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
+import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
+import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+import org.apache.lucene.search.join.ScoreMode;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertSearchDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+
+public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao {
+
+ protected ElasticsearchDao elasticsearchDao;
+ private MetaAlertConfig config;
+ private int pageSize;
+
+ public ElasticsearchMetaAlertSearchDao(ElasticsearchDao elasticsearchDao,
+ MetaAlertConfig config, int pageSize) {
+ this.elasticsearchDao = elasticsearchDao;
+ this.config = config;
+ this.pageSize = pageSize;
+ }
+
+ @Override
+ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
+ // Wrap the query to also get any meta-alerts.
+ QueryBuilder qb = constantScoreQuery(boolQuery()
+ .must(boolQuery()
+ .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
+ .should(nestedQuery(
+ MetaAlertConstants.ALERT_FIELD,
+ new QueryStringQueryBuilder(searchRequest.getQuery()),
+ ScoreMode.None
+ )
+ )
+ )
+ // Ensures that it's a meta alert with active status or that it's an alert (signified by
+ // having no status field)
+ .must(boolQuery()
+ .should(termQuery(MetaAlertConstants.STATUS_FIELD,
+ MetaAlertStatus.ACTIVE.getStatusString()))
+ .should(boolQuery().mustNot(existsQuery(MetaAlertConstants.STATUS_FIELD)))
+ )
+ .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD))
+ );
+ return elasticsearchDao.search(searchRequest, qb);
+ }
+
+ @Override
+ public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+ // Wrap the query to hide any alerts already contained in meta alerts
+ QueryBuilder qb = QueryBuilders.boolQuery()
+ .must(new QueryStringQueryBuilder(groupRequest.getQuery()))
+ .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD));
+ return elasticsearchDao.group(groupRequest, qb);
+ }
+
+ @Override
+ public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+ if (guid == null || guid.trim().isEmpty()) {
+ throw new InvalidSearchException("Guid cannot be empty");
+ }
+ // Searches for all active meta alerts whose nested alerts contain the given alert guid
+ QueryBuilder qb = boolQuery()
+ .must(
+ nestedQuery(
+ MetaAlertConstants.ALERT_FIELD,
+ boolQuery()
+ .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + GUID, guid)),
+ ScoreMode.None
+ ).innerHit(new InnerHitBuilder())
+ )
+ .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+ return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(),
+ pageSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
new file mode 100644
index 0000000..6c709a6
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.lucene.search.join.ScoreMode;
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.metaalert.MetaScores;
+import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.InvalidCreateException;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+
+public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
+
+  private final ElasticsearchDao elasticsearchDao;
+  private final MetaAlertRetrieveLatestDao retrieveLatestDao;
+  private final int pageSize;
+
+  /**
+   * Constructs an ElasticsearchMetaAlertUpdateDao.
+   * @param elasticsearchDao The ElasticsearchDao that index lookups, batch updates and the
+   *     underlying client are obtained from.
+   * @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being
+   *     mutated.
+   * @param config The meta alert config to use.
+   * @param pageSize The page size handed to {@link ElasticsearchUtils#queryAllResults} when
+   *     looking up the meta alerts that contain an alert.
+   */
+  public ElasticsearchMetaAlertUpdateDao(
+      ElasticsearchDao elasticsearchDao,
+      MetaAlertRetrieveLatestDao retrieveLatestDao,
+      MetaAlertConfig config,
+      int pageSize
+  ) {
+    super(elasticsearchDao, retrieveLatestDao, config);
+    this.elasticsearchDao = elasticsearchDao;
+    this.retrieveLatestDao = retrieveLatestDao;
+    this.pageSize = pageSize;
+  }
+
+  /**
+   * Builds a new meta alert from the alerts named in the request and submits it, together with
+   * every child alert that had to be modified to reference it, as one set of updates.
+   * @param request The create request; it must contain at least one alert and one group.
+   * @return A response flagged as created that carries the GUID of the new meta alert.
+   * @throws InvalidCreateException If the request has no alerts or no groups, or if an
+   *     IOException is raised while the updates are being assembled or applied.
+   * @throws IOException Declared by the signature; the retrieval of the alerts and the building
+   *     of the meta alert document happen outside the try block, so an IOException raised
+   *     there is not wrapped.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
+      throws InvalidCreateException, IOException {
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (alertRequests.isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
+    }
+    if (request.getGroups().isEmpty()) {
+      throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
+    }
+
+    // Retrieve the documents going into the meta alert and build it
+    Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
+
+    Document metaAlert = buildCreateDocument(alerts, request.getGroups(),
+        MetaAlertConstants.ALERT_FIELD);
+    MetaScores
+        .calculateMetaScores(metaAlert, getConfig().getThreatTriageField(),
+            getConfig().getThreatSort());
+    // Add source type to be consistent with other sources and allow filtering
+    metaAlert.getDocument()
+        .put(ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD, MetaAlertConstants.METAALERT_TYPE);
+
+    // Start a list of updates / inserts we need to run
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
+
+    try {
+      // We need to update the associated alerts with the new meta alerts, making sure existing
+      // links are maintained.
+      // The lookup tables are filled with plain puts: the two-argument Collectors.toMap throws
+      // IllegalStateException when a GUID is listed twice and NullPointerException when a
+      // sensor type is null. The first request seen for a GUID wins.
+      Map<String, Optional<String>> guidToIndices = new HashMap<>();
+      Map<String, String> guidToSensorTypes = new HashMap<>();
+      for (GetRequest alertRequest : alertRequests) {
+        guidToIndices.putIfAbsent(alertRequest.getGuid(), alertRequest.getIndex());
+        guidToSensorTypes.putIfAbsent(alertRequest.getGuid(), alertRequest.getSensorType());
+      }
+
+      for (Document alert : alerts) {
+        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
+          // Use the index in the request if it exists
+          Optional<String> index = guidToIndices.get(alert.getGuid());
+          if (index == null || !index.isPresent()) {
+            // Look up the index from Elasticsearch if one is not supplied in the request
+            index = elasticsearchDao
+                .getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
+            if (!index.isPresent()) {
+              throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
+            }
+          }
+          updates.put(alert, index);
+        }
+      }
+
+      // Kick off any updates.
+      update(updates);
+
+      MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
+      createResponse.setCreated(true);
+      createResponse.setGuid(metaAlert.getGuid());
+      return createResponse;
+    } catch (IOException ioe) {
+      throw new InvalidCreateException("Unable to create meta alert", ioe);
+    }
+  }
+
+  /**
+   * Adds alerts to a metaalert, based on a list of GetRequests provided for retrieval.
+   * @param metaAlertGuid The GUID of the metaalert to be given new children.
+   * @param alertRequests GetRequests for the appropriate alerts to add.
+   * @return True if metaalert is modified, false otherwise.
+   * @throws IOException If no meta alert with the given GUID can be retrieved, or if
+   *     retrieving the alerts or applying the updates fails.
+   * @throws IllegalStateException If the meta alert's status is not ACTIVE.
+   */
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
+      throws IOException {
+
+    Document metaAlert = retrieveLatestDao
+        .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
+    // The Elasticsearch retrieve-latest DAO returns null when nothing matches; report that
+    // explicitly instead of failing with a NullPointerException on the status check below.
+    if (metaAlert == null) {
+      throw new IOException("Unable to add alerts to meta alert. Meta alert with guid "
+          + metaAlertGuid + " cannot be found.");
+    }
+    if (!MetaAlertStatus.ACTIVE.getStatusString()
+        .equals(metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD))) {
+      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
+    }
+
+    Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
+    Map<Document, Optional<String>> updates = buildAddAlertToMetaAlertUpdates(metaAlert, alerts);
+    update(updates);
+    return !updates.isEmpty();
+  }
+
+  /**
+   * Applies an update to an alert and refreshes the copy of that alert held by every active
+   * meta alert that contains it. All resulting documents are written with a single call to
+   * {@code elasticsearchDao.batchUpdate}.
+   * @param update The updated alert document.
+   * @param index The index to write the alert to, if known.
+   * @throws UnsupportedOperationException If the document's sensor type is the meta alert type.
+   * @throws IOException If the batch update fails.
+   */
+  @Override
+  public void update(Document update, Optional<String> index) throws IOException {
+    if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) {
+      // We've been passed an update to the meta alert.
+      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
+    }
+
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+    // We need to update an alert itself. Only that portion of the update can be delegated.
+    // We still need to get meta alerts potentially associated with it and update.
+    Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults()
+        .stream()
+        .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(),
+            MetaAlertConstants.METAALERT_TYPE, 0L))
+        .collect(Collectors.toList());
+    // Each meta alert needs to be updated with the new alert
+    for (Document metaAlert : metaAlerts) {
+      if (replaceAlertInMetaAlert(metaAlert, update)) {
+        updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
+      }
+    }
+
+    // Run the alert's update
+    elasticsearchDao.batchUpdate(updates);
+  }
+
+  /**
+   * Given an alert GUID, retrieve all associated meta alerts.
+   * Only meta alerts whose status field holds the ACTIVE status string are matched.
+   * @param alertGuid The GUID of the child alert
+   * @return The search response containing the matching meta alerts
+   */
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
+    QueryBuilder qb = boolQuery()
+        .must(
+            nestedQuery(
+                MetaAlertConstants.ALERT_FIELD,
+                boolQuery()
+                    .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + Constants.GUID,
+                        alertGuid)),
+                ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
+        )
+        .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
+    return ElasticsearchUtils
+        .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(),
+            pageSize);
+  }
+
+  /**
+   * Swaps the copy of an alert held by a meta alert for the supplied version: the entry with
+   * the alert's GUID is removed and, only if that removal reported a change, the supplied alert
+   * is added back.
+   * @param metaAlert The meta alert document to modify in place.
+   * @param alert The new version of the child alert.
+   * @return The value reported by the removal step; presumably true when the meta alert
+   *     contained the alert (confirm against removeAlertsFromMetaAlert in the parent class).
+   */
+  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
+    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert,
+        Collections.singleton(alert.getGuid()));
+    if (metaAlertUpdated) {
+      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
+    }
+    return metaAlertUpdated;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
new file mode 100644
index 0000000..f6bfeda
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.metron.indexing.dao.RetrieveLatestDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+
+public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
+
+ private TransportClient transportClient;
+
+ public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
+ this.transportClient = transportClient;
+ }
+
+ @Override
+ public Document getLatest(String guid, String sensorType) {
+ Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+ return doc.orElse(null);
+ }
+
+ @Override
+ public Iterable<Document> getAllLatest(List<GetRequest> getRequests) {
+ Collection<String> guids = new HashSet<>();
+ Collection<String> sensorTypes = new HashSet<>();
+ for (GetRequest getRequest : getRequests) {
+ guids.add(getRequest.getGuid());
+ sensorTypes.add(getRequest.getSensorType());
+ }
+ List<Document> documents = searchByGuids(
+ guids,
+ sensorTypes,
+ hit -> {
+ Long ts = 0L;
+ String doc = hit.getSourceAsString();
+ String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+ try {
+ return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+ }
+ }
+
+ );
+ return documents;
+ }
+
+ <T> Optional<T> searchByGuid(String guid, String sensorType,
+ Function<SearchHit, Optional<T>> callback) {
+ Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
+ List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
+ if (results.size() > 0) {
+ return Optional.of(results.get(0));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Return the search hit based on the UUID and sensor type.
+ * A callback can be specified to transform the hit into a type T.
+ * If more than one hit happens, the first one will be returned.
+ */
+ <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
+ Function<SearchHit, Optional<T>> callback) {
+ if (guids == null || guids.isEmpty()) {
+ return Collections.emptyList();
+ }
+ QueryBuilder query = null;
+ IdsQueryBuilder idsQuery;
+ if (sensorTypes != null) {
+ String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc")
+ .toArray(String[]::new);
+ idsQuery = QueryBuilders.idsQuery(types);
+ } else {
+ idsQuery = QueryBuilders.idsQuery();
+ }
+
+ for (String guid : guids) {
+ query = idsQuery.addIds(guid);
+ }
+
+ SearchRequestBuilder request = transportClient.prepareSearch()
+ .setQuery(query)
+ .setSize(guids.size());
+ org.elasticsearch.action.search.SearchResponse response = request.get();
+ SearchHits hits = response.getHits();
+ List<T> results = new ArrayList<>();
+ for (SearchHit hit : hits) {
+ Optional<T> result = callback.apply(hit);
+ if (result.isPresent()) {
+ results.add(result.get());
+ }
+ }
+ return results;
+ }
+
+ private Optional<Document> toDocument(final String guid, SearchHit hit) {
+ Long ts = 0L;
+ String doc = hit.getSourceAsString();
+ String sourceType = toSourceType(hit.getType());
+ try {
+ return Optional.of(new Document(doc, guid, sourceType, ts));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Returns the source type based on a given doc type.
+ * @param docType The document type.
+ * @return The source type.
+ */
+ private String toSourceType(String docType) {
+ return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 3971237..5725534 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -116,49 +116,6 @@ public class ElasticsearchSearchDao implements SearchDao {
return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery()));
}
- @Override
- public Document getLatest(String guid, String sensorType) throws IOException {
- Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
- return doc.orElse(null);
- }
-
- <T> Optional<T> searchByGuid(String guid, String sensorType,
- Function<SearchHit, Optional<T>> callback) {
- Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
- List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
- if (results.size() > 0) {
- return Optional.of(results.get(0));
- } else {
- return Optional.empty();
- }
- }
-
- @Override
- public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
- Collection<String> guids = new HashSet<>();
- Collection<String> sensorTypes = new HashSet<>();
- for (GetRequest getRequest: getRequests) {
- guids.add(getRequest.getGuid());
- sensorTypes.add(getRequest.getSensorType());
- }
- List<Document> documents = searchByGuids(
- guids
- , sensorTypes
- , hit -> {
- Long ts = 0L;
- String doc = hit.getSourceAsString();
- String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
- try {
- return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
- }
- }
-
- );
- return documents;
- }
-
/**
* Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query.
* @param request The request defining the parameters of the search
@@ -505,63 +462,4 @@ public class ElasticsearchSearchDao implements SearchDao {
}
return searchResultGroups;
}
-
- /**
- * Return the search hit based on the UUID and sensor type.
- * A callback can be specified to transform the hit into a type T.
- * If more than one hit happens, the first one will be returned.
- */
- <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
- Function<SearchHit, Optional<T>> callback) {
- if(guids == null || guids.isEmpty()) {
- return Collections.EMPTY_LIST;
- }
- QueryBuilder query = null;
- IdsQueryBuilder idsQuery = null;
- if (sensorTypes != null) {
- String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new);
- idsQuery = QueryBuilders.idsQuery(types);
- } else {
- idsQuery = QueryBuilders.idsQuery();
- }
-
- for(String guid : guids) {
- query = idsQuery.addIds(guid);
- }
-
- SearchRequestBuilder request = client.prepareSearch()
- .setQuery(query)
- .setSize(guids.size())
- ;
- org.elasticsearch.action.search.SearchResponse response = request.get();
- SearchHits hits = response.getHits();
- List<T> results = new ArrayList<>();
- for (SearchHit hit : hits) {
- Optional<T> result = callback.apply(hit);
- if (result.isPresent()) {
- results.add(result.get());
- }
- }
- return results;
- }
-
- private Optional<Document> toDocument(final String guid, SearchHit hit) {
- Long ts = 0L;
- String doc = hit.getSourceAsString();
- String sourceType = toSourceType(hit.getType());
- try {
- return Optional.of(new Document(doc, guid, sourceType, ts));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
- }
- }
-
- /**
- * Returns the source type based on a given doc type.
- * @param docType The document type.
- * @return The source type.
- */
- private String toSourceType(String docType) {
- return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
- }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index a7c3a71..c4d7412 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -42,14 +42,14 @@ public class ElasticsearchUpdateDao implements UpdateDao {
private transient TransportClient client;
private AccessConfig accessConfig;
- private ElasticsearchSearchDao searchDao;
+ private ElasticsearchRetrieveLatestDao retrieveLatestDao;
public ElasticsearchUpdateDao(TransportClient client,
AccessConfig accessConfig,
- ElasticsearchSearchDao searchDao) {
+ ElasticsearchRetrieveLatestDao searchDao) {
this.client = client;
this.accessConfig = accessConfig;
- this.searchDao = searchDao;
+ this.retrieveLatestDao = searchDao;
}
@Override
@@ -110,7 +110,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
}
protected Optional<String> getIndexName(String guid, String sensorType) {
- return searchDao.searchByGuid(guid,
+ return retrieveLatestDao.searchByGuid(guid,
sensorType,
hit -> Optional.ofNullable(hit.getIndex())
);
@@ -121,7 +121,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
Object ts = update.getTimestamp();
IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
.source(update.getDocument());
- if(ts != null) {
+ if (ts != null) {
indexRequest = indexRequest.timestamp(ts.toString());
}