You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@metron.apache.org by justinleet <gi...@git.apache.org> on 2018/07/10 13:30:24 UTC

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

GitHub user justinleet opened a pull request:

    https://github.com/apache/metron/pull/1099

    METRON-1657: Parser aggregation in storm

    ## Contributor Comments
    This PR allows for users to specify multiple parsers to be run in one aggregated Storm topology.
    
    Essentially, the ParserBolt (and the supporting infrastructure) has been generalized to take multiple sensors.  This gives us a structure where there's a Storm spout per sensor.  These all lead to a parser bolt that delegates based on the topic metadata and output appropriately.
    
    ### Current assumptions/restrictions
    
    - Topic metadata must be enabled in order to be able to delegate properly in the aggregated case (still unneeded for standard case).
    - Configs that are shared across parsers generally apply in a "Last-one wins" manner.  Theres's a couple minor exceptions (e.g. if a parser says Kafka security is not PLAINTEXT, that'll win over anyone who says PLAINTEXT).
    - All error topics for the aggregated parsers are the same.  This restriction could be lifted if we generalize that infrastructure a bit, but I think it's reasonable to leave as a follow-on if there's enough demand.
    - Order matters in how the sensors are specified.  "bro,yaf" is not the same as "yaf,bro".  There are two places this matters, configs and the name of the Storm topology. This could pretty easily be lifted by sorting for the Storm topology (and I might just go ahead and do it anyway).
    
    ### Testing
    To ensure that single sensor parsers work, just spin up full dev and ensure everything is passing data as expected.
    
    On fulldev, the REST API has been altered to accept a comma separated list in the parser start and parser stop endpoints.  Just kill bro, and start up "bro,yaf".  An aggregated parser should be be launched in Storm with the names of both sensors used, and data should flow through both.   Because no UI is attached to aggregated parsers, I chose not to expand this REST API out for now until we know what's actually needed to properly manage it. The upshot of this is that you can't specify advanced configs directly here.
    
    On the CLI, the start_parser_topology.sh can be used to submit
    e.g.
    ```
    ${METRON_HOME}/bin/start_parser_topology.sh -z node1:2181 -s bro,yaf
    ```
    
    If you want to override certain parameters via command line this is still possible:
    ```
    ${METRON_HOME}/bin/start_parser_topology.sh -z node1:2181 -s bro,yaf -snt 2,3 -pp 2 -pnt 5
    ```
    This will spin up a topology with a spout number of tasks of 2 for bro, 3 for yaf, along with a parser parallelism of 2 and a parser number of tasks of 5.  This can be seen in the Storm UI by looking at the number of executors and tasks for each Storm component after launch.  As a heads up, because this is full dev and it's only running with one Storm supervisor/worker/etc, it's likely to be unhappy with the config, but it'll still submit the job.
    
    ## Pull Request Checklist
    
    Thank you for submitting a contribution to Apache Metron.  
    Please refer to our [Development Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235) for the complete guide to follow for contributions.  
    Please refer also to our [Build Verification Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview) for complete smoke testing guides.  
    
    
    In order to streamline the review of the contribution we ask you follow these guidelines and ask you to double check the following:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    
    ### For code changes:
    - [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [ ] Have you included steps or a guide to how the change may be verified and tested manually?
    - [ ] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
      ```
      mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
      ```
    
    - [ ] Have you written or updated unit tests and or integration tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not then run the following commands and the verify changes via `site-book/target/site/index.html`:
    
      ```
      cd site-book
      mvn site
      ```
    
    #### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    It is also recommended that [travis-ci](https://travis-ci.org) is set up for your personal repository such that your branches are built there before submitting a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/justinleet/metron parserAgg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1099.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1099
    
----
commit edb1bc1fc6217230ad73442045bc5904fd852309
Author: justinjleet <ju...@...>
Date:   2018-06-28T14:28:35Z

    wip

commit a7725d16209aabd2a3c8935c694aa4373a34468b
Author: justinjleet <ju...@...>
Date:   2018-07-02T12:36:17Z

    More wip on getting everything carried through

commit d08993a74350314a2197fb86645268e4590b0edc
Author: justinjleet <ju...@...>
Date:   2018-07-03T20:09:56Z

    more complete wip, although still work to do

commit 463c38911383a19064e5251dc7bf7327391704c8
Author: justinjleet <ju...@...>
Date:   2018-07-04T21:50:09Z

    allowing for nullable output topic. should end up being enrichment by default

commit a04b40e8c72d3e437f4e8363505ff77afabb1394
Author: justinjleet <ju...@...>
Date:   2018-07-05T13:19:02Z

    Actually committing all the fix from before

commit c6949a074c61a86d57749d526418d1dcbf09a90a
Author: justinjleet <ju...@...>
Date:   2018-07-05T14:59:26Z

    Stop and Start work in REST now

commit ee1ef16ec9e69ba5859dbd9fe8a63bdf64817142
Author: justinjleet <ju...@...>
Date:   2018-07-05T19:30:26Z

    Fixing unit test

commit 61592b8c5767238a501dce355933f4df22d4a03b
Author: justinjleet <ju...@...>
Date:   2018-07-05T21:04:35Z

    Some cleanup

commit 3b7d2423aa6105ac20e943c7a49590b7723a1f74
Author: justinjleet <ju...@...>
Date:   2018-07-06T01:48:46Z

    More cleanup

commit 8e4263ac5452eaf9b8939f7b7acb8d2ef6404676
Author: justinjleet <ju...@...>
Date:   2018-07-08T19:31:48Z

    Some cleanup, test additions, couple fixes

commit b106c56c549325935813933eb7f5a0d2f1a024a6
Author: justinjleet <ju...@...>
Date:   2018-07-10T12:58:36Z

    docs and more cleanup

commit 717ff36fa6ff72a17d8891222ff27778d8405c35
Author: justinjleet <ju...@...>
Date:   2018-07-10T13:03:37Z

    removing old TODO

commit fd0957ed8269467afa7c4c538cf61f64828faaf8
Author: justinjleet <ju...@...>
Date:   2018-07-10T13:05:45Z

    Adding note to README

----


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202803106
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    I think I'm misunderstanding between your diagram and this implementation.  There will be one kafka topic monitored by the bolt,  then routed to the right parser, then output to a different spout per parser?


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @justinleet I am fine with that as a follow on, I would like the task or issue created. 


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @ottobackwards I agree, our bolts tend to do the same magic set of incantations to set up stellar.  It'd be better to either try to infer that initialization where possible or to externalize that logic.  Agreed that it's not this PR that should do that, but I would expect that we should probably consider a separate JIRA to the effect:
    * Decouple ParserBolt the parserbolt initialization and execution logic into a separate component from the Storm Bolt infrastructure (this will aid in NiFi or spark integration).
    
    Thoughts?  If we agree, then I'll create the JIRA.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202809295
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Yeah, in order to do that, we'd need to execute the DAG without the intermediate kafka step.  That'd be a follow-on.  This is the first step in that.  We have all the information here (the input -> output mapping of kafka topics), so it shouldn't be so hard to intervene and cut the intermediate kafka writing step out.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203052624
  
    --- Diff: use-cases/parser_chaining/aggregated_parser_chaining_flow.svg ---
    @@ -0,0 +1,2 @@
    +<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
    --- End diff --
    
    You need to add the license header here.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202758396
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -182,40 +185,61 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collector);
         messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
         this.collector = collector;
    -    if(getSensorParserConfig() != null) {
    -      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
    -    }
    -    initializeStellar();
    -    if(getSensorParserConfig() != null && filter == null) {
    -      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
    -      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
    -        filter = Filters.get(getSensorParserConfig().getFilterClassName()
    -                , getSensorParserConfig().getParserConfig()
    -        );
    +
    +    // Build the Stellar cache
    +    Map<String, Object> cacheConfig = new HashMap<>();
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      SensorParserConfig config = getSensorParserConfig(sensor);
    +
    +      if (config != null) {
    +        cacheConfig.putAll(config.getCacheConfig());
           }
         }
    +    cache = CachingStellarProcessor.createCache(cacheConfig);
     
    -    parser.init();
    +    // Need to prep all sensors
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
     
    --- End diff --
    
    I don't believe this is correct.  We want to initialize stellar PER parser.  Each should have it's own stellar instance and cache.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    All that being said I am a big +1 on this.  Great work @justinleet, thanks for taking the time to work it through my thick skull.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203083284
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    yes.  I'm not sure we need to document the UI or somewhere else?   Do we have to document removing the default processors from ambari if you are going to aggregate them etc?  Or how to do it in the management ui?
    
    We may need more practical steps.



