You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2019/03/05 23:20:40 UTC

[metron] branch master updated: METRON-2016 Parser aggregate groups should be persisted and available through REST (merrimanr) closes apache/metron#1346

This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d41526  METRON-2016 Parser aggregate groups should be persisted and available through REST (merrimanr) closes apache/metron#1346
6d41526 is described below

commit 6d41526c3eff511e5fe11368c6f3f0a837507ba1
Author: merrimanr <me...@gmail.com>
AuthorDate: Tue Mar 5 17:20:25 2019 -0600

    METRON-2016 Parser aggregate groups should be persisted and available through REST (merrimanr) closes apache/metron#1346
---
 .../CURRENT/package/scripts/parser_commands.py     |  10 +-
 metron-interface/metron-rest/README.md             |  33 +++
 metron-interface/metron-rest/pom.xml               |   6 +
 .../controller/SensorParserGroupController.java    |  93 +++++++
 .../rest/service/SensorParserGroupService.java     |  39 +++
 .../service/impl/SensorParserGroupServiceImpl.java | 136 +++++++++++
 .../rest/service/impl/StormAdminServiceImpl.java   |  50 +++-
 .../rest/service/impl/StormStatusServiceImpl.java  |  68 ++++--
 ...SensorParserGroupControllerIntegrationTest.java | 266 +++++++++++++++++++++
 .../impl/SensorParserGroupServiceImplTest.java     | 203 ++++++++++++++++
 .../service/impl/StormAdminServiceImplTest.java    |  40 +++-
 .../service/impl/StormStatusServiceImplTest.java   |  51 +++-
 .../common/configuration/ParserConfigurations.java |  17 ++
 .../common/configuration/SensorParserGroup.java    |  83 +++++++
 .../metron/parsers/topology/ParserTopologyCLI.java |  16 +-
 15 files changed, 1063 insertions(+), 48 deletions(-)

diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 18780d9..a687085 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -65,14 +65,20 @@ class ParserCommands:
     def __get_aggr_parsers(self, params):
         """
         Fetches the list of aggregated (and regular) parsers and returns a list.
-        If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example,
-        then this method will return ["bro,snort,yaf", "bro,snort", "yaf"]
+        If the input list of parsers were "bro,yaf,snort", "bro,snort" and yaf, for example,
+        then this method will return ["bro,snort,yaf", "bro,snort", "yaf"].  Sensors within
+        a group are sorted alphabetically.
         :param params:
         :return: List containing the names of parsers
         """
         parserList = []
         parsers = shlex.shlex(params.parsers)
         for name in parsers:
+            sensors = name.strip('",').split(",")
+            # if name contains multiple sensors, sort them alphabetically
+            if len(sensors) > 1:
+                sensors.sort()
+                name = '"' + ",".join(sensors) + '"'
             parserList.append(name.strip(','))
         return [s.translate(None, "'[]") for s in filter(None, parserList)]
 
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index 8aa172b..11f06bf 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -344,6 +344,10 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
 | [ `GET /api/v1/sensor/parser/config/reload/available`](#get-apiv1sensorparserconfigreloadavailable)|
 | [ `DELETE /api/v1/sensor/parser/config/{name}`](#delete-apiv1sensorparserconfigname)|
 | [ `GET /api/v1/sensor/parser/config/{name}`](#get-apiv1sensorparserconfigname)|
+| [ `POST /api/v1/sensor/parser/group`](#post-apiv1sensorparsergroup)|
+| [ `GET /api/v1/sensor/parser/group/{name}`](#get-apiv1sensorparsergroupname)|
+| [ `GET /api/v1/sensor/parser/group`](#get-apiv1sensorparsergroup)|
+| [ `DELETE /api/v1/sensor/parser/group/{name}`](#delete-apiv1sensorparsergroupname)|
 | [ `POST /api/v1/stellar/apply/transformations`](#post-apiv1stellarapplytransformations)|
 | [ `GET /api/v1/stellar/list`](#get-apiv1stellarlist)|
 | [ `GET /api/v1/stellar/list/functions`](#get-apiv1stellarlistfunctions)|
@@ -787,6 +791,35 @@ Request and Response objects are JSON formatted.  The JSON schemas are available
   * Returns:
     * 200 - Returns SensorParserConfig
     * 404 - SensorParserConfig is missing
+    
+### `POST /api/v1/sensor/parser/group`
+  * Description: Updates or creates a SensorParserGroup in Zookeeper
+  * Input:
+    * sensorParserGroup - SensorParserGroup
+  * Returns:
+    * 200 - SensorParserGroup updated. Returns saved SensorParserGroup
+    * 201 - SensorParserGroup created. Returns saved SensorParserGroup
+    
+### `GET /api/v1/sensor/parser/group/{name}`
+  * Description: Retrieves a SensorParserGroup from Zookeeper
+  * Input:
+    * name - SensorParserGroup name
+  * Returns:
+    * 200 - Returns SensorParserGroup
+    * 404 - SensorParserGroup is missing
+    
+### `GET /api/v1/sensor/parser/group`
+  * Description: Retrieves all SensorParserGroups from Zookeeper
+  * Returns:
+    * 200 - Returns all SensorParserGroups
+    
+### `DELETE /api/v1/sensor/parser/group/{name}`
+  * Description: Deletes a SensorParserGroup from Zookeeper
+  * Input:
+    * name - SensorParserGroup name
+  * Returns:
+    * 200 - SensorParserGroup was deleted
+    * 404 - SensorParserGroup is missing
 
 ### `POST /api/v1/stellar/apply/transformations`
   * Description: Executes transformations against a sample message
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index 2b33767..01f1aed 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -300,6 +300,12 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-parsing-storm</artifactId>
+        <version>${project.parent.version}</version>
+        <scope>provided</scope>
+      </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java
new file mode 100644
index 0000000..66fcea4
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SensorParserGroupController.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.ParseMessageRequest;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.json.simple.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Collection;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/api/v1/sensor/parser/group")
+public class SensorParserGroupController {
+
+  @Autowired
+  private SensorParserGroupService sensorParserGroupService;
+
+  @ApiOperation(value = "Updates or creates a SensorParserGroup in Zookeeper")
+  @ApiResponses(value = { @ApiResponse(message = "SensorParserGroup updated. Returns saved SensorParserGroup", code = 200),
+          @ApiResponse(message = "SensorParserGroup created. Returns saved SensorParserGroup", code = 201) })
+  @RequestMapping(method = RequestMethod.POST)
+  ResponseEntity<SensorParserGroup> save(@ApiParam(name="sensorParserGroup", value="SensorParserGroup", required=true)@RequestBody SensorParserGroup sensorParserGroup) throws RestException {
+    if (sensorParserGroupService.findOne(sensorParserGroup.getName()) == null) {
+      return new ResponseEntity<>(sensorParserGroupService.save(sensorParserGroup), HttpStatus.CREATED);
+    } else {
+      return new ResponseEntity<>(sensorParserGroupService.save(sensorParserGroup), HttpStatus.OK);
+    }
+  }
+
+  @ApiOperation(value = "Retrieves a SensorParserGroup from Zookeeper")
+  @ApiResponses(value = { @ApiResponse(message = "Returns SensorParserGroup", code = 200),
+          @ApiResponse(message = "SensorParserGroup is missing", code = 404) })
+  @RequestMapping(value = "/{name}", method = RequestMethod.GET)
+  ResponseEntity<SensorParserGroup> findOne(@ApiParam(name="name", value="SensorParserGroup name", required=true)@PathVariable String name) throws RestException {
+    SensorParserGroup sensorParserGroup = sensorParserGroupService.findOne(name);
+    if (sensorParserGroup != null) {
+      return new ResponseEntity<>(sensorParserGroup, HttpStatus.OK);
+    }
+
+    return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+  }
+
+  @ApiOperation(value = "Retrieves all SensorParserGroups from Zookeeper")
+  @ApiResponse(message = "Returns all SensorParserGroups", code = 200)
+  @RequestMapping(method = RequestMethod.GET)
+  ResponseEntity<Map<String, SensorParserGroup>> findAll() throws RestException {
+    return new ResponseEntity<>(sensorParserGroupService.getAll(), HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Deletes a SensorParserGroup from Zookeeper")
+  @ApiResponses(value = { @ApiResponse(message = "SensorParserGroup was deleted", code = 200),
+          @ApiResponse(message = "SensorParserGroup is missing", code = 404) })
+  @RequestMapping(value = "/{name}", method = RequestMethod.DELETE)
+  ResponseEntity<Void> delete(@ApiParam(name="name", value="SensorParserGroup name", required=true)@PathVariable String name) throws RestException {
+    if (sensorParserGroupService.delete(name)) {
+      return new ResponseEntity<>(HttpStatus.OK);
+    } else {
+      return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+    }
+  }
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java
new file mode 100644
index 0000000..cc187bc
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SensorParserGroupService.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service;
+
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.rest.RestException;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Exposes CRUD operations for SensorParserGroup objects stored in Zookeeper.  An array of SensorParserGroups is stored
+ * in the Global Config using the 'parser.groups' key.
+ */
+public interface SensorParserGroupService {
+
+  /**
+   * Creates or updates a SensorParserGroup.
+   * @param sensorParserGroup SensorParserGroup to save
+   * @return The saved SensorParserGroup
+   * @throws RestException The group could not be saved
+   */
+  SensorParserGroup save(SensorParserGroup sensorParserGroup) throws RestException;
+
+  /**
+   * Retrieves a single SensorParserGroup by name.
+   * @param name SensorParserGroup name
+   * @return The matching SensorParserGroup; callers in this change treat null as "group is missing"
+   */
+  SensorParserGroup findOne(String name);
+
+  /**
+   * Retrieves all SensorParserGroups.
+   * @return Map of SensorParserGroups keyed by SensorParserGroup name
+   */
+  Map<String, SensorParserGroup> getAll();
+
+  /**
+   * Deletes a SensorParserGroup.
+   * @param name SensorParserGroup name
+   * @return True if a SensorParserGroup was deleted
+   * @throws RestException The group could not be deleted
+   */
+  boolean delete(String name) throws RestException;
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java
new file mode 100644
index 0000000..8b8ee4a
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImpl.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * SensorParserGroupService implementation.  SensorParserGroups are read from the cached ParserConfigurations and
+ * written back through the GlobalConfigService under the ParserConfigurations.PARSER_GROUPS_CONF key.
+ */
+@Service
+public class SensorParserGroupServiceImpl implements SensorParserGroupService {
+
+  private final ConfigurationsCache cache;
+  private final GlobalConfigService globalConfigService;
+  private final SensorParserConfigService sensorParserConfigService;
+
+  @Autowired
+  public SensorParserGroupServiceImpl(ConfigurationsCache cache, GlobalConfigService globalConfigService, SensorParserConfigService sensorParserConfigService) {
+    this.cache = cache;
+    this.globalConfigService = globalConfigService;
+    this.sensorParserConfigService = sensorParserConfigService;
+  }
+
+  /**
+   * Saves a SensorParserGroup in Zookeeper.  Checks for various error conditions including empty sensors field, missing
+   * configs for each sensor and sensors already included in another group.
+   * @param sensorParserGroup SensorParserGroup to create or update
+   * @return The SensorParserGroup that was passed in
+   * @throws RestException The group has no sensors, a sensor has no config, a sensor already belongs to another group
+   * or writing to Zookeeper resulted in an error
+   */
+  @Override
+  public SensorParserGroup save(SensorParserGroup sensorParserGroup) throws RestException {
+    ParserConfigurations parserConfigurations = cache.get(ParserConfigurations.class);
+    // Copy of the existing groups without the one being saved, so a group never conflicts with its own previous version
+    Map<String, SensorParserGroup> groups = new HashMap<>(parserConfigurations.getSensorParserGroups());
+    groups.remove(sensorParserGroup.getName());
+
+    if (sensorParserGroup.getSensors().isEmpty()) {
+      throw new RestException("A parser group must contain sensors");
+    }
+
+    for (String sensor : sensorParserGroup.getSensors()) {
+      // check if sensor config exists
+      if (sensorParserConfigService.findOne(sensor) == null) {
+        throw new RestException(String.format("Could not find config for sensor %s", sensor));
+      }
+      // check if sensor is in another group
+      for (SensorParserGroup group : groups.values()) {
+        Set<String> groupSensors = group.getSensors();
+        if (groupSensors.contains(sensor)) {
+          throw new RestException(String.format("Sensor %s is already in group %s", sensor, group.getName()));
+        }
+      }
+    }
+    groups.put(sensorParserGroup.getName(), sensorParserGroup);
+    saveGroups(parserConfigurations, new HashSet<>(groups.values()));
+    return sensorParserGroup;
+  }
+
+  /**
+   * Retrieves a single SensorParserGroup by name.
+   * @param name SensorParserGroup name
+   * @return SensorParserGroup or null if group is missing
+   */
+  @Override
+  public SensorParserGroup findOne(String name) {
+    return getAll().get(name);
+  }
+
+  /**
+   * Retrieves all SensorParserGroups as a Map with key being the SensorParserGroup name
+   * @return All SensorParserGroups
+   */
+  @Override
+  public Map<String, SensorParserGroup> getAll() {
+    ParserConfigurations configs = cache.get(ParserConfigurations.class);
+    return configs.getSensorParserGroups();
+  }
+
+  /**
+   * Deletes a SensorParserGroup from Zookeeper.  The remaining groups are written back to the Global Config.
+   * @param name SensorParserGroup name
+   * @return True if a SensorParserGroup was deleted, false if no group with that name exists
+   * @throws RestException Writing to Zookeeper resulted in an error
+   */
+  @Override
+  public boolean delete(String name) throws RestException {
+    ParserConfigurations parserConfigurations = cache.get(ParserConfigurations.class);
+    // Work on a copy, as save() does, so the map handed out by the cached configuration is never mutated
+    Map<String, SensorParserGroup> groups = new HashMap<>(parserConfigurations.getSensorParserGroups());
+    if (!groups.containsKey(name)) {
+      return false;
+    }
+    groups.remove(name);
+    saveGroups(parserConfigurations, new HashSet<>(groups.values()));
+    return true;
+  }
+
+  /**
+   * Saves SensorParserGroups in Zookeeper.
+   * @param parserConfigurations ParserConfigurations
+   * @param groups SensorParserGroups
+   * @throws RestException Writing to Zookeeper resulted in an error
+   */
+  private void saveGroups(ParserConfigurations parserConfigurations, Collection<SensorParserGroup> groups) throws RestException {
+    Map<String, Object> globalConfig = parserConfigurations.getGlobalConfig(true);
+    globalConfig.put(ParserConfigurations.PARSER_GROUPS_CONF, groups);
+    globalConfigService.save(globalConfig);
+  }
+
+}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 4569a23..3b027c9 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -17,15 +17,23 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.parsers.topology.ParserTopologyCLI;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
 import org.apache.metron.rest.service.StormAdminService;
+import org.apache.metron.rest.service.StormStatusService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 @Service
@@ -37,14 +45,29 @@ public class StormAdminServiceImpl implements StormAdminService {
 
     private SensorParserConfigService sensorParserConfigService;
 
+    private SensorParserGroupService sensorParserGroupService;
+
+    private StormStatusService stormStatusService;
+
     @Autowired
-    public StormAdminServiceImpl(StormCLIWrapper stormCLIClientWrapper, GlobalConfigService globalConfigService, SensorParserConfigService sensorParserConfigService) {
+    public StormAdminServiceImpl(StormCLIWrapper stormCLIClientWrapper,
+                                 GlobalConfigService globalConfigService,
+                                 SensorParserConfigService sensorParserConfigService,
+                                 SensorParserGroupService sensorParserGroupService,
+                                 StormStatusService stormStatusService) {
         this.stormCLIClientWrapper = stormCLIClientWrapper;
         this.globalConfigService = globalConfigService;
         this.sensorParserConfigService = sensorParserConfigService;
+        this.sensorParserGroupService = sensorParserGroupService;
+        this.stormStatusService = stormStatusService;
     }
 
-
+    /**
+     * Starts a parser topology.  The name should either be a sensor name or group name in the case of aggregate parser topologies.
+     * @param name SensorParserConfig or SensorParserGroup name
+     * @return TopologyResponse
+     * @throws RestException Global Config or SensorParserConfigs not found or starting the topology resulted in an error.
+     */
     @Override
     public TopologyResponse startParserTopology(String name) throws RestException {
         TopologyResponse topologyResponse = new TopologyResponse();
@@ -53,7 +76,12 @@ public class StormAdminServiceImpl implements StormAdminService {
             return topologyResponse;
         }
 
-        String[] sensorTypes = name.split(",");
+        List<String> sensorTypes = Collections.singletonList(name);
+        // If name is a group then look up sensors to build the actual topology name
+        SensorParserGroup sensorParserGroup = sensorParserGroupService.findOne(name);
+        if (sensorParserGroup != null) {
+          sensorTypes = new ArrayList<>(sensorParserGroup.getSensors());
+        }
         for (String sensorType : sensorTypes) {
             if (sensorParserConfigService.findOne(sensorType.trim()) == null) {
                 topologyResponse
@@ -62,16 +90,28 @@ public class StormAdminServiceImpl implements StormAdminService {
             }
         }
 
+        // sort the sensor types so the topology name is consistent
+        Collections.sort(sensorTypes);
         return createResponse(
-            stormCLIClientWrapper.startParserTopology(name),
+            stormCLIClientWrapper.startParserTopology(String.join(ParserTopologyCLI.TOPOLOGY_OPTION_SEPARATOR, sensorTypes)),
                 TopologyStatusCode.STARTED,
                 TopologyStatusCode.START_ERROR
         );
     }
 
+    /**
+     * Stops a parser topology.  The name should either be a sensor name or group name in the case of aggregate parser topologies.
+     * @param name SensorParserConfig or SensorParserGroup name
+     * @param stopNow Stop the topology immediately
+     * @return TopologyResponse
+     * @throws RestException Stopping the topology resulted in an error.
+     */
     @Override
     public TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException {
-        return createResponse(stormCLIClientWrapper.stopParserTopology(name.replaceAll(",", "__"), stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+        // Supplied name could be a group so get the actual job name from Storm
+        TopologyStatus topologyStatus = stormStatusService.getTopologyStatus(name);
+        String jobName = topologyStatus != null ? topologyStatus.getName() : name;
+        return createResponse(stormCLIClientWrapper.stopParserTopology(jobName, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
     }
 
     @Override
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
index 42de078..25df549 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
@@ -16,13 +16,21 @@
 package org.apache.metron.rest.service.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.parsers.topology.ParserTopologyCLI;
+import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.SupervisorSummary;
 import org.apache.metron.rest.model.TopologyResponse;
 import org.apache.metron.rest.model.TopologyStatus;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.SensorParserGroupService;
 import org.apache.metron.rest.service.StormStatusService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
@@ -38,13 +46,14 @@ import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
 public class StormStatusServiceImpl implements StormStatusService {
 
   private Environment environment;
-
   private RestTemplate restTemplate;
+  private SensorParserGroupService sensorParserGroupService;
 
   @Autowired
-  public StormStatusServiceImpl(Environment environment, RestTemplate restTemplate) {
+  public StormStatusServiceImpl(Environment environment, RestTemplate restTemplate, SensorParserGroupService sensorParserGroupService) {
     this.environment = environment;
     this.restTemplate = restTemplate;
+    this.sensorParserGroupService = sensorParserGroupService;
   }
 
   @Override
@@ -62,13 +71,7 @@ public class StormStatusServiceImpl implements StormStatusService {
   @Override
   public TopologyStatus getTopologyStatus(String name) {
     TopologyStatus topologyResponse = null;
-    String id = null;
-    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
-      if (name.equals(topology.getName())) {
-        id = topology.getId();
-        break;
-      }
-    }
+    String id = getTopologyId(name);
     if (id != null) {
       topologyResponse = restTemplate
           .getForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id, TopologyStatus.class);
@@ -90,13 +93,7 @@ public class StormStatusServiceImpl implements StormStatusService {
   @Override
   public TopologyResponse activateTopology(String name) {
     TopologyResponse topologyResponse = new TopologyResponse();
-    String id = null;
-    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
-      if (name.equals(topology.getName())) {
-        id = topology.getId();
-        break;
-      }
-    }
+    String id = getTopologyId(name);
     if (id != null) {
       Map result = restTemplate
           .postForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id + "/activate", null,
@@ -115,13 +112,7 @@ public class StormStatusServiceImpl implements StormStatusService {
   @Override
   public TopologyResponse deactivateTopology(String name) {
     TopologyResponse topologyResponse = new TopologyResponse();
-    String id = null;
-    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
-      if (name.equals(topology.getName())) {
-        id = topology.getId();
-        break;
-      }
-    }
+    String id = getTopologyId(name);
     if (id != null) {
       Map result = restTemplate
           .postForObject(getStormUiProperty() + TOPOLOGY_URL + "/" + id + "/deactivate", null,
@@ -145,4 +136,35 @@ public class StormStatusServiceImpl implements StormStatusService {
     }
     return baseValue;
   }
+
+  /**
+   * Retrieves the Storm topology id for the given topology or SensorParserGroup name.  A topology matches when its
+   * name equals the supplied name, or when it is an aggregate parser topology whose sensors (its name split on the
+   * Storm job separator) equal the sensors of the SensorParserGroup with the supplied name.
+   * @param name Topology or SensorParserGroup name
+   * @return Topology id or null if no topology in the summary matches
+   */
+  protected String getTopologyId(String name) {
+    // The group lookup does not depend on the topology being inspected, so it is done once up front.
+    // A null group simply means the name can only match a topology name directly.
+    SensorParserGroup group = sensorParserGroupService.findOne(name);
+    for (TopologyStatus topology : getTopologySummary().getTopologies()) {
+      String topologyName = topology.getName();
+
+      // direct match on the Storm job name
+      if (topologyName.equals(name)) {
+        return topology.getId();
+      }
+
+      // check sensor group; an aggregate topology that does not match must not end the search because a
+      // topology later in the summary may still match
+      if (group != null && topologyName.contains(ParserTopologyCLI.STORM_JOB_SEPARATOR)) {
+        Set<String> sensors = new HashSet<>(Arrays.asList(topologyName.split(ParserTopologyCLI.STORM_JOB_SEPARATOR)));
+        if (sensors.equals(group.getSensors())) {
+          return topology.getId();
+        }
+      }
+    }
+    return null;
+  }
 }
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java
new file mode 100644
index 0000000..8106573
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserGroupControllerIntegrationTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.controller;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.web.context.WebApplicationContext;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.hamcrest.Matchers.hasSize;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
+import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
+import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ActiveProfiles(TEST_PROFILE)
+public class SensorParserGroupControllerIntegrationTest {
+
+  /**
+   {
+   "name":"group1",
+   "description":"group1 description",
+   "sensors":["bro","snort"]
+   }
+   */
+  @Multiline
+  public static String group1BroSnort;
+
+  /**
+   {
+   "name":"group1",
+   "description":"group1 description",
+   "sensors":["bro","squid"]
+   }
+   */
+  @Multiline
+  public static String group1BroSquid;
+
+  /**
+   {
+   "name":"group2",
+   "description":"group2 description",
+   "sensors":["yaf","jsonMap"]
+   }
+   */
+  @Multiline
+  public static String group2YafJsonMap;
+
+  /**
+   {
+   "name":"errorGroup",
+   "description":"error description",
+   "sensors":["bro"]
+   }
+   */
+  @Multiline
+  public static String errorGroup;
+
+  @Autowired
+  private GlobalConfigService globalConfigService;
+
+  @Autowired
+  private SensorParserConfigService sensorParserConfigService;
+
+  @Autowired
+  private SensorParserGroupService sensorParserGroupService;
+
+  @Autowired
+  private WebApplicationContext wac;
+
+  private MockMvc mockMvc;
+  private AtomicInteger numFields;
+
+  private String sensorParserGroupUrl = "/api/v1/sensor/parser/group";
+  private String user = "user";
+  private String password = "password";
+
+  @Before
+  public void setup() throws Exception {
+    this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
+
+    // count the public setters; the tests use this as the expected number of fields in a serialized group
+    numFields = new AtomicInteger(0);
+    for (Method candidate : SensorParserGroup.class.getMethods()) {
+      if (candidate.getName().startsWith("set")) {
+        numFields.incrementAndGet();
+      }
+    }
+
+    this.globalConfigService.save(new HashMap<>());
+    for (String sensor : new String[] {"bro", "snort", "squid", "yaf", "jsonMap"}) {
+      this.sensorParserConfigService.save(sensor, new SensorParserConfig());
+    }
+  }
+
+  @Test
+  public void testSecurity() throws Exception {
+    this.mockMvc.perform(post(sensorParserGroupUrl).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSnort))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(get(sensorParserGroupUrl + "/group1"))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(get(sensorParserGroupUrl))
+            .andExpect(status().isUnauthorized());
+
+    this.mockMvc.perform(delete(sensorParserGroupUrl + "/group1").with(csrf()))
+            .andExpect(status().isUnauthorized());
+  }
+
+  @Test
+  public void testCreate() throws Exception {
+    this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSnort))
+            .andExpect(status().isCreated())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.name").value("group1"))
+            .andExpect(jsonPath("$.description").value("group1 description"))
+            .andExpect(jsonPath("$.sensors[0]").value("bro"))
+            .andExpect(jsonPath("$.sensors[1]").value("snort"));
+
+    this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group2YafJsonMap))
+            .andExpect(status().isCreated())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.name").value("group2"))
+            .andExpect(jsonPath("$.description").value("group2 description"))
+            .andExpect(jsonPath("$.sensors[0]").value("jsonMap"))
+            .andExpect(jsonPath("$.sensors[1]").value("yaf"));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+    this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(group1BroSquid))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.name").value("group1"))
+            .andExpect(jsonPath("$.description").value("group1 description"))
+            .andExpect(jsonPath("$.sensors[0]").value("squid"))
+            .andExpect(jsonPath("$.sensors[1]").value("bro"));
+  }
+
+  @Test
+  public void testFindOne() throws Exception {
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+    this.mockMvc.perform(get(sensorParserGroupUrl + "/group1").with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.name").value("group1"))
+            .andExpect(jsonPath("$.description").value("group1 description"))
+            .andExpect(jsonPath("$.sensors[0]").value("squid"))
+            .andExpect(jsonPath("$.sensors[1]").value("bro"));
+
+    this.mockMvc.perform(get(sensorParserGroupUrl + "/missingGroup").with(httpBasic(user,password)))
+            .andExpect(status().isNotFound());
+  }
+
+  @Test
+  public void testGetAll() throws Exception {
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group2YafJsonMap, SensorParserGroup.class));
+
+    this.mockMvc.perform(get(sensorParserGroupUrl).with(httpBasic(user,password)))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(2)))
+            .andExpect(jsonPath("$.group1.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.group1.name").value("group1"))
+            .andExpect(jsonPath("$.group1.description").value("group1 description"))
+            .andExpect(jsonPath("$.group1.sensors[0]").value("squid"))
+            .andExpect(jsonPath("$.group1.sensors[1]").value("bro"))
+            .andExpect(jsonPath("$.group2.*", hasSize(numFields.get())))
+            .andExpect(jsonPath("$.group2.name").value("group2"))
+            .andExpect(jsonPath("$.group2.description").value("group2 description"))
+            .andExpect(jsonPath("$.group2.sensors[0]").value("jsonMap"))
+            .andExpect(jsonPath("$.group2.sensors[1]").value("yaf"));
+  }
+
+  @Test
+  public void testError() throws Exception {
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+    this.mockMvc.perform(post(sensorParserGroupUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(errorGroup))
+            .andExpect(status().isInternalServerError())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.responseCode").value(500))
+            .andExpect(jsonPath("$.message").value("Sensor bro is already in group group1"))
+            .andExpect(jsonPath("$.fullMessage").value("RestException: Sensor bro is already in group group1"));
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    this.sensorParserGroupService.save(JSONUtils.INSTANCE.load(group1BroSquid, SensorParserGroup.class));
+
+    this.mockMvc.perform(delete(sensorParserGroupUrl + "/group1").with(httpBasic(user,password)).with(csrf()))
+            .andExpect(status().isOk());
+
+    // send the CSRF token like every other mutating request in this test class
+    this.mockMvc.perform(delete(sensorParserGroupUrl + "/missingGroup").with(httpBasic(user,password)).with(csrf()))
+            .andExpect(status().isNotFound());
+
+    // Wait until the group is no longer returned.  The group service must be queried here: this test never
+    // creates a sensor config named "group1", so asking the parser config service would prove nothing.
+    TestUtils.assertEventually(() -> Assert.assertNull(sensorParserGroupService.findOne("group1")));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // remove the global config and the sensor parser configs saved by setup()
+    this.globalConfigService.delete();
+
+    for (String sensor : new String[] {"bro", "snort", "squid", "yaf", "jsonMap"}) {
+      this.sensorParserConfigService.delete(sensor);
+    }
+  }
+}
+
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java
new file mode 100644
index 0000000..efe8f43
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserGroupServiceImplTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.metron.common.configuration.ParserConfigurations.PARSER_GROUPS_CONF;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class SensorParserGroupServiceImplTest {
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private ConfigurationsCache cache;
+  private GlobalConfigService globalConfigService;
+  private SensorParserConfigService sensorParserConfigService;
+  private SensorParserGroupService sensorParserGroupService;
+
+  @Before
+  public void setUp() throws Exception {
+    cache = mock(ConfigurationsCache.class);
+    globalConfigService = mock(GlobalConfigService.class);
+    sensorParserConfigService = mock(SensorParserConfigService.class);
+    sensorParserGroupService = new SensorParserGroupServiceImpl(cache, globalConfigService, sensorParserConfigService);
+  }
+
+  @Test
+  public void shouldSaveNewGroup() throws Exception {
+    when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+    when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+
+    SensorParserGroup sensorParserGroup = new SensorParserGroup();
+    sensorParserGroup.setName("group1");
+    sensorParserGroup.setDescription("description 1");
+    sensorParserGroup.setSensors(Collections.singleton("bro"));
+
+    Map<String, Object> expectedGlobalConfig = new HashMap<>();
+    Collection<SensorParserGroup> expectedGroup = Collections.singleton(sensorParserGroup);
+    expectedGlobalConfig.put(PARSER_GROUPS_CONF, expectedGroup);
+
+    assertEquals(sensorParserGroup, sensorParserGroupService.save(sensorParserGroup));
+    verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+
+    verifyNoMoreInteractions(globalConfigService);
+  }
+
+  @Test
+  public void shouldSaveExistingGroup() throws Exception {
+    SensorParserGroup oldGroup = new SensorParserGroup();
+    oldGroup.setName("oldGroup");
+    oldGroup.setDescription("old description");
+    oldGroup.setSensors(Collections.singleton("oldSensor"));
+    // NOTE(review): this mock is stubbed below but cache.get() returns a fresh ParserConfigurations, so the stub never reaches the service - confirm intent
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+    when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+    when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+      put("newSensor", oldGroup); // NOTE(review): keyed by a sensor name; the other tests key this map by group name - confirm
+    }});
+    when(sensorParserConfigService.findOne("newSensor")).thenReturn(new SensorParserConfig());
+
+    SensorParserGroup newGroup = new SensorParserGroup();
+    newGroup.setName("newGroup");
+    newGroup.setDescription("new description");
+    newGroup.setSensors(Collections.singleton("newSensor"));
+
+    Map<String, Object> expectedGlobalConfig = new HashMap<>();
+    Collection<SensorParserGroup> expectedGroup = Collections.singleton(newGroup);
+    expectedGlobalConfig.put(PARSER_GROUPS_CONF, expectedGroup);
+
+    assertEquals(newGroup, sensorParserGroupService.save(newGroup));
+    verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+
+    verifyNoMoreInteractions(globalConfigService);
+  }
+
+  @Test
+  public void saveShouldThrowExceptionOnMissingSensor() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("A parser group must contain sensors");
+
+    when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+
+    sensorParserGroupService.save(new SensorParserGroup());
+  }
+
+  @Test
+  public void saveShouldThrowExceptionOnMissingConfig() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("Could not find config for sensor bro");
+
+    when(cache.get(ParserConfigurations.class)).thenReturn(new ParserConfigurations());
+
+    SensorParserGroup sensorParserGroup = new SensorParserGroup();
+    sensorParserGroup.setSensors(Collections.singleton("bro"));
+
+    sensorParserGroupService.save(sensorParserGroup);
+  }
+
+  @Test
+  public void saveShouldThrowExceptionOnSensorInAnotherGroup() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("Sensor bro is already in group existingGroup");
+
+    SensorParserGroup existingGroup = new SensorParserGroup();
+    existingGroup.setName("existingGroup");
+    existingGroup.setSensors(Collections.singleton("bro"));
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+    when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+      put("existingGroup", existingGroup);
+    }});
+    when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+    when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+
+    SensorParserGroup newGroup = new SensorParserGroup();
+    newGroup.setName("newGroup");
+    newGroup.setSensors(Collections.singleton("bro"));
+
+    sensorParserGroupService.save(newGroup);
+  }
+
+  @Test
+  public void shouldFindSensorParserGroup() throws Exception {
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+    SensorParserGroup group1 = new SensorParserGroup();
+    group1.setName("group1");
+    group1.setDescription("group1 description");
+    group1.setSensors(Collections.singleton("group1Sensor"));
+    SensorParserGroup group2 = new SensorParserGroup();
+    group2.setName("group2");
+    group2.setDescription("group2 description");
+    group2.setSensors(Collections.singleton("group2Sensor"));
+    when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+      put("group1", group1);
+      put("group2", group2);
+    }});
+    when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+
+    assertEquals(group2, sensorParserGroupService.findOne("group2"));
+  }
+
+
+  @Test
+  public void shouldDeleteSensorParserGroup() throws Exception {
+    ParserConfigurations parserConfigurations = mock(ParserConfigurations.class);
+    SensorParserGroup group1 = new SensorParserGroup();
+    group1.setName("group1");
+    group1.setDescription("group1 description");
+    group1.setSensors(Collections.singleton("group1Sensor"));
+    when(parserConfigurations.getSensorParserGroups()).thenReturn(new HashMap<String, SensorParserGroup>() {{
+      put("group1", group1);
+    }});
+    when(cache.get(ParserConfigurations.class)).thenReturn(parserConfigurations);
+
+    Map<String, Object> expectedGlobalConfig = new HashMap<>();
+    expectedGlobalConfig.put(PARSER_GROUPS_CONF, new HashSet<>());
+
+    assertEquals(true, sensorParserGroupService.delete("group1"));
+    assertEquals(false, sensorParserGroupService.delete("group2"));
+
+    verify(globalConfigService, times(1)).save(expectedGlobalConfig);
+    verifyNoMoreInteractions(globalConfigService);
+  }
+
+
+}
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
index 65a1bda..9f6c3c5 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormAdminServiceImplTest.java
@@ -18,17 +18,24 @@
 package org.apache.metron.rest.service.impl;
 
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.SensorParserGroup;
 import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.rest.service.SensorParserGroupService;
 import org.apache.metron.rest.service.StormAdminService;
+import org.apache.metron.rest.service.StormStatusService;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -44,13 +51,17 @@ public class StormAdminServiceImplTest {
   StormAdminService stormAdminService;
   GlobalConfigService globalConfigService;
   SensorParserConfigService sensorParserConfigService;
+  SensorParserGroupService sensorParserGroupService;
+  StormStatusService stormStatusService;
 
   @Before
   public void setUp() throws Exception {
     stormCLIClientWrapper = mock(StormCLIWrapper.class);
     globalConfigService = mock(GlobalConfigService.class);
     sensorParserConfigService = mock(SensorParserConfigService.class);
-    stormAdminService = new StormAdminServiceImpl(stormCLIClientWrapper, globalConfigService, sensorParserConfigService);
+    sensorParserGroupService = mock(SensorParserGroupService.class);
+    stormStatusService = mock(StormStatusService.class);
+    stormAdminService = new StormAdminServiceImpl(stormCLIClientWrapper, globalConfigService, sensorParserConfigService, sensorParserGroupService, stormStatusService);
   }
 
   @Test
@@ -68,6 +79,28 @@ public class StormAdminServiceImplTest {
   }
 
   @Test
+  public void startParserTopologyByGroupShouldProperlyReturnSuccessTopologyResponse() throws Exception {
+    SensorParserGroup group = new SensorParserGroup();
+    group.setName("group");
+    group.setSensors(new HashSet<String>() {{
+      add("bro");
+      add("snort");
+    }});
+    when(sensorParserGroupService.findOne("group")).thenReturn(group);
+    when(stormCLIClientWrapper.startParserTopology("bro,snort")).thenReturn(0);
+    when(globalConfigService.get()).thenReturn(new HashMap<String, Object>());
+    when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+    when(sensorParserConfigService.findOne("snort")).thenReturn(new SensorParserConfig());
+
+    TopologyResponse expected = new TopologyResponse();
+    expected.setSuccessMessage(TopologyStatusCode.STARTED.toString());
+    TopologyResponse actual = stormAdminService.startParserTopology("group");
+
+    assertEquals(expected, actual);
+    assertEquals(expected.hashCode(), actual.hashCode());
+  }
+
+  @Test
   public void startParserTopologyShouldReturnGlobalConfigMissingError() throws Exception {
     when(globalConfigService.get()).thenReturn(null);
 
@@ -90,9 +123,10 @@ public class StormAdminServiceImplTest {
 
   @Test
   public void stopParserTopologyShouldProperlyReturnErrorTopologyResponse() throws Exception {
+    TopologyStatus topologyStatus = new TopologyStatus();
+    topologyStatus.setName("bro");
     when(stormCLIClientWrapper.stopParserTopology("bro", false)).thenReturn(1);
-    when(globalConfigService.get()).thenReturn(new HashMap<String, Object>());
-    when(sensorParserConfigService.findOne("bro")).thenReturn(new SensorParserConfig());
+    when(stormStatusService.getTopologyStatus("bro")).thenReturn(topologyStatus);
 
     TopologyResponse expected = new TopologyResponse();
     expected.setErrorMessage(TopologyStatusCode.STOP_ERROR.toString());
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
index 606fed4..eac9be5 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/StormStatusServiceImplTest.java
@@ -17,10 +17,12 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import org.apache.metron.common.configuration.SensorParserGroup;
 import org.apache.metron.rest.model.TopologyResponse;
 import org.apache.metron.rest.model.TopologyStatus;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.SensorParserGroupService;
 import org.apache.metron.rest.service.StormStatusService;
 import org.junit.Before;
 import org.junit.Rule;
@@ -31,6 +33,7 @@ import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 import static org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
@@ -52,12 +55,14 @@ public class StormStatusServiceImplTest {
   Environment environment;
   RestTemplate restTemplate;
   StormStatusService stormStatusService;
+  SensorParserGroupService sensorParserGroupService;
 
   @Before
   public void setUp() throws Exception {
     environment = mock(Environment.class);
     restTemplate = mock(RestTemplate.class);
-    stormStatusService = new StormStatusServiceImpl(environment, restTemplate);
+    sensorParserGroupService = mock(SensorParserGroupService.class);
+    stormStatusService = new StormStatusServiceImpl(environment, restTemplate, sensorParserGroupService);
   }
 
   @Test
@@ -129,7 +134,37 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void getAllTopologyStatusShouldReturnAllTopologyStatus() {
+  public void getTopologyStatusByGroupShouldReturnTopologyStatus() throws Exception {
+    final TopologyStatus topologyStatus = new TopologyStatus();
+    topologyStatus.setStatus(TopologyStatusCode.STARTED);
+    topologyStatus.setName("bro__snort");
+    topologyStatus.setId("bro_snort_id");
+    final TopologySummary topologySummary = new TopologySummary();
+    topologySummary.setTopologies(new TopologyStatus[]{topologyStatus});
+
+    SensorParserGroup group = new SensorParserGroup();
+    group.setName("group");
+    group.setSensors(new HashSet<String>() {{
+      add("bro");
+      add("snort");
+    }});
+    when(sensorParserGroupService.findOne("group")).thenReturn(group);
+    when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
+    when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(topologySummary);
+    when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_URL + "/bro_snort_id", TopologyStatus.class)).thenReturn(topologyStatus);
+
+    TopologyStatus expected = new TopologyStatus();
+    expected.setStatus(TopologyStatusCode.STARTED);
+    expected.setName("bro__snort");
+    expected.setId("bro_snort_id");
+
+    TopologyStatus actual = stormStatusService.getTopologyStatus("group");
+    assertEquals(expected, actual);
+    assertEquals(expected.hashCode(), actual.hashCode());
+  }
+
+  @Test
+  public void getAllTopologyStatusShouldReturnAllTopologyStatus() throws Exception {
     final TopologyStatus topologyStatus = new TopologyStatus();
     topologyStatus.setStatus(TopologyStatusCode.STARTED);
     topologyStatus.setName("bro");
@@ -151,7 +186,7 @@ public class StormStatusServiceImplTest {
 
 
   @Test
-  public void activateTopologyShouldReturnActiveTopologyResponse() {
+  public void activateTopologyShouldReturnActiveTopologyResponse() throws Exception {
     final TopologyStatus topologyStatus = new TopologyStatus();
     topologyStatus.setName("bro");
     topologyStatus.setId("bro_id");
@@ -169,7 +204,7 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void activateTopologyShouldReturnErrorTopologyResponse() {
+  public void activateTopologyShouldReturnErrorTopologyResponse() throws Exception {
     final TopologyStatus topologyStatus = new TopologyStatus();
     topologyStatus.setName("bro");
     topologyStatus.setId("bro_id");
@@ -187,7 +222,7 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void activateTopologyShouldReturnTopologyNotFoundTopologyResponse() {
+  public void activateTopologyShouldReturnTopologyNotFoundTopologyResponse() throws Exception {
     when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
     when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(new TopologySummary());
 
@@ -197,7 +232,7 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void deactivateTopologyShouldReturnActiveTopologyResponse() {
+  public void deactivateTopologyShouldReturnActiveTopologyResponse() throws Exception {
     final TopologyStatus topologyStatus = new TopologyStatus();
     topologyStatus.setName("bro");
     topologyStatus.setId("bro_id");
@@ -215,7 +250,7 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void deactivateTopologyShouldReturnErrorTopologyResponse() {
+  public void deactivateTopologyShouldReturnErrorTopologyResponse() throws Exception {
     final TopologyStatus topologyStatus = new TopologyStatus();
     topologyStatus.setName("bro");
     topologyStatus.setId("bro_id");
@@ -233,7 +268,7 @@ public class StormStatusServiceImplTest {
   }
 
   @Test
-  public void deactivateTopologyShouldReturnTopologyNotFoundTopologyResponse() {
+  public void deactivateTopologyShouldReturnTopologyNotFoundTopologyResponse() throws Exception {
     when(environment.getProperty(STORM_UI_SPRING_PROPERTY)).thenReturn(HTTP_STORM_UI);
     when(restTemplate.getForObject(HTTP_STORM_UI + TOPOLOGY_SUMMARY_URL, TopologySummary.class)).thenReturn(new TopologySummary());
 
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
index 0043d71..7a638df 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -17,19 +17,24 @@
  */
 package org.apache.metron.common.configuration;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Allows for retrieval and update of parsing configurations.
  */
 public class ParserConfigurations extends Configurations {
   public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
+  public static final String PARSER_GROUPS_CONF = "parser.groups";
 
   public SensorParserConfig getSensorParserConfig(String sensorType) {
     return (SensorParserConfig) getConfigurations().get(getKey(sensorType));
@@ -50,6 +55,18 @@ public class ParserConfigurations extends Configurations {
   }
 
   /**
+   * Reads the {@code parser.groups} entry of the global config and indexes the groups it contains by name.
+   * @return Map from group name to its sensor parser group; empty when the {@code parser.groups} key is absent
+   */
+  public Map<String, SensorParserGroup> getSensorParserGroups() {
+    Object rawGroups = getGlobalConfig(true).getOrDefault(PARSER_GROUPS_CONF, new ArrayList<>());
+    TypeReference<Collection<SensorParserGroup>> groupsType = new TypeReference<Collection<SensorParserGroup>>() {};
+    Collection<SensorParserGroup> parsedGroups = JSONUtils.INSTANCE.getMapper().convertValue(rawGroups, groupsType);
+    // No merge function is given to toMap, so two groups sharing a name raise IllegalStateException.
+    return parsedGroups.stream().collect(Collectors.toMap(SensorParserGroup::getName, group -> group));
+  }
+
+  /**
    * Gets the list of sensor types that parsing configurations exist for.
    *
    * @return List of sensor types
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java
new file mode 100644
index 0000000..e5538c1
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserGroup.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents a named group of sensors.  Sensor parser groups are used to execute parsers in a single execution
+ * context (Storm bolt for example).  Equality and hashing cover the name, description and sensors together.
+ */
+public class SensorParserGroup {
+
+  private String name; // group name; ParserConfigurations#getSensorParserGroups keys its map by this value
+  private String description;
+  private Set<String> sensors = new HashSet<>(); // starts out empty; setSensors replaces it outright
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  public Set<String> getSensors() {
+    return sensors; // the internal set itself, not a copy
+  }
+
+  public void setSensors(Set<String> sensors) {
+    this.sensors = sensors; // keeps the caller's set as-is (no defensive copy, no null check)
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true; // same instance
+    if (o == null || getClass() != o.getClass()) return false; // the runtime classes must match exactly
+    SensorParserGroup that = (SensorParserGroup) o;
+    return Objects.equals(name, that.name) &&
+            Objects.equals(description, that.description) &&
+            Objects.equals(sensors, that.sensors);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hashes the same three fields that equals compares.
+    return Objects.hash(name, description, sensors);
+  }
+
+  @Override
+  public String toString() {
+    return "SensorParserGroup{" +
+            "name='" + name + '\'' +
+            ", description='" + description + '\'' +
+            ", sensors=" + sensors +
+            '}';
+  }
+}
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index d142279..1382bba 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -53,7 +53,8 @@ import org.apache.storm.utils.Utils;
 
 public class ParserTopologyCLI {
 
-  private static final String STORM_JOB_SEPARATOR = "__";
+  public static final String STORM_JOB_SEPARATOR = "__";
+  public static final String TOPOLOGY_OPTION_SEPARATOR = ",";
 
   public enum ParserOptions {
     HELP("h", code -> {
@@ -171,7 +172,7 @@ public class ParserTopologyCLI {
       Option o = new Option(code, "extra_topology_options", true
                            , "Extra options in the form of a JSON file with a map for content." +
                              "  Available options are those in the Kafka Consumer Configs at http://kafka.apache.org/0100/documentation.html#newconsumerconfigs" +
-                             " and " + Joiner.on(",").join(SpoutConfiguration.allOptions())
+                             " and " + Joiner.on(TOPOLOGY_OPTION_SEPARATOR).join(SpoutConfiguration.allOptions())
                            );
       o.setArgName("JSON_FILE");
       o.setRequired(false);
@@ -325,7 +326,7 @@ public class ParserTopologyCLI {
     String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
     Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
     String sensorTypeRaw= ParserOptions.SENSOR_TYPES.get(cmd);
-    List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(",")).map(String::trim).collect(
+    List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
         Collectors.toList());
 
     /*
@@ -357,7 +358,7 @@ public class ParserTopologyCLI {
 
         // Handle the multiple explicitly passed spout parallelism's case.
         String parallelismRaw = ParserOptions.SPOUT_PARALLELISM.get(cmd, "1");
-        List<String> parallelisms = Arrays.stream(parallelismRaw.split(",")).map(String::trim).collect(
+        List<String> parallelisms = Arrays.stream(parallelismRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
             Collectors.toList());
         if (parallelisms.size() != parserConfigs.size()) {
           throw new IllegalArgumentException("Spout parallelism should match number of sensors 1:1");
@@ -387,7 +388,7 @@ public class ParserTopologyCLI {
 
         // Handle the multiple explicitly passed spout parallelism's case.
         String numTasksRaw = ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1");
-        List<String> numTasks = Arrays.stream(numTasksRaw.split(",")).map(String::trim).collect(
+        List<String> numTasks = Arrays.stream(numTasksRaw.split(TOPOLOGY_OPTION_SEPARATOR)).map(String::trim).collect(
             Collectors.toList());
         if (numTasks.size() != parserConfigs.size()) {
           throw new IllegalArgumentException("Spout num tasks should match number of sensors 1:1");
@@ -602,14 +603,15 @@ public class ParserTopologyCLI {
       ParserTopologyCLI cli = new ParserTopologyCLI();
       ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd);
       String sensorTypes = ParserOptions.SENSOR_TYPES.get(cmd);
+      String topologyName = sensorTypes.replaceAll(TOPOLOGY_OPTION_SEPARATOR, STORM_JOB_SEPARATOR);
       if (ParserOptions.TEST.has(cmd)) {
         topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        cluster.submitTopology(topologyName, topology.getTopologyConfig(), topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        StormSubmitter.submitTopology(topologyName, topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();