You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/03/05 23:20:40 UTC
[metron] branch master updated: METRON-2016 Parser aggregate groups
should be persisted and available through REST (merrimanr) closes
apache/metron#1346
This is an automated email from the ASF dual-hosted git repository.
rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 6d41526 METRON-2016 Parser aggregate groups should be persisted and available through REST (merrimanr) closes apache/metron#1346
6d41526 is described below
commit 6d41526c3eff511e5fe11368c6f3f0a837507ba1
Author: merrimanr <me...@gmail.com>
AuthorDate: Tue Mar 5 17:20:25 2019 -0600
METRON-2016 Parser aggregate groups should be persisted and available through REST (merrimanr) closes apache/metron#1346
---
.../CURRENT/package/scripts/parser_commands.py | 10 +-
metron-interface/metron-rest/README.md | 33 +++
metron-interface/metron-rest/pom.xml | 6 +
.../controller/SensorParserGroupController.java | 93 +++++++
.../rest/service/SensorParserGroupService.java | 39 +++
.../service/impl/SensorParserGroupServiceImpl.java | 136 +++++++++++
.../rest/service/impl/StormAdminServiceImpl.java | 50 +++-
.../rest/service/impl/StormStatusServiceImpl.java | 68 ++++--
...SensorParserGroupControllerIntegrationTest.java | 266 +++++++++++++++++++++
.../impl/SensorParserGroupServiceImplTest.java | 203 ++++++++++++++++
.../service/impl/StormAdminServiceImplTest.java | 40 +++-
.../service/impl/StormStatusServiceImplTest.java | 51 +++-
.../common/configuration/ParserConfigurations.java | 17 ++
.../common/configuration/SensorParserGroup.java | 83 +++++++
.../metron/parsers/topology/ParserTopologyCLI.java | 16 +-
15 files changed, 1063 insertions(+), 48 deletions(-)
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 18780d9..a687085 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -65,14 +65,20 @@ class ParserCommands:
def __get_aggr_parsers(self, params):
"""
Fetches the list of aggregated (and regular) parsers and returns a list.
- If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example,
- then this method will return ["bro,snort,yaf", "bro,snort", "yaf"]
+ If the input list of parsers were "bro,yaf,snort", "bro,snort" and yaf, for example,
+ then this method will return ["bro,snort,yaf", "bro,snort", "yaf"]. Sensors within
+ a group are sorted alphabetically.
:param params:
:return: List containing the names of parsers
"""
parserList = []
parsers = shlex.shlex(params.parsers)
for name in parsers:
+ sensors = name.strip('",').split(",")
+ # if name contains multiple sensors, sort them alphabetically
+ if len(sensors) > 1:
+ sensors.sort()
+ name = '"' + ",".join(sensors) + '"'
parserList.append(name.strip(','))
return [s.translate(None, "'[]") for s in filter(None, parserList)]
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index 8aa172b..11f06bf 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -344,6 +344,10 @@ Request and Response objects are JSON formatted. The JSON schemas are available
| [ `GET /api/v1/sensor/parser/config/reload/available`](#get-apiv1sensorparserconfigreloadavailable)|
| [ `DELETE /api/v1/sensor/parser/config/{name}`](#delete-apiv1sensorparserconfigname)|
| [ `GET /api/v1/sensor/parser/config/{name}`](#get-apiv1sensorparserconfigname)|
+| [ `POST /api/v1/sensor/parser/group`](#post-apiv1sensorparsergroup)|
+| [ `GET /api/v1/sensor/parser/group/{name}`](#get-apiv1sensorparsergroupname)|
+| [ `GET /api/v1/sensor/parser/group`](#get-apiv1sensorparsergroup)|
+| [ `DELETE /api/v1/sensor/parser/group/{name}`](#delete-apiv1sensorparsergroupname)|
| [ `POST /api/v1/stellar/apply/transformations`](#post-apiv1stellarapplytransformations)|
| [ `GET /api/v1/stellar/list`](#get-apiv1stellarlist)|
| [ `GET /api/v1/stellar/list/functions`](#get-apiv1stellarlistfunctions)|
@@ -787,6 +791,35 @@ Request and Response objects are JSON formatted. The JSON schemas are available
* Returns:
* 200 - Returns SensorParserConfig
* 404 - SensorParserConfig is missing
+
+### `POST /api/v1/sensor/parser/group`
+ * Description: Updates or creates a SensorParserGroup in Zookeeper
+ * Input:
+ * sensorParserGroup - SensorParserGroup
+ * Returns:
+ * 200 - SensorParserGroup updated. Returns saved SensorParserGroup
+ * 201 - SensorParserGroup created. Returns saved SensorParserGroup
+
+### `GET /api/v1/sensor/parser/group/{name}`
+ * Description: Retrieves a SensorParserGroup from Zookeeper
+ * Input:
+ * name - SensorParserGroup name
+ * Returns:
+ * 200 - Returns SensorParserGroup
+ * 404 - SensorParserGroup is missing
+
+### `GET /api/v1/sensor/parser/group`
+ * Description: Retrieves all SensorParserGroups from Zookeeper
+ * Returns:
+ * 200 - Returns all SensorParserGroups
+
+### `DELETE /api/v1/sensor/parser/group/{name}`
+ * Description: Deletes a SensorParserGroup from Zookeeper
+ * Input:
+ * name - SensorParserGroup name
+ * Returns:
+ * 200 - SensorParserGroup was deleted
+ * 404 - SensorParserGroup is missing
### `POST /api/v1/stellar/apply/transformations`
* Description: Executes transformations against a sample message
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index 2b33767..01f1aed 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -300,6 +300,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-parsing-storm</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java
new file mode 100644
index 0000000..66fcea4
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.ParseMessageRequest;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.json.simple.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Collection;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/api/v1/sensor/parser/group")
+public class SensorParserGroupController {
+
+ @Autowired
+ private SensorParserGroupService sensorParserGroupService;
+
+ @ApiOperation(value = "Updates or creates a SensorParserGroup in Zookeeper")
+ @ApiResponses(value = { @ApiResponse(message = "SensorParserGroup updated. Returns saved SensorParserGroup", code = 200),
+ @ApiResponse(message = "SensorParserGroup created. Returns saved SensorParserGroup", code = 201) })
+ @RequestMapping(method = RequestMethod.POST)
+ ResponseEntity<SensorParserGroup> save(@ApiParam(name="sensorParserGroup", value="SensorParserGroup", required=true)@RequestBody SensorParserGroup sensorParserGroup) throws RestException {
+ if (sensorParserGroupService.findOne(sensorParserGroup.getName()) == null) {
+ return new ResponseEntity<>(sensorParserGroupService.save(sensorParserGroup), HttpStatus.CREATED);
+ } else {
+ return new ResponseEntity<>(sensorParserGroupService.save(sensorParserGroup), HttpStatus.OK);
+ }
+ }
+
+ @ApiOperation(value = "Retrieves a SensorParserGroup from Zookeeper")
+ @ApiResponses(value = { @ApiResponse(message = "Returns SensorParserGroup", code = 200),
+ @ApiResponse(message = "SensorParserGroup is missing", code = 404) })
+ @RequestMapping(value = "/{name}", method = RequestMethod.GET)
+ ResponseEntity<SensorParserGroup> findOne(@ApiParam(name="name", value="SensorParserGroup name", required=true)@PathVariable String name) throws RestException {
+ SensorParserGroup sensorParserGroup = sensorParserGroupService.findOne(name);
+ if (sensorParserGroup != null) {
+ return new ResponseEntity<>(sensorParserGroup, HttpStatus.OK);
+ }
+
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+ }
+
+ @ApiOperation(value = "Retrieves all SensorParserGroups from Zookeeper")
+ @ApiResponse(message = "Returns all SensorParserGroups", code = 200)
+ @RequestMapping(method = RequestMethod.GET)
+ ResponseEntity<Map<String, SensorParserGroup>> findAll() throws RestException {
+ return new ResponseEntity<>(sensorParserGroupService.getAll(), HttpStatus.OK);
+ }
+
+ @ApiOperation(value = "Deletes a SensorParserGroup from Zookeeper")
+ @ApiResponses(value = { @ApiResponse(message = "SensorParserGroup was deleted", code = 200),
+ @ApiResponse(message = "SensorParserGroup is missing", code = 404) })
+ @RequestMapping(value = "/{name}", method = RequestMethod.DELETE)
+ ResponseEntity<Void> delete(@ApiParam(name="name", value="SensorParserGroup name", required=true)@PathVariable String name) throws RestException {
+ if (sensorParserGroupService.delete(name)) {
+ return new ResponseEntity<>(HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+ }
+ }
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java
new file mode 100644
index 0000000..cc187bc
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.rest.RestException;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Exposes CRUD operations for SensorParserGroup objects stored in Zookeeper. An array of SensorParserGroups is stored
+ * in the Global Config using the 'parser.groups' key.
+ */
+public interface SensorParserGroupService {
+
+ /**
+ * Creates the given group, or replaces an existing group that has the same name.
+ * @param sensorParserGroup group to persist
+ * @return the saved group
+ * @throws RestException if the group is rejected or cannot be written
+ */
+ SensorParserGroup save(SensorParserGroup sensorParserGroup) throws RestException;
+
+ /**
+ * Retrieves a single group by name.
+ * @param name SensorParserGroup name
+ * @return the matching group, or null if no group has that name
+ */
+ SensorParserGroup findOne(String name);
+
+ /**
+ * Retrieves all groups.
+ * @return all SensorParserGroups keyed by group name
+ */
+ Map<String, SensorParserGroup> getAll();
+
+ /**
+ * Deletes the group with the given name.
+ * @param name SensorParserGroup name
+ * @return true if a group was deleted, false if no group has that name
+ * @throws RestException if the updated groups cannot be written
+ */
+ boolean delete(String name) throws RestException;
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java
new file mode 100644
index 0000000..8b8ee4a
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@Service
+public class SensorParserGroupServiceImpl implements SensorParserGroupService {
+
+  private final ConfigurationsCache cache;
+  private final GlobalConfigService globalConfigService;
+  private final SensorParserConfigService sensorParserConfigService;
+
+  /**
+   * @param cache source of the current ParserConfigurations
+   * @param globalConfigService used to write the updated groups back as part of the Global Config
+   * @param sensorParserConfigService used to verify that every sensor in a group has a parser config
+   */
+  @Autowired
+  public SensorParserGroupServiceImpl(ConfigurationsCache cache, GlobalConfigService globalConfigService, SensorParserConfigService sensorParserConfigService) {
+    this.cache = cache;
+    this.globalConfigService = globalConfigService;
+    this.sensorParserConfigService = sensorParserConfigService;
+  }
+
+  /**
+   * Saves a SensorParserGroup in Zookeeper. Checks for various error conditions including a missing or empty sensors
+   * field, missing configs for each sensor and sensors already included in another group. A group that already
+   * exists under the same name is replaced and is not considered when checking for sensors in other groups.
+   * @param sensorParserGroup group to save
+   * @return the group that was saved
+   * @throws RestException if validation fails or writing to Zookeeper resulted in an error
+   */
+  @Override
+  public SensorParserGroup save(SensorParserGroup sensorParserGroup) throws RestException {
+    ParserConfigurations parserConfigurations = cache.get(ParserConfigurations.class);
+    // Work on a copy so the map handed out by the cached configurations is never modified here.
+    Map<String, SensorParserGroup> groups = new HashMap<>(parserConfigurations.getSensorParserGroups());
+    // Drop the previous version of this group so its own sensors do not count as "already in a group".
+    groups.remove(sensorParserGroup.getName());
+
+    Set<String> sensors = sensorParserGroup.getSensors();
+    // A missing sensors collection is reported the same way as an empty one instead of failing with an NPE.
+    if (sensors == null || sensors.isEmpty()) {
+      throw new RestException("A parser group must contain sensors");
+    }
+
+    for (String sensor : sensors) {
+      // check if sensor config exists
+      if (sensorParserConfigService.findOne(sensor) == null) {
+        throw new RestException(String.format("Could not find config for sensor %s", sensor));
+      }
+      // check if sensor is in another group
+      for (SensorParserGroup group : groups.values()) {
+        if (group.getSensors().contains(sensor)) {
+          throw new RestException(String.format("Sensor %s is already in group %s", sensor, group.getName()));
+        }
+      }
+    }
+    groups.put(sensorParserGroup.getName(), sensorParserGroup);
+    saveGroups(parserConfigurations, new HashSet<>(groups.values()));
+    return sensorParserGroup;
+  }
+
+  /**
+   * Retrieves a single SensorParserGroup by name.
+   * @param name SensorParserGroup name
+   * @return SensorParserGroup or null if group is missing
+   */
+  @Override
+  public SensorParserGroup findOne(String name) {
+    return getAll().get(name);
+  }
+
+  /**
+   * Retrieves all SensorParserGroups as a Map with key being the SensorParserGroup name
+   * @return All SensorParserGroups
+   */
+  @Override
+  public Map<String, SensorParserGroup> getAll() {
+    ParserConfigurations configs = cache.get(ParserConfigurations.class);
+    return configs.getSensorParserGroups();
+  }
+
+  /**
+   * Deletes a SensorParserGroup from Zookeeper.
+   * @param name SensorParserGroup name
+   * @return True if a SensorParserGroup was deleted, false if no group has that name
+   * @throws RestException Writing to Zookeeper resulted in an error
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    ParserConfigurations parserConfigurations = cache.get(ParserConfigurations.class);
+    // Same defensive copy as in save(): the removal below must not touch the map owned by the cached configurations.
+    Map<String, SensorParserGroup> groups = new HashMap<>(parserConfigurations.getSensorParserGroups());
+    if (!groups.containsKey(name)) {
+      return false;
+    }
+    groups.remove(name);
+    saveGroups(parserConfigurations, new HashSet<>(groups.values()));
+    return true;
+  }
+
+  /**
+   * Saves SensorParserGroups in Zookeeper by storing them in the Global Config under
+   * {@link ParserConfigurations#PARSER_GROUPS_CONF} and saving the Global Config.
+   * @param parserConfigurations ParserConfigurations
+   * @param groups SensorParserGroups
+   * @throws RestException Writing to Zookeeper resulted in an error
+   */
+  private void saveGroups(ParserConfigurations parserConfigurations, Collection<SensorParserGroup> groups) throws RestException {
+    Map<String, Object> globalConfig = parserConfigurations.getGlobalConfig(true);
+    globalConfig.put(ParserConfigurations.PARSER_GROUPS_CONF, groups);
+    globalConfigService.save(globalConfig);
+  }
+
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 4569a23..3b027c9 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -17,15 +17,23 @@
*/
package org.apache.metron.rest.service.impl;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.parsers.topology.ParserTopologyCLI;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
import org.apache.metron.rest.service.StormAdminService;
+import org.apache.metron.rest.service.StormStatusService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
@Service
@@ -37,14 +45,29 @@ public class StormAdminServiceImpl implements StormAdminService {
private SensorParserConfigService sensorParserConfigService;
+ private SensorParserGroupService sensorParserGroupService;
+
+ private StormStatusService stormStatusService;
+
@Autowired
- public StormAdminServiceImpl(StormCLIWrapper stormCLIClientWrapper, GlobalConfigService globalConfigService, SensorParserConfigService sensorParserConfigService) {
+ public StormAdminServiceImpl(StormCLIWrapper stormCLIClientWrapper,
+ GlobalConfigService globalConfigService,
+ SensorParserConfigService sensorParserConfigService,
+ SensorParserGroupService sensorParserGroupService,
+ StormStatusService stormStatusService) {
this.stormCLIClientWrapper = stormCLIClientWrapper;
this.globalConfigService = globalConfigService;
this.sensorParserConfigService = sensorParserConfigService;
+ this.sensorParserGroupService = sensorParserGroupService;
+ this.stormStatusService = stormStatusService;
}
-
+ /**
+ * Starts a parser topology. The name should either be a sensor name or group name in the case of aggregate parser topologies.
+ * @param name SensorParserConfig or SensorParserGroup name
+ * @return TopologyResponse
+ * @throws RestException Global Config or SensorParserConfigs not found or starting the topology resulted in an error.
+ */
@Override
public TopologyResponse startParserTopology(String name) throws RestException {
TopologyResponse topologyResponse = new TopologyResponse();
@@ -53,7 +76,12 @@ public class StormAdminServiceImpl implements StormAdminService {
return topologyResponse;
}
- String[] sensorTypes = name.split(",");
+ List<String> sensorTypes = Collections.singletonList(name);
+ // If name is a group then look up sensors to build the actual topology name
+ SensorParserGroup sensorParserGroup = sensorParserGroupService.findOne(name);
+ if (sensorParserGroup != null) {
+ sensorTypes = new ArrayList<>(sensorParserGroup.getSensors());
+ }
for (String sensorType : sensorTypes) {
if (sensorParserConfigService.findOne(sensorType.trim()) == null) {
topologyResponse
@@ -62,16 +90,28 @@ public class StormAdminServiceImpl implements StormAdminService {
}
}
+ // sort the sensor types so the topology name is consistent
+ Collections.sort(sensorTypes);
return createResponse(
- stormCLIClientWrapper.startParserTopology(name),
+ stormCLIClientWrapper.startParserTopology(String.join(ParserTopologyCLI.TOPOLOGY_OPTION_SEPARATOR, sensorTypes)),
TopologyStatusCode.STARTED,
TopologyStatusCode.START_ERROR
);
}
+ /**
+ * Stops a parser topology. The name should either be a sensor name or group name in the case of aggregate parser topologies.
+ * @param name SensorParserConfig or SensorParserGroup name
+ * @param stopNow Stop the topology immediately
+ * @return TopologyResponse
+ * @throws RestException Stopping the topology resulted in an error.
+ */
@Override
public TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException {
- return createResponse(stormCLIClientWrapper.stopParserTopology(name.replaceAll(",", "__"), stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+ // Supplied name could be a group so get the actual job name from Storm
+ TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(name);
+ String jobName = topologyStatus != null ? topologyStatus.getName() : name;
+ return createResponse(stormCLIClientWrapper.stopParserTopology(jobName, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
}
@Override
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
index 42de078..25df549 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
@@ -16,13 +16,21 @@
package org.apache.metron.rest.service.impl;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.parsers.topology.ParserTopologyCLI;
+import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.SupervisorSummary;
import org.apache.metron.rest.model.TopologyResponse;
import org.apache.metron.rest.model.TopologyStatus;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.SensorParserGroupService;
import org.apache.metron.rest.service.StormStatusService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
@@ -38,13 +46,14 @@ import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
public class StormStatusServiceImpl implements StormStatusService {
private Environment environment;
-
private RestTemplate restTemplate;
+ private SensorParserGroupService sensorParserGroupService;
@Autowired
- public StormStatusServiceImpl(Environment environment, RestTemplate restTemplate) {
+ public StormStatusServiceImpl(Environment environment, RestTemplate restTemplate, SensorParserGroupService sensorParserGroupService) {
this.environment = environment;
this.restTemplate = restTemplate;
+ this.sensorParserGroupService = sensorParserGroupService;
}
@Override
@@ -62,13 +71,7 @@ public class StormStatusServiceImpl implements StormStatusService {
@Override
public TopologyStatus getTopologyStatus(String name) {
TopologyStatus topologyResponse = null;
- String id = null;
- for (TopologyStatus topology : getTopologySummary().getTopologies()) {
- if (name.equals(topology.getName())) {
- id = topology.getId();
- break;
- }
- }
+ String id = getTopologyId(name);
if (id != null) {
topologyResponse = restTemplate
.getForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id, TopologyStatus.class);
@@ -90,13 +93,7 @@ public class StormStatusServiceImpl implements StormStatusService {
@Override
public TopologyResponse activateTopology(String name) {
TopologyResponse topologyResponse = new TopologyResponse();
- String id = null;
- for (TopologyStatus topology : getTopologySummary().getTopologies()) {
- if (name.equals(topology.getName())) {
- id = topology.getId();
- break;
- }
- }
+ String id = getTopologyId(name);
if (id != null) {
Map result = restTemplate
.postForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id + "/activate", null,
@@ -115,13 +112,7 @@ public class StormStatusServiceImpl implements StormStatusService {
@Override
public TopologyResponse deactivateTopology(String name) {
TopologyResponse topologyResponse = new TopologyResponse();
- String id = null;
- for (TopologyStatus topology : getTopologySummary().getTopologies()) {
- if (name.equals(topology.getName())) {
- id = topology.getId();
- break;
- }
- }
+ String id = getTopologyId(name);
if (id != null) {
Map result = restTemplate
.postForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id + "/deactivate", null,
@@ -145,4 +136,35 @@ public class StormStatusServiceImpl implements StormStatusService {
}
return baseValue;
}
+
+  /**
+   * Retrieves the Storm topology id from the given topology name. A topology matches when its name equals the
+   * supplied name, or when the supplied name is a SensorParserGroup and the topology is an aggregate parser topology
+   * whose name, split on the job separator, yields exactly the sensors of that group. The first matching topology
+   * in the topology summary wins.
+   * @param name Topology or SensorParserGroup name
+   * @return Topology id, or null if no running topology matches
+   */
+  protected String getTopologyId(String name) {
+    // The group lookup does not depend on the topology being inspected, so resolve it once up front.
+    SensorParserGroup group = sensorParserGroupService.findOne(name);
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      String topologyName = topology.getName();
+
+      // direct match on the topology name
+      if (topologyName.equals(name)) {
+        return topology.getId();
+      }
+
+      // check sensor group; a name that is not a group simply skips this check instead of ending the search,
+      // so topologies listed after an aggregate topology can still be found
+      if (group != null && topologyName.contains(ParserTopologyCLI.STORM_JOB_SEPARATOR)) {
+        Set<String> sensors = new HashSet<>(Arrays.asList(topologyName.split(ParserTopologyCLI.STORM_JOB_SEPARATOR)));
+        if (sensors.equals(group.getSensors())) {
+          return topology.getId();
+        }
+      }
+    }
+    return null;
+  }
}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java
new file mode 100644
index 0000000..8106573
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.web.context.WebApplicationContext;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.hamcrest.Matchers.hasSize;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(TEST_PROFILE)
+public class SensorParserGroupControllerIntegrationTest {
+
+ /**
+ {
+ "name":"group1",
+ "description":"group1 description",
+ "sensors":["bro","snort"]
+ }
+ */
+ @Multiline
+ public static String group1BroSnort;
+
+ /**
+ {
+ "name":"group1",
+ "description":"group1 description",
+ "sensors":["bro","squid"]
+ }
+ */
+ @Multiline
+ public static String group1BroSquid;
+
+ /**
+ {
+ "name":"group2",
+ "description":"group2 description",
+ "sensors":["yaf","jsonMap"]
+ }
+ */
+ @Multiline
+ public static String group2YafJsonMap;
+
+ /**
+ {
+ "name":"errorGroup",
+ "description":"error description",
+ "sensors":["bro"]
+ }
+ */
+ @Multiline
+ public static String errorGroup;
+
+ @Autowired
+ private GlobalConfigService globalConfigService;
+
+ @Autowired
+ private SensorParserConfigService sensorParserConfigService;
+
+ @Autowired
+ private SensorParserGroupService sensorParserGroupService;
+
+ @Autowired
+ private WebApplicationContext wac;
+
+ private MockMvc mockMvc;
+ private AtomicInteger numFields;
+
+ private String sensorParserGroupUrl = "/api/v1/sensor/parser/group";
+ private String user = "user";
+ private String password = "password";
+
+ @Before
+ public void setup() throws Exception {
+   // numFields holds the number of public "set*" methods on SensorParserGroup; the tests in this class
+   // compare it against the number of top-level JSON fields returned for a group.
+   numFields = new AtomicInteger(0);
+   for (Method candidate : SensorParserGroup.class.getMethods()) {
+     if (candidate.getName().startsWith("set")) {
+       numFields.incrementAndGet();
+     }
+   }
+   this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+   // Start from an empty global config and register a parser config for every sensor the fixtures refer to.
+   this.globalConfigService.save(new HashMap<>());
+   for (String sensor : new String[]{"bro", "snort", "squid", "yaf", "jsonMap"}) {
+     this.sensorParserConfigService.save(sensor, new SensorParserConfig());
+   }
+ }
+
+ @Test
+ public void testSecurity() throws Exception {
+ this.mockMvc.perform(post(sensorParserGroupUrl).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSnort))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(get(sensorParserGroupUrl + "/group1"))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(get(sensorParserGroupUrl))
+ .andExpect(status().isUnauthorized());
+
+ this.mockMvc.perform(delete(sensorParserGroupUrl + "/group1").with(csrf()))
+ .andExpect(status().isUnauthorized());
+ }
+
+ @Test
+ public void testCreate() throws Exception {
+ this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSnort))
+ .andExpect(status().isCreated())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.name").value("group1"))
+ .andExpect(jsonPath("$.description").value("group1 description"))
+ .andExpect(jsonPath("$.sensors[0]").value("bro"))
+ .andExpect(jsonPath("$.sensors[1]").value("snort"));
+
+ this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group2YafJsonMap))
+ .andExpect(status().isCreated())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.name").value("group2"))
+ .andExpect(jsonPath("$.description").value("group2 description"))
+ .andExpect(jsonPath("$.sensors[0]").value("jsonMap"))
+ .andExpect(jsonPath("$.sensors[1]").value("yaf"));
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+ this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSquid))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.name").value("group1"))
+ .andExpect(jsonPath("$.description").value("group1 description"))
+ .andExpect(jsonPath("$.sensors[0]").value("squid"))
+ .andExpect(jsonPath("$.sensors[1]").value("bro"));
+ }
+
+ @Test
+ public void testFindOne() throws Exception {
+ this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+ this.mockMvc.perform(get(sensorParserGroupUrl + "/group1").with(httpBasic(user,password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.name").value("group1"))
+ .andExpect(jsonPath("$.description").value("group1 description"))
+ .andExpect(jsonPath("$.sensors[0]").value("squid"))
+ .andExpect(jsonPath("$.sensors[1]").value("bro"));
+
+ this.mockMvc.perform(get(sensorParserGroupUrl + "/missingGroup").with(httpBasic(user,password)))
+ .andExpect(status().isNotFound());
+ }
+
+ @Test
+ public void testGetAll() throws Exception {
+ this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+ this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group2YafJsonMap, SensorParserGroup.class));
+
+ this.mockMvc.perform(get(sensorParserGroupUrl).with(httpBasic(user,password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.*", hasSize(2)))
+ .andExpect(jsonPath("$.group1.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.group1.name").value("group1"))
+ .andExpect(jsonPath("$.group1.description").value("group1 description"))
+ .andExpect(jsonPath("$.group1.sensors[0]").value("squid"))
+ .andExpect(jsonPath("$.group1.sensors[1]").value("bro"))
+ .andExpect(jsonPath("$.group2.*", hasSize(numFields.get())))
+ .andExpect(jsonPath("$.group2.name").value("group2"))
+ .andExpect(jsonPath("$.group2.description").value("group2 description"))
+ .andExpect(jsonPath("$.group2.sensors[0]").value("jsonMap"))
+ .andExpect(jsonPath("$.group2.sensors[1]").value("yaf"));
+ }
+
+ @Test
+ public void testError() throws Exception {
+ this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+ this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(errorGroup))
+ .andExpect(status().isInternalServerError())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.responseCode").value(500))
+ .andExpect(jsonPath("$.message").value("Sensor bro is already in group group1"))
+ .andExpect(jsonPath("$.fullMessage").value("RestException: Sensor bro is already in group group1"));
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+   this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+   // Deleting an existing group succeeds.
+   this.mockMvc.perform(delete(sensorParserGroupUrl + "/group1").with(httpBasic(user,password)).with(csrf()))
+       .andExpect(status().isOk());
+
+   // Deleting an unknown group reports 404; csrf() is supplied like every other mutating request in this class.
+   this.mockMvc.perform(delete(sensorParserGroupUrl + "/missingGroup").with(httpBasic(user,password)).with(csrf()))
+       .andExpect(status().isNotFound());
+
+   // The deletion may take a moment to become visible, so poll the group service until group1 is no longer returned.
+   TestUtils.assertEventually(() -> Assert.assertNull(sensorParserGroupService.findOne("group1")));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ this.globalConfigService.delete();
+ this.sensorParserConfigService.delete("bro");
+ this.sensorParserConfigService.delete("snort");
+ this.sensorParserConfigService.delete("squid");
+ this.sensorParserConfigService.delete("yaf");
+ this.sensorParserConfigService.delete("jsonMap");
+ }
+}
+
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java
new file mode 100644
index 0000000..efe8f43
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.metron.common.configuration.ParserConfigurations.PARSER_GROUPS_CONF;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class SensorParserGroupServiceImplTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private ConfigurationsCache cache;
+ private GlobalConfigService globalConfigService;
+ private SensorParserConfigService sensorParserConfigService;
+ private SensorParserGroupService sensorParserGroupService;
+
+ @Before
+ public void setUp() throws Exception {
+ cache = mock(ConfigurationsCache.class);
+ globalConfigService = mock(GlobalConfigService.class);
+ sensorParserConfigService = mock(SensorParserConfigService.class);
+ sensorParserGroupService = new SensorParserGroupServiceImpl(cache, globalConfigService, sensorParserConfigService);
+ }
+
+ @Test
+ public void shouldSaveNewGroup() throws Exception {
+ when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+ when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+
+ SensorParserGroup sensorParserGroup = new SensorParserGroup();
+ sensorParserGroup.setName("group1");
+ sensorParserGroup.setDescription("description 1");
+ sensorParserGroup.setSensors(Collections.singleton("bro"));
+
+ Map<String, Object> expectedGlobalConfig = new HashMap<>();
+ Collection<SensorParserGroup> expectedGroup = Collections.singleton(sensorParserGroup);
+ expectedGlobalConfig.put(PARSER_GROUPS_CONF, expectedGroup);
+
+ assertEquals(sensorParserGroup, sensorParserGroupService.save(sensorParserGroup));
+ verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+
+ verifyNoMoreInteractions(globalConfigService);
+ }
+
+ @Test
+ public void shouldSaveExistingGroup() throws Exception {
+ // Verifies that saving a group returns it and writes a global config whose PARSER_GROUPS_CONF entry
+ // contains exactly that group.
+ SensorParserGroup oldGroup = new SensorParserGroup();
+ oldGroup.setName("oldGroup");
+ oldGroup.setDescription("old description");
+ oldGroup.setSensors(Collections.singleton("oldSensor"));
+
+ // NOTE(review): cache.get(...) below is stubbed to return a fresh ParserConfigurations, not this mock, so the
+ // getSensorParserGroups() stubbing on the mock is never consulted and oldGroup plays no part in the test.
+ // The map is also keyed by "newSensor", whereas the other tests in this class key it by group name.
+ // TODO confirm the intended setup against SensorParserGroupServiceImpl.save before rewiring.
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+ when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+ when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+ put("newSensor", oldGroup);
+ }});
+ when(sensorParserConfigService.findOne("newSensor")).thenReturn(new SensorParserConfig());
+
+ SensorParserGroup newGroup = new SensorParserGroup();
+ newGroup.setName("newGroup");
+ newGroup.setDescription("new description");
+ newGroup.setSensors(Collections.singleton("newSensor"));
+
+ // The global config expected to be saved holds only the new group.
+ Map<String, Object> expectedGlobalConfig = new HashMap<>();
+ Collection<SensorParserGroup> expectedGroup = Collections.singleton(newGroup);
+ expectedGlobalConfig.put(PARSER_GROUPS_CONF, expectedGroup);
+
+ assertEquals(newGroup, sensorParserGroupService.save(newGroup));
+ verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+
+ verifyNoMoreInteractions(globalConfigService);
+ }
+
+ @Test
+ public void saveShouldThrowExceptionOnMissingSensor() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("A parser group must contain sensors");
+
+ when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+
+ sensorParserGroupService.save(new SensorParserGroup());
+ }
+
+ @Test
+ public void saveShouldThrowExceptionOnMissingConfig() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("Could not find config for sensor bro");
+
+ when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+
+ SensorParserGroup sensorParserGroup = new SensorParserGroup();
+ sensorParserGroup.setSensors(Collections.singleton("bro"));
+
+ sensorParserGroupService.save(sensorParserGroup);
+ }
+
+ @Test
+ public void saveShouldThrowExceptionOnSensorInAnotherGroup() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("Sensor bro is already in group existingGroup");
+
+ SensorParserGroup existingGroup = new SensorParserGroup();
+ existingGroup.setName("existingGroup");
+ existingGroup.setSensors(Collections.singleton("bro"));
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+ when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+ put("existingGroup", existingGroup);
+ }});
+ when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+ when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+
+ SensorParserGroup newGroup = new SensorParserGroup();
+ newGroup.setName("newGroup");
+ newGroup.setSensors(Collections.singleton("bro"));
+
+ sensorParserGroupService.save(newGroup);
+ }
+
+ @Test
+ public void shouldFindSensorParserGroup() throws Exception {
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+ SensorParserGroup group1 = new SensorParserGroup();
+ group1.setName("group1");
+ group1.setDescription("group1 description");
+ group1.setSensors(Collections.singleton("group1Sensor"));
+ SensorParserGroup group2 = new SensorParserGroup();
+ group2.setName("group2");
+ group2.setDescription("group2 description");
+ group2.setSensors(Collections.singleton("group2Sensor"));
+ when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+ put("group1", group1);
+ put("group2", group2);
+ }});
+ when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+
+ assertEquals(group2, sensorParserGroupService.findOne("group2"));
+ }
+
+
+ @Test
+ public void shouldDeleteSensorParserGroup() throws Exception {
+ ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+ SensorParserGroup group1 = new SensorParserGroup();
+ group1.setName("group1");
+ group1.setDescription("group1 description");
+ group1.setSensors(Collections.singleton("group1Sensor"));
+ when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+ put("group1", group1);
+ }});
+ when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+
+ Map<String, Object> expectedGlobalConfig = new HashMap<>();
+ expectedGlobalConfig.put(PARSER_GROUPS_CONF, new HashSet<>());
+
+ assertEquals(true, sensorParserGroupService.delete("group1"));
+ assertEquals(false, sensorParserGroupService.delete("group2"));
+
+ verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+ verifyNoMoreInteractions(globalConfigService);
+ }
+
+
+}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
index 65a1bda..9f6c3c5 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
@@ -18,17 +18,24 @@
package org.apache.metron.rest.service.impl;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.service.GlobalConfigService;
import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
import org.apache.metron.rest.service.StormAdminService;
+import org.apache.metron.rest.service.StormStatusService;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -44,13 +51,17 @@ public class StormAdminServiceImplTest {
StormAdminService stormAdminService;
GlobalConfigService globalConfigService;
SensorParserConfigService sensorParserConfigService;
+ SensorParserGroupService sensorParserGroupService;
+ StormStatusService stormStatusService;
@Before
public void setUp() throws Exception {
stormCLIClientWrapper = mock(StormCLIWrapper.class);
globalConfigService = mock(GlobalConfigService.class);
sensorParserConfigService = mock(SensorParserConfigService.class);
- stormAdminService = new StormAdminServiceImpl(stormCLIClientWrapper, globalConfigService, sensorParserConfigService);
+ sensorParserGroupService = mock(SensorParserGroupService.class);
+ stormStatusService = mock(StormStatusService.class);
+ stormAdminService = new StormAdminServiceImpl(stormCLIClientWrapper, globalConfigService, sensorParserConfigService, sensorParserGroupService, stormStatusService);
}
@Test
@@ -68,6 +79,28 @@ public class StormAdminServiceImplTest {
}
@Test
+ public void startParserTopologyByGroupShouldProperlyReturnSuccessTopologyResponse() throws Exception {
+ SensorParserGroup group = new SensorParserGroup();
+ group.setName("group");
+ group.setSensors(new HashSet<String>() {{
+ add("bro");
+ add("snort");
+ }});
+ when(sensorParserGroupService.findOne("group")).thenReturn(group);
+ when(stormCLIClientWrapper.startParserTopology("bro,snort")).thenReturn(0);
+ when(globalConfigService.get()).thenReturn(new HashMap<String, Object>());
+ when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+ when(sensorParserConfigService.findOne("snort")).thenReturn(new SensorParserConfig());
+
+ TopologyResponse expected = new TopologyResponse();
+ expected.setSuccessMessage(TopologyStatusCode.STARTED.toString());
+ TopologyResponse actual = stormAdminService.startParserTopology("group");
+
+ assertEquals(expected, actual);
+ assertEquals(expected.hashCode(), actual.hashCode());
+ }
+
+ @Test
public void startParserTopologyShouldReturnGlobalConfigMissingError() throws Exception {
when(globalConfigService.get()).thenReturn(null);
@@ -90,9 +123,10 @@ public class StormAdminServiceImplTest {
@Test
public void stopParserTopologyShouldProperlyReturnErrorTopologyResponse() throws Exception {
+ TopologyStatus topologyStatus = new TopologyStatus();
+ topologyStatus.setName("bro");
when(stormCLIClientWrapper.stopParserTopology("bro", false)).thenReturn(1);
- when(globalConfigService.get()).thenReturn(new HashMap<String, Object>());
- when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+ when(stormStatusService.getTopologyStatus("bro")).thenReturn(topologyStatus);
TopologyResponse expected = new TopologyResponse();
expected.setErrorMessage(TopologyStatusCode.STOP_ERROR.toString());
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
index 606fed4..eac9be5 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
@@ -17,10 +17,12 @@
*/
package org.apache.metron.rest.service.impl;
+import org.apache.metron.common.configuration.SensorParserGroup;
import org.apache.metron.rest.model.TopologyResponse;
import org.apache.metron.rest.model.TopologyStatus;
import org.apache.metron.rest.model.TopologyStatusCode;
import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.SensorParserGroupService;
import org.apache.metron.rest.service.StormStatusService;
import org.junit.Before;
import org.junit.Rule;
@@ -31,6 +33,7 @@ import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import static org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
@@ -52,12 +55,14 @@ public class StormStatusServiceImplTest {
Environment environment;
RestTemplate restTemplate;
StormStatusService stormStatusService;
+ SensorParserGroupService sensorParserGroupService;
@Before
public void setUp() throws Exception {
environment = mock(Environment.class);
restTemplate = mock(RestTemplate.class);
- stormStatusService = new StormStatusServiceImpl(environment, restTemplate);
+ sensorParserGroupService = mock(SensorParserGroupService.class);
+ stormStatusService = new StormStatusServiceImpl(environment, restTemplate, sensorParserGroupService);
}
@Test
@@ -129,7 +134,37 @@ public class StormStatusServiceImplTest {
}
@Test
- public void getAllTopologyStatusShouldReturnAllTopologyStatus() {
+ public void getTopologyStatusByGroupShouldReturnTopologyStatus() throws Exception {
+ final TopologyStatus topologyStatus = new TopologyStatus();
+ topologyStatus.setStatus(TopologyStatusCode.STARTED);
+ topologyStatus.setName("bro__snort");
+ topologyStatus.setId("bro_snort_id");
+ final TopologySummary topologySummary = new TopologySummary();
+ topologySummary.setTopologies(new TopologyStatus[]{topologyStatus});
+
+ SensorParserGroup group = new SensorParserGroup();
+ group.setName("group");
+ group.setSensors(new HashSet<String>() {{
+ add("bro");
+ add("snort");
+ }});
+ when(sensorParserGroupService.findOne("group")).thenReturn(group);
+ when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
+ when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(topologySummary);
+ when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_URL + "/bro_snort_id", TopologyStatus.class)).thenReturn(topologyStatus);
+
+ TopologyStatus expected = new TopologyStatus();
+ expected.setStatus(TopologyStatusCode.STARTED);
+ expected.setName("bro__snort");
+ expected.setId("bro_snort_id");
+
+ TopologyStatus actual = stormStatusService.getTopologyStatus("group");
+ assertEquals(expected, actual);
+ assertEquals(expected.hashCode(), actual.hashCode());
+ }
+
+ @Test
+ public void getAllTopologyStatusShouldReturnAllTopologyStatus() throws Exception {
final TopologyStatus topologyStatus = new TopologyStatus();
topologyStatus.setStatus(TopologyStatusCode.STARTED);
topologyStatus.setName("bro");
@@ -151,7 +186,7 @@ public class StormStatusServiceImplTest {
@Test
- public void activateTopologyShouldReturnActiveTopologyResponse() {
+ public void activateTopologyShouldReturnActiveTopologyResponse() throws Exception {
final TopologyStatus topologyStatus = new TopologyStatus();
topologyStatus.setName("bro");
topologyStatus.setId("bro_id");
@@ -169,7 +204,7 @@ public class StormStatusServiceImplTest {
}
@Test
- public void activateTopologyShouldReturnErrorTopologyResponse() {
+ public void activateTopologyShouldReturnErrorTopologyResponse() throws Exception {
final TopologyStatus topologyStatus = new TopologyStatus();
topologyStatus.setName("bro");
topologyStatus.setId("bro_id");
@@ -187,7 +222,7 @@ public class StormStatusServiceImplTest {
}
@Test
- public void activateTopologyShouldReturnTopologyNotFoundTopologyResponse() {
+ public void activateTopologyShouldReturnTopologyNotFoundTopologyResponse() throws Exception {
when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(new TopologySummary());
@@ -197,7 +232,7 @@ public class StormStatusServiceImplTest {
}
@Test
- public void deactivateTopologyShouldReturnActiveTopologyResponse() {
+ public void deactivateTopologyShouldReturnActiveTopologyResponse() throws Exception {
final TopologyStatus topologyStatus = new TopologyStatus();
topologyStatus.setName("bro");
topologyStatus.setId("bro_id");
@@ -215,7 +250,7 @@ public class StormStatusServiceImplTest {
}
@Test
- public void deactivateTopologyShouldReturnErrorTopologyResponse() {
+ public void deactivateTopologyShouldReturnErrorTopologyResponse() throws Exception {
final TopologyStatus topologyStatus = new TopologyStatus();
topologyStatus.setName("bro");
topologyStatus.setId("bro_id");
@@ -233,7 +268,7 @@ public class StormStatusServiceImplTest {
}
@Test
- public void deactivateTopologyShouldReturnTopologyNotFoundTopologyResponse() {
+ public void deactivateTopologyShouldReturnTopologyNotFoundTopologyResponse() throws Exception {
when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(new TopologySummary());
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
index 0043d71..7a638df 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -17,19 +17,24 @@
*/
package org.apache.metron.common.configuration;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.metron.common.utils.JSONUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
* Allows for retrieval and update of parsing configurations.
*/
public class ParserConfigurations extends Configurations {
public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
+ public static final String PARSER_GROUPS_CONF = "parser.groups";
public SensorParserConfig getSensorParserConfig(String sensorType) {
return (SensorParserConfig) getConfigurations().get(getKey(sensorType));
@@ -50,6 +55,18 @@ public class ParserConfigurations extends Configurations {
}
/**
+   * Reads the sensor parser groups stored in the global config under the
+   * {@link #PARSER_GROUPS_CONF} key and indexes them by group name.  When the key is
+   * absent from the global config an empty map is returned.
+   *
+   * @return Map of sensor parser groups with the group name as the key
+   * @throws IllegalStateException if two configured groups share the same name
+   *         (raised by the {@code Collectors.toMap} collector on duplicate keys)
+   */
+  public Map<String, SensorParserGroup> getSensorParserGroups() {
+    // Fall back to an empty list so a missing setting converts to an empty collection.
+    Object rawGroups = getGlobalConfig(true).getOrDefault(PARSER_GROUPS_CONF, new ArrayList<>());
+    // The anonymous subclass captures the generic element type for the JSON mapper.
+    TypeReference<Collection<SensorParserGroup>> groupCollectionType =
+        new TypeReference<Collection<SensorParserGroup>>() {};
+    Collection<SensorParserGroup> parsedGroups =
+        JSONUtils.INSTANCE.getMapper().convertValue(rawGroups, groupCollectionType);
+    return parsedGroups.stream()
+        .collect(Collectors.toMap(SensorParserGroup::getName, group -> group));
+  }
+
+ /**
* Gets the list of sensor types that parsing configurations exist for.
*
* @return List of sensor types
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java
new file mode 100644
index 0000000..e5538c1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents a group of sensors. Sensor parser groups are used to execute parsers in a single execution context (Storm
+ * bolt for example).
+ *
+ * <p>This is a mutable bean with a no-argument constructor and getter/setter pairs for
+ * each property.  Equality, hash code and string form are all derived from the name,
+ * description and sensors.
+ */
+public class SensorParserGroup {
+
+  private String name;
+  private String description;
+  // Never null: starts out empty and setSensors() normalizes null to an empty set.
+  private Set<String> sensors = new HashSet<>();
+
+  /**
+   * @return the group name, or {@code null} if it has not been set
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @param name the group name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * @return the group description, or {@code null} if it has not been set
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * @param description the group description
+   */
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  /**
+   * @return the sensors in this group; never {@code null}, empty when none were set
+   */
+  public Set<String> getSensors() {
+    return sensors;
+  }
+
+  /**
+   * Replaces the sensors in this group.  The given set is stored as-is (not copied).
+   *
+   * @param sensors the sensors belonging to this group; {@code null} is treated as an
+   *                empty set so that {@link #getSensors()} never returns {@code null}
+   */
+  public void setSensors(Set<String> sensors) {
+    this.sensors = sensors != null ? sensors : new HashSet<>();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    SensorParserGroup that = (SensorParserGroup) o;
+    return Objects.equals(name, that.name) &&
+            Objects.equals(description, that.description) &&
+            Objects.equals(sensors, that.sensors);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, description, sensors);
+  }
+
+  @Override
+  public String toString() {
+    return "SensorParserGroup{" +
+            "name='" + name + '\'' +
+            ", description='" + description + '\'' +
+            ", sensors=" + sensors +
+            '}';
+  }
+}
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index d142279..1382bba 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -53,7 +53,8 @@ import org.apache.storm.utils.Utils;
public class ParserTopologyCLI {
- private static final String STORM_JOB_SEPARATOR = "__";
+ public static final String STORM_JOB_SEPARATOR = "__";
+ public static final String TOPOLOGY_OPTION_SEPARATOR = ",";
public enum ParserOptions {
HELP("h", code -> {
@@ -171,7 +172,7 @@ public class ParserTopologyCLI {
Option o = new Option(code, "extra_topology_options", true
, "Extra options in the form of a JSON file with a map for content." +
" Available options are those in the Kafka Consumer Configs at http://kafka.apache.org/0100/documentation.html#newconsumerconfigs" +
- " and " + Joiner.on(",").join(SpoutConfiguration.allOptions())
+ " and " + Joiner.on(TOPOLOGY_OPTION_SEPARATOR).join(SpoutConfiguration.allOptions())
);
o.setArgName("JSON_FILE");
o.setRequired(false);
@@ -325,7 +326,7 @@ public class ParserTopologyCLI {
String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
String sensorTypeRaw= ParserOptions.SENSOR_TYPES.get(cmd);
- List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(",")).map(String::trim).collect(
+ List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
Collectors.toList());
/*
@@ -357,7 +358,7 @@ public class ParserTopologyCLI {
// Handle the case where multiple spout parallelisms are passed explicitly.
String parallelismRaw = ParserOptions.SPOUT_PARALLELISM.get(cmd, "1");
- List<String> parallelisms = Arrays.stream(parallelismRaw.split(",")).map(String::trim).collect(
+ List<String> parallelisms = Arrays.stream(parallelismRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
Collectors.toList());
if (parallelisms.size() != parserConfigs.size()) {
throw new IllegalArgumentException("Spout parallelism should match number of sensors 1:1");
@@ -387,7 +388,7 @@ public class ParserTopologyCLI {
// Handle the case where multiple spout num tasks are passed explicitly.
String numTasksRaw = ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1");
- List<String> numTasks = Arrays.stream(numTasksRaw.split(",")).map(String::trim).collect(
+ List<String> numTasks = Arrays.stream(numTasksRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
Collectors.toList());
if (numTasks.size() != parserConfigs.size()) {
throw new IllegalArgumentException("Spout num tasks should match number of sensors 1:1");
@@ -602,14 +603,15 @@ public class ParserTopologyCLI {
ParserTopologyCLI cli = new ParserTopologyCLI();
ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd);
String sensorTypes = ParserOptions.SENSOR_TYPES.get(cmd);
+ String topologyName = sensorTypes.replaceAll(TOPOLOGY_OPTION_SEPARATOR, STORM_JOB_SEPARATOR);
if (ParserOptions.TEST.has(cmd)) {
topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
+ cluster.submitTopology(topologyName, topology.getTopologyConfig(), topology.getBuilder().createTopology());
Utils.sleep(300000);
cluster.shutdown();
} else {
- StormSubmitter.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
+ StormSubmitter.submitTopology(topologyName, topology.getTopologyConfig(), topology.getBuilder().createTopology());
}
} catch (Exception e) {
e.printStackTrace();