---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203115089
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    I think we should adjust, as a follow-on, ambari to accept proper CSV for parsers.  So, if you want groups you just quote the groups.  E.g. let's say you input the following: `bro,"snort,yaf"`
    That would create the following topologies:
    * `bro`
    * `snort` and `yaf` aggregated


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202814185
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    You also might want 2 different parsers that source from the same kafka topic (think: 1 parser to send the data to enrichment and 1 parser to send to hbase as a streaming enrichment for authentication data)


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202805243
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Essentially the flow in the syslog case is:
    
    1. Syslog message comes in through the wrapper's topic.
    2. The spout for syslog passes it through to the aggregate bolt.
    3. The aggregated bolt does its job unwrapping and outputs the inner messages to the appropriate topics for each.
    4. The spouts for the inner messages then pick up from the now populated topics.
    5. All spouts are now passing messages as they receive them to the aggregate bolt.
    6. All messages get delegated and handled as needed.
    
    This implementation doesn't do anything clever around avoiding output topics if the aggregate parser is already contained here.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    A mechanism for the routing process to apply a transform or some such. @cestella may have a better design idea.
    
    What I would like us to do is remove the transport from the message where there are common wrappers.
    
    Example:  All the message types delivered by syslog.   The parsers should not have to all parser syslog AND the message.  I imagine defining a transform/parser in the router that takes every message and transforms it ( parses syslog fields + structured data if 5424 into meta and MSG to the bytes ) and then passes it on to a parser that only needs to know the message.
    
    That way we can have simpler parsers, and even support the same message when transported in different wrappers.
    
    We can talk about if this is a function of parser chaining, such that each specialized parser will be second in a chain with the syslog unwrapped being first, or part of routing, but I think it is part of routing personally.



