You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by an...@apache.org on 2018/10/03 13:28:33 UTC
metron git commit: METRON-1798 Add mpack support for parser
aggregation (anandsubbu) closes apache/metron#1215
Repository: metron
Updated Branches:
refs/heads/master ff1f9cf52 -> ebdaf5f90
METRON-1798 Add mpack support for parser aggregation (anandsubbu) closes apache/metron#1215
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ebdaf5f9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ebdaf5f9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ebdaf5f9
Branch: refs/heads/master
Commit: ebdaf5f905829d8ff59814bb3555666217f85abb
Parents: ff1f9cf
Author: anandsubbu <an...@gmail.com>
Authored: Wed Oct 3 18:57:48 2018 +0530
Committer: anandsubbu <an...@apache.org>
Committed: Wed Oct 3 18:57:48 2018 +0530
----------------------------------------------------------------------
.../configuration/metron-parsers-env.xml | 2 +-
.../CURRENT/package/scripts/parser_commands.py | 49 ++++++++++++++++++--
.../metron-parsers/ParserChaining.md | 18 +++++++
3 files changed, 63 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
index a9a498b..03a2594 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml
@@ -21,7 +21,7 @@
<property>
<name>parsers</name>
<value>bro,snort,yaf</value>
- <description>Metron parsers to deploy</description>
+ <description>Metron parsers to deploy. You can also specify an aggregated parser list by grouping them with double quotes. For example: "parserA,parserB",parserC,parserD</description>
<display-name>Metron Parsers</display-name>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index 274306a..18780d9 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -20,6 +20,7 @@ limitations under the License.
import os
import re
+import shlex
import subprocess
import time
@@ -49,7 +50,45 @@ class ParserCommands:
# get list of parsers
def __get_parsers(self, params):
- return params.parsers.replace(' ', '').split(',')
+ """
+ Combines the list of parser topics and sends a unique list to be used for
+ Kafka topic creation and the like.
+ :param params:
+ :return: List containing the names of unique parsers
+ """
+ parserBatches = list(self.__get_aggr_parsers(params))
+ parsers = ','.join(s.translate(None, '"') for s in parserBatches)
+ # Get only the unique list of parser names
+ parsers = list(set(parsers.split(',')))
+ return parsers
+
+ def __get_aggr_parsers(self, params):
+ """
+ Fetches the list of aggregated (and regular) parsers and returns a list.
+ If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example,
+ then this method will return ["bro,snort,yaf", "bro,snort", "yaf"]
+ :param params:
+ :return: List containing the names of parsers
+ """
+ parserList = []
+ parsers = shlex.shlex(params.parsers)
+ for name in parsers:
+ parserList.append(name.strip(','))
+ return [s.translate(None, "'[]") for s in filter(None, parserList)]
+
+ def get_parser_aggr_topology_names(self, params):
+ """
+ Returns the names of regular and aggregated topologies as they would run in storm
+ An aggregated topology has the naming convention of 'parserA__parserB'.
+ For example, a list of parsers like ["bro,snort", yaf] will be returned as ["bro__snort", "yaf"]
+ :param params:
+ :return: List containing the names of parser topologies
+ """
+ topologyName = []
+ for parser in self.__get_aggr_parsers(params):
+ parser = parser.replace(",", "__").strip('"')
+ topologyName.append(parser)
+ return topologyName
def __get_topics(self):
# All errors go to indexing topics, so create it here if it's not already
@@ -104,7 +143,7 @@ class ParserCommands:
metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups())
def start_parser_topologies(self, env):
- Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list()))
+ Logger.info("Starting Metron parser topologies: {0}".format(self.__get_aggr_parsers(self.__params)))
start_cmd_template = """{0}/bin/start_parser_topology.sh \
-k {1} \
-z {2} \
@@ -118,7 +157,7 @@ class ParserCommands:
self.__params.metron_principal_name,
execute_user=self.__params.metron_user)
- stopped_parsers = set(self.get_parser_list()) - self.get_running_topology_names(env)
+ stopped_parsers = set(self.__get_aggr_parsers(self.__params)) - self.get_running_topology_names(env)
Logger.info('Parsers that need started: ' + str(stopped_parsers))
for parser in stopped_parsers:
@@ -135,7 +174,7 @@ class ParserCommands:
def stop_parser_topologies(self, env):
Logger.info('Stopping parsers')
- running_parsers = set(self.get_parser_list()) & self.get_running_topology_names(env)
+ running_parsers = set(self.get_parser_aggr_topology_names(self.__params)) & self.get_running_topology_names(env)
Logger.info('Parsers that need stopped: ' + str(running_parsers))
for parser in running_parsers:
@@ -192,7 +231,7 @@ class ParserCommands:
env.set_params(self.__params)
all_running = True
topologies = metron_service.get_running_topologies(self.__params)
- for parser in self.get_parser_list():
+ for parser in self.get_parser_aggr_topology_names(self.__params):
parser_found = False
is_running = False
if parser in topologies:
http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-platform/metron-parsers/ParserChaining.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/ParserChaining.md b/metron-platform/metron-parsers/ParserChaining.md
index a28f11d..4cab74c 100644
--- a/metron-platform/metron-parsers/ParserChaining.md
+++ b/metron-platform/metron-parsers/ParserChaining.md
@@ -49,6 +49,24 @@ data emitted from any parser, we will need to adjust the downstream
parsers to extract the enveloped data from the JSON blob and treat it as
the data to parse.
+# Aggregated Parsers with Parser Chaining
+Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
+
+Say, there were three sensors (`bro`, `snort` and `yaf`). Instead of creating a topology per sensor, all 3 can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run `bro` as it's own topology, and aggregate the other 2).
+
+The step to start an aggregated parsers then becomes
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s bro,snort,yaf
+```
+
+which will result in a single storm topology named `bro__snort__yaf` to run.
+
+Aggregated parsers can be specified using the Ambari Metron config as well under Services -> Metron -> Configs -> 'Parsers' tab -> 'Metron Parsers' field. The grouping is configured by enclosing the desired parsers in double quotes.
+
+Some examples of specifying aggregated parsers are as follows:
+* "bro,snort,yaf" --> Will start a single topology named `bro__snort__yaf`
+* "ciscopixA,ciscopixB",yaf,"squid,ciscopixC" --> Will start three topologies viz. `ciscopixA__ciscopixB`, `yaf` and `squid__ciscopixC`
+
# Architecting a Parser Chaining Solution in Metron
Currently the approach to fulfill this requirement involves a couple