You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by an...@apache.org on 2018/10/03 13:28:33 UTC

metron git commit: METRON-1798 Add mpack support for parser aggregation (anandsubbu) closes apache/metron#1215

Repository: metron
Updated Branches:
  refs/heads/master ff1f9cf52 -> ebdaf5f90


METRON-1798 Add mpack support for parser aggregation (anandsubbu) closes apache/metron#1215


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ebdaf5f9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ebdaf5f9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ebdaf5f9

Branch: refs/heads/master
Commit: ebdaf5f905829d8ff59814bb3555666217f85abb
Parents: ff1f9cf
Author: anandsubbu <an...@gmail.com>
Authored: Wed Oct 3 18:57:48 2018 +0530
Committer: anandsubbu <an...@apache.org>
Committed: Wed Oct 3 18:57:48 2018 +0530

----------------------------------------------------------------------
 .../configuration/metron-parsers-env.xml        |  2 +-
 .../CURRENT/package/scripts/parser_commands.py  | 49 ++++++++++++++++++--
 .../metron-parsers/ParserChaining.md            | 18 +++++++
 3 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
index a9a498b..03a2594 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
@@ -21,7 +21,7 @@
     <property>
         <name>parsers</name>
         <value>bro,snort,yaf</value>
-        <description>Metron parsers to deploy</description>
+        <description>Metron parsers to deploy. You can also specify an aggregated parser list by grouping them with double quotes. For example: "parserA,parserB",parserC,parserD</description>
         <display-name>Metron Parsers</display-name>
     </property>
     <property>

http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 274306a..18780d9 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -20,6 +20,7 @@ limitations under the License.
 
 import os
 import re
+import shlex
 import subprocess
 import time
 
@@ -49,7 +50,45 @@ class ParserCommands:
 
     # get list of parsers
     def __get_parsers(self, params):
-        return params.parsers.replace(' ', '').split(',')
+        """
+        Combines the list of parser topics and sends a unique list to be used for
+        Kafka topic creation and the like.
+        :param params:
+        :return: List containing the names of unique parsers
+        """
+        parserBatches = list(self.__get_aggr_parsers(params))
+        parsers = ','.join(s.translate(None, '"') for s in parserBatches)
+        # Get only the unique list of parser names
+        parsers = list(set(parsers.split(',')))
+        return parsers
+
+    def __get_aggr_parsers(self, params):
+        """
+        Fetches the list of aggregated (and regular) parsers and returns a list.
+        If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example,
+        then this method will return ["bro,snort,yaf", "bro,snort", "yaf"]
+        :param params:
+        :return: List containing the names of parsers
+        """
+        parserList = []
+        parsers = shlex.shlex(params.parsers)
+        for name in parsers:
+            parserList.append(name.strip(','))
+        return [s.translate(None, "'[]") for s in filter(None, parserList)]
+
+    def get_parser_aggr_topology_names(self, params):
+        """
+        Returns the names of regular and aggregated topologies as they would run in storm
+        An aggregated topology has the naming convention of 'parserA__parserB'.
+        For example, a list of parsers like ["bro,snort", yaf] will be returned as ["bro__snort", "yaf"]
+        :param params:
+        :return: List containing the names of parser topologies
+        """
+        topologyName = []
+        for parser in self.__get_aggr_parsers(params):
+            parser = parser.replace(",", "__").strip('"')
+            topologyName.append(parser)
+        return topologyName
 
     def __get_topics(self):
         # All errors go to indexing topics, so create it here if it's not already
@@ -104,7 +143,7 @@ class ParserCommands:
         metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
 
     def start_parser_topologies(self, env):
-        Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list()))
+        Logger.info("Starting Metron parser topologies: {0}".format(self.__get_aggr_parsers(self.__params)))
         start_cmd_template = """{0}/bin/start_parser_topology.sh \
                                     -k {1} \
                                     -z {2} \
@@ -118,7 +157,7 @@ class ParserCommands:
                                   self.__params.metron_principal_name,
                                   execute_user=self.__params.metron_user)
 
-        stopped_parsers = set(self.get_parser_list()) - self.get_running_topology_names(env)
+        stopped_parsers = set(self.__get_aggr_parsers(self.__params)) - self.get_running_topology_names(env)
         Logger.info('Parsers that need started: ' + str(stopped_parsers))
 
         for parser in stopped_parsers:
@@ -135,7 +174,7 @@ class ParserCommands:
     def stop_parser_topologies(self, env):
         Logger.info('Stopping parsers')
 
-        running_parsers = set(self.get_parser_list()) & self.get_running_topology_names(env)
+        running_parsers = set(self.get_parser_aggr_topology_names(self.__params)) & self.get_running_topology_names(env)
         Logger.info('Parsers that need stopped: ' + str(running_parsers))
 
         for parser in running_parsers:
@@ -192,7 +231,7 @@ class ParserCommands:
         env.set_params(self.__params)
         all_running = True
         topologies = metron_service.get_running_topologies(self.__params)
-        for parser in self.get_parser_list():
+        for parser in self.get_parser_aggr_topology_names(self.__params):
             parser_found = False
             is_running = False
             if parser in topologies:

http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-platform/metron-parsers/ParserChaining.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/ParserChaining.md b/metron-platform/metron-parsers/ParserChaining.md
index a28f11d..4cab74c 100644
--- a/metron-platform/metron-parsers/ParserChaining.md
+++ b/metron-platform/metron-parsers/ParserChaining.md
@@ -49,6 +49,24 @@ data emitted from any parser, we will need to adjust the downstream
 parsers to extract the enveloped data from the JSON blob and treat it as
 the data to parse.
 
+# Aggregated Parsers with Parser Chaining
+Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor-specific Kafka topics, and do not do internal routing to the appropriate sensor.
+
+Say there are three sensors (`bro`, `snort` and `yaf`). Instead of creating a topology per sensor, all three can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run `bro` as its own topology, and aggregate the other two).
+
+The command to start an aggregated parser then becomes:
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s bro,snort,yaf
+```
+
+which will result in a single Storm topology named `bro__snort__yaf` being started.
+
+Aggregated parsers can be specified using the Ambari Metron config as well under Services -> Metron -> Configs -> 'Parsers' tab -> 'Metron Parsers' field. The grouping is configured by enclosing the desired parsers in double quotes.
+
+Some examples of specifying aggregated parsers are as follows:
+* "bro,snort,yaf" --> Will start a single topology named `bro__snort__yaf`
+* "ciscopixA,ciscopixB",yaf,"squid,ciscopixC" --> Will start three topologies viz. `ciscopixA__ciscopixB`, `yaf` and `squid__ciscopixC`
+
 # Architecting a Parser Chaining Solution in Metron
 
 Currently the approach to fulfill this requirement involves a couple