---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203092801
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    Right now, as noted in the description, there's no UI attached to this.  Even the REST API's update is pretty minimal (just to take comma separated lists).  I didn't want to build that out, because the management UI requires some decent amount of thought put into it and that'll ripple through REST as needed (e.g. needing/wanting to pass spout num tasks, parallelism, etc.).
    
    Right now I look at this as providing a low level way of being able to get some of the benefits of this type of aggregation, with making it more user friendly being follow-on since it'll require nontrivial effort and design. I can go ahead and create follow-on tickets for that work, if that works for you.
    
    For the default Ambari processors, I'm not particularly inclined to worry about it, although I could be persuaded that we need to.  That feels like something that can be addressed as this is made more user friendly (i.e. I expect people familiar enough with the system to make the determination to aggregate parsers right now to also be familiar enough to stop the topologies).  I could add a warning or something like that in the docs to not run an aggregated parser with sensor X alongside a dedicated topology for sensor X, but I'm not sure that's necessary.
    
    I also went ahead and added the actual command to the chain parsers README, so the practical example is complete.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    This looks good to me, I'm +1 by inspection, but I want to make sure enough time has passed so enough people can look at it.  I'll hold my +1 until EOD monday.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
GitHub user justinleet reopened a pull request:

    https://github.com/apache/metron/pull/1099

    METRON-1657: Parser aggregation in storm

    ## Contributor Comments
    This PR allows for users to specify multiple parsers to be run in one aggregated Storm topology.
    
    Essentially, the ParserBolt (and the supporting infrastructure) has been generalized to take multiple sensors.  This gives us a structure where there's a Storm spout per sensor.  These all lead to a parser bolt that delegates based on the topic metadata and output appropriately.
    
    ### Current assumptions/restrictions
    
    - Topic metadata must be enabled in order to be able to delegate properly in the aggregated case (still unneeded for standard case).
    - Configs that are shared across parsers generally apply in a "Last-one wins" manner.  Theres's a couple minor exceptions (e.g. if a parser says Kafka security is not PLAINTEXT, that'll win over anyone who says PLAINTEXT).
    - All error topics for the aggregated parsers are the same.  This restriction could be lifted if we generalize that infrastructure a bit, but I think it's reasonable to leave as a follow-on if there's enough demand.
    - Order matters in how the sensors are specified.  "bro,yaf" is not the same as "yaf,bro".  There are two places this matters, configs and the name of the Storm topology. This could pretty easily be lifted by sorting for the Storm topology (and I might just go ahead and do it anyway).
    
    ### Testing
    To ensure that single sensor parsers work, just spin up full dev and ensure everything is passing data as expected.
    
    On fulldev, the REST API has been altered to accept a comma separated list in the parser start and parser stop endpoints.  Just kill bro, and start up "bro,yaf".  An aggregated parser should be be launched in Storm with the names of both sensors used, and data should flow through both.   Because no UI is attached to aggregated parsers, I chose not to expand this REST API out for now until we know what's actually needed to properly manage it. The upshot of this is that you can't specify advanced configs directly here.
    
    On the CLI, the start_parser_topology.sh can be used to submit
    e.g.
    ```
    ${METRON_HOME}/bin/start_parser_topology.sh -z node1:2181 -s bro,yaf
    ```
    
    If you want to override certain parameters via command line this is still possible:
    ```
    ${METRON_HOME}/bin/start_parser_topology.sh -z node1:2181 -s bro,yaf -snt 2,3 -pp 2 -pnt 5
    ```
    This will spin up a topology with a spout number of tasks of 2 for bro, 3 for yaf, along with a parser parallelism of 2 and a parser number of tasks of 5.  This can be seen in the Storm UI by looking at the number of executors and tasks for each Storm component after launch.  As a heads up, because this is full dev and it's only running with one Storm supervisor/worker/etc, it's likely to be unhappy with the config, but it'll still submit the job.
    
    ## Pull Request Checklist
    
    Thank you for submitting a contribution to Apache Metron.  
    Please refer to our [Development Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235) for the complete guide to follow for contributions.  
    Please refer also to our [Build Verification Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview) for complete smoke testing guides.  
    
    
    In order to streamline the review of the contribution we ask you follow these guidelines and ask you to double check the following:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    
    ### For code changes:
    - [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [x] Have you included steps or a guide to how the change may be verified and tested manually?
    - [ ] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
      ```
      mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
      ```
    
    - [x] Have you written or updated unit tests and or integration tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [x] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not then run the following commands and the verify changes via `site-book/target/site/index.html`:
    
      ```
      cd site-book
      mvn site
      ```
    
    #### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    It is also recommended that [travis-ci](https://travis-ci.org) is set up for your personal repository such that your branches are built there before submitting a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/justinleet/metron parserAgg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1099.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1099
    
----
commit edb1bc1fc6217230ad73442045bc5904fd852309
Author: justinjleet <ju...@...>
Date:   2018-06-28T14:28:35Z

    wip

commit a7725d16209aabd2a3c8935c694aa4373a34468b
Author: justinjleet <ju...@...>
Date:   2018-07-02T12:36:17Z

    More wip on getting everything carried through

commit d08993a74350314a2197fb86645268e4590b0edc
Author: justinjleet <ju...@...>
Date:   2018-07-03T20:09:56Z

    more complete wip, although still work to do

commit 463c38911383a19064e5251dc7bf7327391704c8
Author: justinjleet <ju...@...>
Date:   2018-07-04T21:50:09Z

    allowing for nullable output topic. should end up being enrichment by default

commit a04b40e8c72d3e437f4e8363505ff77afabb1394
Author: justinjleet <ju...@...>
Date:   2018-07-05T13:19:02Z

    Actually committing all the fix from before

commit c6949a074c61a86d57749d526418d1dcbf09a90a
Author: justinjleet <ju...@...>
Date:   2018-07-05T14:59:26Z

    Stop and Start work in REST now

commit ee1ef16ec9e69ba5859dbd9fe8a63bdf64817142
Author: justinjleet <ju...@...>
Date:   2018-07-05T19:30:26Z

    Fixing unit test

commit 61592b8c5767238a501dce355933f4df22d4a03b
Author: justinjleet <ju...@...>
Date:   2018-07-05T21:04:35Z

    Some cleanup

commit 3b7d2423aa6105ac20e943c7a49590b7723a1f74
Author: justinjleet <ju...@...>
Date:   2018-07-06T01:48:46Z

    More cleanup

commit 8e4263ac5452eaf9b8939f7b7acb8d2ef6404676
Author: justinjleet <ju...@...>
Date:   2018-07-08T19:31:48Z

    Some cleanup, test additions, couple fixes

commit b106c56c549325935813933eb7f5a0d2f1a024a6
Author: justinjleet <ju...@...>
Date:   2018-07-10T12:58:36Z

    docs and more cleanup

commit 717ff36fa6ff72a17d8891222ff27778d8405c35
Author: justinjleet <ju...@...>
Date:   2018-07-10T13:03:37Z

    removing old TODO

commit fd0957ed8269467afa7c4c538cf61f64828faaf8
Author: justinjleet <ju...@...>
Date:   2018-07-10T13:05:45Z

    Adding note to README

commit bceec862fc6275fb54348b7e18cc14d1903a6769
Author: justinjleet <ju...@...>
Date:   2018-07-10T13:44:40Z

    Removing Splitter and just using a Java stream

----


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r201361910
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---
    @@ -91,14 +101,14 @@ public Config getTopologyConfig() {
        */
       public static ParserTopology build(String zookeeperUrl,
                                           Optional<String> brokerUrl,
    -                                      String sensorType,
    -                                      ValueSupplier<Integer> spoutParallelismSupplier,
    -                                      ValueSupplier<Integer> spoutNumTasksSupplier,
    +                                      List<String> sensorTypes,
    +                                      ValueSupplier<List> spoutParallelismSupplier,
    --- End diff --
    
    Ah, I see.  man, I miss C++ templates sometimes.  I retract the nit; don't worry about it.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    As a heads up, this will have conflicts with https://github.com/apache/metron/pull/1084


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet closed the pull request at:

    https://github.com/apache/metron/pull/1099


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    Ok, so I want to capture the follow-on tasks:
    * Change Ambari to accept quoted parser groups
    * Decouple the ParserBolt from the Parse execution logic
    * Allow the option for intermediate kafka topics to be removed
    
    Did I miss anything?


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202817242
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    @justinleet can you maybe create a data flow diagram or sequence diagram that shows a syslog record from the use-case flowing through this topology and add it to the use-case around parser chaining?
    
    It'd be something like, given a `cisco-6-302` record, it'll go:
    * From NiFi to the `pix_syslog_router` kafka topic
    * From the `pix_syslog_router` kafka topic to the `pix_syslog_router` spout in the aggregated storm topology
    * From the `pix_syslog_router` kafka spout to the parser bolt, which will run the `pix_syslog_router` Grok parser and write out to the `cisco-6-302` kafka topic
    * From the `cisco-6-302` kafka topic to the `cisco-6-302` spout in the aggregated storm topology
    * From the `cisco-6-302` kafka spout to the `cisco-6-302` Grok parser and write out to the `enrichments` kafka topic, where it's picked up by the enrichment topology.
    
    Eventually, we should consider taking out the writing to the `cisco-6-302` topic (optionally), but even eventually there may be value in those intermediate kafka topics due to how users may want to group sensors (e.g. grouping may be done via velocity or scalability requirements, rather than logical connection).


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202805609
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    This PR gives us the ability to group the parsers into a single topology if we so desire.  You would still write through to kafka.  So, the topology in the example would have 3 kafka spouts:
    * One for monitoring `pix_syslog_router` (the syslog parser aka the routing parser)
    * One for monitoring `cisco-5-304`
    * One for monitoring `cisco-6-302`
    
    There would be one parser bolt, though, which would handle parsing all 3 sensor types.  That is the contribution of this PR, the ability to determine the parser and filter and field transformations from the input kafka topic and use the appropriate one to parse the messages.  There is not, however, any code here that would bypass the intermediate kafka write (e.g. from the router topology to the individual `cisco-5-304` or `cisco-6-302` topics).  That's a current gap.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202755740
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    So, this means that there is a kafka topic/spout per parser?



---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202803869
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -182,40 +185,61 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collector);
         messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
         this.collector = collector;
    -    if(getSensorParserConfig() != null) {
    -      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
    -    }
    -    initializeStellar();
    -    if(getSensorParserConfig() != null && filter == null) {
    -      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
    -      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
    -        filter = Filters.get(getSensorParserConfig().getFilterClassName()
    -                , getSensorParserConfig().getParserConfig()
    -        );
    +
    +    // Build the Stellar cache
    +    Map<String, Object> cacheConfig = new HashMap<>();
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      SensorParserConfig config = getSensorParserConfig(sensor);
    +
    +      if (config != null) {
    +        cacheConfig.putAll(config.getCacheConfig());
           }
         }
    +    cache = CachingStellarProcessor.createCache(cacheConfig);
     
    -    parser.init();
    +    // Need to prep all sensors
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
     
    --- End diff --
    
    So, the consequences of this decision are as follows:
    * You share an expression cache (i.e. the statement -> abstract syntax tree cache; distinct from the expression -> evaluated return cache)
    * You share an stellar value cache (expression -> evaluated return)
    * You share the state in the Context (e.g. hbase connections, zookeeper connections).
    
    On the whole, anything shared in the context is intended to be shared across users and sensors by virtue of Stellar being used in the enrichment topology (where it's not sensor-by-sensor), so we shoudl be ok there.  The real question is whether users would prefer to have one knob per topology for stellar cache sizing or whether they would prefer to have one knob per sensor.  I'd say that I'm ok with how this PR is doing it, because it's easier to reason about resources, IMO, on a per-topology perspective.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203095632
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    I think adding the follow on is a great idea.  I wonder if we shouldn't change the default install to aggregate the default sensors?  I don't think that will work though, because the comma separation in ambari is the list, and you won't be able to do it.
    
    My main concern is that the reviewers and committers of this pr are going to be the only ones who can do it, and everyone in user land is going to be lost.  If this is going to be expert only ( until the UI comes ) and not recommend, or a preview thing, maybe mark it as such.



---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    I have been on vacation, but will be reviewing Monday and Tuesday.  Please do not commit


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203068658
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    Updated the doc. Is something like this what you're looking for?


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    I ran through https://github.com/apache/metron/blob/master/use-cases/parser_chaining/README.md, but spinning it up as
    
    ```
    $METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router
    ```
    
    This results in indices (noting that I'd pushed the data to the topic a couple times, so the numbers won't line up directly if you run it):
    ```
    [root@node1 ~]# curl -X GET "localhost:9200/_cat/indices?v"
    health status index                           uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   cisco-5-304_index_2018.07.11.18 z-MyPPEJSN6cur7FJbFORA   5   1         45            0    340.8kb        340.8kb
    yellow open   cisco-6-302_index_2018.07.11.18 vAFrok4sRpW49_RYt9RqbQ   5   1        660            0      1.4mb          1.4mb
    ...
    ````


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r201349285
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---
    @@ -91,14 +101,14 @@ public Config getTopologyConfig() {
        */
       public static ParserTopology build(String zookeeperUrl,
                                           Optional<String> brokerUrl,
    -                                      String sensorType,
    -                                      ValueSupplier<Integer> spoutParallelismSupplier,
    -                                      ValueSupplier<Integer> spoutNumTasksSupplier,
    +                                      List<String> sensorTypes,
    +                                      ValueSupplier<List> spoutParallelismSupplier,
    --- End diff --
    
    This is a nit, but is there any reason why we couldn't make these `ValueSupplier<List<Integer>>` and `ValueSupplier<List<Map>>`? 


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    Sure, actually I'll do a discuss thread when this all goes through.  That way I can try again to get @cestella to comment


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @ottobackwards I definitely think it's something we should consider adding and that's worthwhile.  I feel it's more of a follow-on task, and the follow-on is to allow for optionally direct routing. i.e. this PR generalizes what's already there in the ParserBolt and a follow-on is to provide that option to do direct routing in the aggregate case.
    
    Do you feel that we need to provide that option in this PR, or is it fine to be left as a follow-on?


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202798006
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -182,40 +185,61 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collector);
         messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
         this.collector = collector;
    -    if(getSensorParserConfig() != null) {
    -      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
    -    }
    -    initializeStellar();
    -    if(getSensorParserConfig() != null && filter == null) {
    -      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
    -      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
    -        filter = Filters.get(getSensorParserConfig().getFilterClassName()
    -                , getSensorParserConfig().getParserConfig()
    -        );
    +
    +    // Build the Stellar cache
    +    Map<String, Object> cacheConfig = new HashMap<>();
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      SensorParserConfig config = getSensorParserConfig(sensor);
    +
    +      if (config != null) {
    +        cacheConfig.putAll(config.getCacheConfig());
           }
         }
    +    cache = CachingStellarProcessor.createCache(cacheConfig);
     
    -    parser.init();
    +    // Need to prep all sensors
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
     
    --- End diff --
    
    We may need a test then.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @ottobackwards Is there anything we want to do in this PR about the ParserBolt? I agree that it's getting unwieldy, and if there's easy wins it's not a bad opportunity to fix it up.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    Happy to help explain it, and it's really helpful to get feedback on it because it led to improved docs, so hopefully it makes sure things are clearer to anyone looking at this later.
    
    I created tickets for:
    
    - [Management UI changes for sensor aggregation](https://issues.apache.org/jira/browse/METRON-1678)
    - [REST changes to support parser aggregation](https://issues.apache.org/jira/browse/METRON-1679)
    - [Allow the option for intermediate kafka topics to be removed in aggregated sensors](https://issues.apache.org/jira/browse/METRON-1682)
    - [Allow Ambari to accept quoted parser groups](https://issues.apache.org/jira/browse/METRON-1680)
    - [Decouple the ParserBolt from the Parse execution logic](https://issues.apache.org/jira/browse/METRON-1681)
    
    I held off on creating a ticket for the routing process applying a transform.  @ottobackwards Would you mind creating that, to make sure the use case and examples or assumption, etc. are what you expect?


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203064655
  
    --- Diff: use-cases/parser_chaining/README.md ---
    @@ -233,3 +233,10 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
     ```
     
     You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
    +
    +# Aggregated Parsers with Parser Chaining
    +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
    +
    --- End diff --
    
    Should we explicitly say that when you use this type of topology so do NOT create a sensor topology for any sensor included in the aggregate?


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202813546
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Well, parser chaining allows for DAGs of parsers, not just one level.  Also, you might not want to group parsers based on chained units, but rather based on velocity or some other metric (e.g. I don't want to group a high velocity sensor with a bunch of low velcoity sensors in the syslog case).  In that case, you would need the intermediate kafka topics.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202761519
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Correct, it's aggregating existing topics / sensors into a single topology (with multiple spouts).  It's pulling from each individual topic/sensor as a spout, and then passing to a single parser bolt which handles parsing and output as needed.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202785248
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -182,40 +185,61 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collector);
         messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
         this.collector = collector;
    -    if(getSensorParserConfig() != null) {
    -      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
    -    }
    -    initializeStellar();
    -    if(getSensorParserConfig() != null && filter == null) {
    -      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
    -      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
    -        filter = Filters.get(getSensorParserConfig().getFilterClassName()
    -                , getSensorParserConfig().getParserConfig()
    -        );
    +
    +    // Build the Stellar cache
    +    Map<String, Object> cacheConfig = new HashMap<>();
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      SensorParserConfig config = getSensorParserConfig(sensor);
    +
    +      if (config != null) {
    +        cacheConfig.putAll(config.getCacheConfig());
           }
         }
    +    cache = CachingStellarProcessor.createCache(cacheConfig);
     
    -    parser.init();
    +    // Need to prep all sensors
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
     
    --- End diff --
    
    I left it as a single shared cache on purpose. 
    
    I don't believe that there'd be any incorrect evictions by sharing the cache, and I think evicting based on the overall usage in the aggregated parser is the appropriate place to handle it. Since the cache is (mostly) LRU, I'd prefer to drop the least recently used entry of all parsers rather than dropping for each parser.  LRU of the overall flow seems better than LRU of each of the sensors. Assuming a single cache, you'd bump up the cache configs to account for this, rather than having to optimize each config individually (and potentially as a group afterwards).
    
    Caching is also off by default for the parsers, so this is a case that's only hit if the user explicitly chooses to do so.
    
    Having said that, I do think I need to shore up the documentation around that logic, assuming we choose to go forward with it.  Let me know what you think, and I can adjust appropriately.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r201356953
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---
    @@ -91,14 +101,14 @@ public Config getTopologyConfig() {
        */
       public static ParserTopology build(String zookeeperUrl,
                                           Optional<String> brokerUrl,
    -                                      String sensorType,
    -                                      ValueSupplier<Integer> spoutParallelismSupplier,
    -                                      ValueSupplier<Integer> spoutNumTasksSupplier,
    +                                      List<String> sensorTypes,
    +                                      ValueSupplier<List> spoutParallelismSupplier,
    --- End diff --
    
    I actually forgot to ask about this in the main thing.  It's because of the use of the Class argument in ValueSupplier breaks the typing involved elsewhere. It's the same reason it's ValueSupplier<Map> instead of ValueSupplier<Map<Foo, Bar>> elsewhere.
    
    Having said that, it seems like it's entirely unnecessary to have the class argument at all and we could just actually type everything everywhere and just remove the argument entirely. I tried this out briefly and it seemed fine, but I didn't spin it up through full dev since it seemed optional to do.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203050720
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Added a diagram [here](https://github.com/justinleet/metron/blob/995fdd7d4f8ce3b49c7e29aaa7842eb99137a813/use-cases/parser_chaining/README.md#aggregated-parsers-with-parser-chaining), and also linked to it in the parsers readme.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202802349
  
    --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---
    @@ -182,40 +185,61 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
         super.prepare(stormConf, context, collector);
         messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
         this.collector = collector;
    -    if(getSensorParserConfig() != null) {
    -      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
    -    }
    -    initializeStellar();
    -    if(getSensorParserConfig() != null && filter == null) {
    -      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
    -      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
    -        filter = Filters.get(getSensorParserConfig().getFilterClassName()
    -                , getSensorParserConfig().getParserConfig()
    -        );
    +
    +    // Build the Stellar cache
    +    Map<String, Object> cacheConfig = new HashMap<>();
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      SensorParserConfig config = getSensorParserConfig(sensor);
    +
    +      if (config != null) {
    +        cacheConfig.putAll(config.getCacheConfig());
           }
         }
    +    cache = CachingStellarProcessor.createCache(cacheConfig);
     
    -    parser.init();
    +    // Need to prep all sensors
    +    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
    +      String sensor = entry.getKey();
    +      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
     
    --- End diff --
    
    I'm still not sure sharing the stellar Context is a good thing. @cestella 



---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202801374
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    @ottobackwards I think that's essentially the parser chaining stuff I added earlier, am I misunderstanding?  The [use-case](https://github.com/apache/metron/tree/master/use-cases/parser_chaining) is using the "we get a ton of types of data in syslog" example.


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @cestella I think we want at least one more to capture making this available in the management UI (I'd argue that should inform any REST API task, so I'm not sure if we want that also created as a ticket).


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202797418
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    There is another, more likely use case where we have a transport wrapper on another message, and 1 topic split into many parsers as well.  How can we handle that?
    
    Specifically -> Syslog (Many Msg types) -> kafka -> bolt -> Split per message
    
    I expect to add the ability for syslog parsing later, so set that aside.  The issue is we *will* have more than one use case wrt topics.  
    
    I am not going to say this PR needs to address it, but I would want us to understand our path forward and minimize the churn.
    
    It would be best if we did not have to redo this work when accounting for that.



---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202818595
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    Yeah, I'll go ahead and add a diagram to the doc, and flesh out that explanation


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202812681
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    what *is* the intermediate kafka step?  That is what is confusing me.
    My understanding is that for each sensor you reference it will build a spout for that sensor parser topic, and then pass everything from those to this bolt, which will call the right parser and output to.... I'm not sure.
    
    Why have to have a sensor specific topic at all?


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r203052754
  
    --- Diff: use-cases/parser_chaining/aggregated_parser_chaining_flow.xml ---
    @@ -0,0 +1 @@
    +<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36" version="8.9.5" editor="www.draw.io" type="device"><diagram name="Page-1" id="c7558073-3199-34d8-9f00-42111426c3f3">7Vtbb6M6EP41eewKMBDy2Nt2pXOr1COd3acVAYdYJRiB06b7648dbMAXEppCk27al+KxMXi+b8YzYzIB16vNXRHmy79wDNOJY8WbCbiZOI5tzyz6j0leKokf2JUgKVDMBzWCB/QLciG/L1mjGJbSQIJxSlAuCyOcZTAikiwsCvwsD1vgVH5qHiZQEzxEYapL/0MxWVbSwJk28m8QJUvxZMviLz4Po8ekwOuMP2/igMX2r+pehWIuPr5chjF+bonA7QRcFxiT6mq1uYYp061QW3Xf147e+r0LmJE+N4DqhqcwXfOl/42+Iv5u5EXo43mJCHzIw4i1nynmE3AVpijJaDOij4IFFTzBgiCqw0vesUJxzCa4WuCMPPDJLNouSYEfa8UCNgKl6TVOMZ3mJsMZu0lfCV8cewzctER8ZXcQryApXugQ3gu86g5OQsB1/twgKni6bIHpc1nIOZTU8zZ6pBdclR04uJpec7T5Wb6UKU5+Un4wjTk+ZYfNFPJAcLFiGs9pT0t+CjiU9GkoS6jAHggV1/2yFxgbTEdCRliqGZoSZiVm0KT0mVdzdpWwqz/CxWO49UI5ijRYqBXn7DJ6SVEWb0EwITVnfgHGf85Trs3GW/yzJvRWyOVpOIfpVd2pIGIAaUlWYs4KVNFPmWRt/4xwd0BnALgTTdv3JSxtR8fSNljZzB8CS93MNGxgTD07b3JtZfi2JXyt0uAGke/8Pnb9g1kTZTRt
 ZXQB37lxbRtNn6DIIoWbS7ZD1epnL7hb+XQ9eF1E9R7J1UnCIoFinGtGqQ2DZYChFhYwDQl6kt/EBA5/xj1G9B0boxaelNNgqphqtQB+U3s/Uudx5Xl8IM9TrVmbZ8uUetk9yeMfgTzMBRtHDkYPYCCHZU2Oyg7bkR2+6sj7sqMOm5SNYwR2CD+2ix0mH9/yxGGZV7HpAm1gvNet99qNu/gzgCevbU14cuHZWywBJpbUwjf58jqQ2KFwk5XALOaXN1EaliXdniUU2kpsovFdVtul48a9N97+h/D2fFPgXXxbsI2wHuj2XYNhVyzdadieATFvIKevWKNqjn3NOpDnqdsjmLUh4RHBHg10MxHuRaiM8IV3AViEUXXTqdsjPkPE/iGiZ0v4eo4h3h8tRnRMMWIX4j5F3PlE/K2IewFQUrx3xtzTMJct+qMm3m+zQzCTo7DgPfNuRw+3ZZs7S0y8qXVMTKa/Q8ylJ+KGpL2Vs+GCLHGCszBtZXKHRmVeYAjLnP3Z+Ihhmafk0K4an/eNy7SJxgzMgpOnogjxB6HbLmIPSkVwTCrW84gUYabUc3pn/kqOUMcTY1BxdipU7F8/Oiw3PZBoxhpTFXOdSCoa2MOkolN1mx2QZqBHhem8aWaqeFRh5NH82UwpZDpKutLbn6kTzUYkmh7lwaxA0XJF1V9+prW9g3UgdljhZUzB+mhJLdC3JUPRooHxjmqgWl7JDpstmk3l24RKGachTBVEZD0XsES/wnmdK+WMltu1eFcT74blXWuCy+ozDruVhqVwQQxJGGVWk0z9yxo3F+4OtzMAdq6SaE0DT8fOMWCnFjUP+xxAP3SWChLWCpZlmECDNZ4HPJ7yRYBnsCzXgI4aBh6Gjn70IFnW2aPjK2HRO8OjV/Muk6SASUhgTOWiYHQvPN0cp6QHMPLu0b3h9K8pmTbCotoIOfh7drvxIFT3LjvQT/eMe9cwEHYV/6Qjls69a6/V/WYnsVMZK+AGG
 laBASq1PHMYVHq0+Jow48ygcv1jQnUyNbNXZJA9vp6Sc0t7zPKtOCmSv6U6ao4J7JlyeOb7B2aZwNemspWpBswz3Y9YN+ss6h6Hjr6JjtNj0tENbJVDakjQ+zwBqFN56neCA9LR6/Fx6MnRsSGdLRfYvlh1+x4WiCqChaJ7zsFG46lr4ik4Kk+VCrBteQeyNFAnUo/PhuSontp8JI4Gr+aowc+OxtHpJ0cH4ejbcrczrzuqJ6DvW3f09AzB9Dukz5SuXSnxHQkxxzFUSixHR6wWvgIy2mx+7FdZZ/OLSnD7Pw==</diagram></mxfile>
    --- End diff --
    
    You need to add the license header here.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/metron/pull/1099


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    Kicking Travis


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @ottobackwards Is there anything else we want as follow-on?  Assuming we're good with this where its at (or at least close to), I'll go ahead and create those tickets.


---

[GitHub] metron pull request #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1099#discussion_r202808756
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -82,6 +82,12 @@ topology in kafka.  Errors are collected with the context of the error
     (e.g. stacktrace) and original message causing the error and sent to an
     `error` queue.  Invalid messages as determined by global validation
     functions are also treated as errors and sent to an `error` queue. 
    +
    +Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
    +multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
    --- End diff --
    
    The bolt is actually executing the parser, not sending it to kafka though.
    
    Let's say that I route all my syslog stuff ( multiple message types ) through 1 kafka topic ( not tied to _ANY_ sensor.  I would want to point this bolt at that one topic, and have it route to multiple parsers. I think then they all just go to enrichment?


---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    @justinleet the main things I saw that I would think of cutting down, or I though about looking into ( the idea may turn out to be bad ) are places where the bolt 'knows' a lot of weird or complicated initialization logic around the configurations or classes it uses, like what we do initializing Stellar, or in getComponentConfiguration.
    
    I'm ok with a follow on,  I don't think this PR is creating that issue.



---

[GitHub] metron issue #1099: METRON-1657: Parser aggregation in storm

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/metron/pull/1099
  
    Ok @justinleet thanks for the diagram.  That really helps.  I did not see in the code how we were sending out to the sensor topic and then into the sensor, I though the bolt was just calling the parser ( which makes the sensor specific topic not make sense ).  I think that this flow is a valid case, one where the data could be coming from either an aggregated flow into the topic *or* straight to the topic.  My question looking at this, is why ( esp when people are asking about reduction of kafka overhead ) can't we have the *option* to eliminate the sensor topic completely?


---