Posted to issues@metron.apache.org by cestella <gi...@git.apache.org> on 2018/06/27 14:32:52 UTC

[GitHub] metron pull request #1082: METRON-1642: KafkaWriter should be able choose th...

GitHub user cestella opened a pull request:

    https://github.com/apache/metron/pull/1082

    METRON-1642: KafkaWriter should be able to choose the topic from a field in addition to topology construction time

    ## Contributor Comments
    Currently, the Kafka topic is chosen via the `kafka.topic` setting, which is fixed when the topology is constructed.  It would be useful to also allow people to specify the topic via a field in the message itself.  This would enable multi-stage (or chained) parsing, among other use cases.
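To make the chained-parsing use case concrete, here is a minimal Java sketch of per-message routing. It is illustrative only: messages are modeled as `Map<String, Object>` (the writer itself works with `JSONObject`), and the field name `output_topic`, the topic names, and the `routeStage1` rule are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ChainParsingSketch {
    // Hypothetical name of the message field that carries the destination topic.
    static final String TOPIC_FIELD = "output_topic";

    // A hypothetical first-stage parser: it only does enough work to decide
    // which specialized downstream parser should handle the raw message.
    static Map<String, Object> routeStage1(String raw) {
        Map<String, Object> message = new HashMap<>();
        message.put("original_string", raw);
        // Illustrative rule: syslog-style lines start with a "<PRI>" prefix.
        String next = raw.startsWith("<") ? "syslog_stage2" : "generic_stage2";
        message.put(TOPIC_FIELD, next);
        return message;
    }

    public static void main(String[] args) {
        System.out.println(routeStage1("<34>Oct 11 22:14:15 host su: failed").get(TOPIC_FIELD)); // prints syslog_stage2
        System.out.println(routeStage1("plain text event").get(TOPIC_FIELD));                    // prints generic_stage2
    }
}
```

With a per-message field like this, the writer can send each message to a different topic, instead of every message in the topology sharing the single topic fixed at construction time.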
    
    
    ## Pull Request Checklist
    
    Thank you for submitting a contribution to Apache Metron.  
    Please refer to our [Development Guidelines](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61332235) for the complete guide to follow for contributions.  
    Please refer also to our [Build Verification Guidelines](https://cwiki.apache.org/confluence/display/METRON/Verifying+Builds?show-miniview) for complete smoke testing guides.  
    
    
    In order to streamline the review of the contribution, we ask that you follow these guidelines and double check the following:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [x] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    
    ### For code changes:
    - [x] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [x] Have you included steps or a guide to how the change may be verified and tested manually?
    - [x] Have you ensured that the full suite of tests and checks has been executed in the root metron folder via:
      ```
      mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
      ```
    
    - [x] Have you written or updated unit tests and/or integration tests to verify your changes?
    - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [x] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?
    
    ### For documentation related changes:
    - [x] Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not, then run the following commands and then verify the changes via `site-book/target/site/index.html`:
    
      ```
      cd site-book
      mvn site
      ```
    
    #### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
    It is also recommended that [travis-ci](https://travis-ci.org) is set up for your personal repository such that your branches are built there before submitting a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cestella/incubator-metron kafkaWriterFromField2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1082
    
----
commit 7c48d6c4fcdcb066ed988ffba52acc588b7c05c4
Author: cstella <ce...@...>
Date:   2018-06-05T19:34:20Z

    Updating kafka writer and test

commit 5cffccb3bd7a5894982931a0548b00c3def8a2cc
Author: cstella <ce...@...>
Date:   2018-06-08T05:33:01Z

    Typo.

commit 33ee7d77253b9a46d2854169f082dadc3bdf30e7
Author: cstella <ce...@...>
Date:   2018-06-27T14:19:23Z

    Merge branch 'master' into kafkaWriterFromField2

----


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199287400
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -212,10 +233,13 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura
             writerResponse.addError(t, tuple);
             continue;
           }
    -      Future future = kafkaProducer
    -          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
    -      // we want to manage the batching
    -      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
    +      Optional<String> topic = getKafkaTopic(message);
    +      if(topic.isPresent()) {
    --- End diff --
    
    In what cases would a topic not be present?  If that's an unexpected condition, we should probably log something. Or can a user choose to not route a message by returning an empty topic value?


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199552106
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    They absolutely could create loops in Kafka, which wouldn't be ideal.  I don't know how I could prevent it, sadly.  I think this falls into the "with great power comes great responsibility" category.  Should we call this a spiderman bug (spiderbug?).


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199287320
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -166,6 +166,15 @@ then it is assumed to be a regex and will match any topic matching the pattern (
       * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
         If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
         parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
    +  * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings)  :
    --- End diff --
    
    The ability to route messages based on a message field applies to all topologies that use a `KafkaWriter`, not just the Parsers, right?  That would include Enrichment and Profiler?
    
    This is documented under the Parsers.  Would it be worth mentioning that the same settings would work in any topology that uses a `KafkaWriter`, although it may not be advised?
    
    It might make sense to put these docs in `metron-writer` (with the `KafkaWriter` class) and then link to those docs from the Parser docs here.  Then in `metron-writer` it would make sense to mention that this could potentially be used by any topology that uses a `KafkaWriter`.
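The README text being reviewed describes parser-level writer settings as a convenient way of overriding settings that are otherwise configured a priori. A minimal Java sketch of that override idea follows, assuming parser-level settings simply take precedence over topology-level defaults on conflicting keys; this precedence rule and the example values are assumptions, not Metron's actual merge code. `kafka.topic` and `kafka.topicField` are the setting names used in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class WriterConfigOverrideSketch {
    // Returns the effective writer settings: start from the topology-level
    // defaults, then let any parser-level setting with the same key win.
    // Neither input map is modified.
    static Map<String, Object> effectiveConfig(Map<String, Object> topologyDefaults,
                                               Map<String, Object> parserOverrides) {
        Map<String, Object> merged = new HashMap<>(topologyDefaults);
        merged.putAll(parserOverrides);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("kafka.topic", "enrichments");          // hypothetical default topic

        Map<String, Object> parserConfig = new HashMap<>();
        parserConfig.put("kafka.topicField", "output_topic"); // hypothetical routing field

        // HashMap iteration order is unspecified, so the printed order may vary.
        System.out.println(effectiveConfig(defaults, parserConfig));
    }
}
```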


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199290836
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    I am assuming this logic applies to any topology that uses a `KafkaWriter`.  Would it be easy enough for a user to run into an infinite loop scenario if they have any two sequential topologies both using a `KafkaWriter`?
    
    Parser -> Stage1 -> Stage2 -> ...
    
    * The Parser ingests a message where "outputTopic" = "stage1".  
    * This sends the message to my Stage1 processing
    * If the Stage1 logic does not change the value of that field for whatever reason, then the message will go right back to Stage1 and be reprocessed.
    * Wash, rinse, repeat and you've got a mess on your hands that is difficult to debug.
    
    Maybe I am thinking too hard about this.  There may be nothing we can really do about that.  With power comes responsibility.  If an advanced user wants to customize routing, then they need to own this risk.
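The scenario above can be reproduced with a toy simulation. This is a minimal Java sketch, not Metron code: the stages are hypothetical in-memory functions keyed by topic name, messages are `Map<String, Object>`, and a hop cap stands in for the fact that a real topology would keep cycling indefinitely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class RoutingLoopSketch {
    // Field name taken from the example in the comment above.
    static final String TOPIC_FIELD = "outputTopic";

    // Follows one message through hypothetical stages keyed by topic name.
    // Each stage may rewrite the message; routing then reads TOPIC_FIELD again.
    // Stops when the field is missing, names an unknown topic, or maxHops is hit.
    static int countHops(Map<String, UnaryOperator<Map<String, Object>>> stages,
                         Map<String, Object> message, int maxHops) {
        int hops = 0;
        Object topic = message.get(TOPIC_FIELD);
        while (topic != null && stages.containsKey(topic) && hops < maxHops) {
            message = stages.get(topic).apply(message);
            hops++;
            topic = message.get(TOPIC_FIELD);
        }
        return hops;
    }

    public static void main(String[] args) {
        Map<String, UnaryOperator<Map<String, Object>>> stages = new HashMap<>();
        // Stage1 adds a field but never changes outputTopic.
        stages.put("stage1", m -> {
            Map<String, Object> out = new HashMap<>(m);
            out.put("stage1_processed", true);
            return out;
        });

        Map<String, Object> parsed = new HashMap<>();
        parsed.put(TOPIC_FIELD, "stage1");

        // The cap is the only thing that stops the cycle.
        System.out.println(countHops(stages, parsed, 10)); // prints 10
    }
}
```

If Stage1 instead rewrote `outputTopic` to a topic it does not consume, the walk would stop after a single hop.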


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199613819
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    Ok, that makes sense to me.  A user would have to take additional steps to really shoot themselves in the foot here.  If the topic field were a global setting this might happen, but it's not.  
    
    I agree with you.  I don't think there is anything we need to change here.


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199553063
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -166,6 +166,15 @@ then it is assumed to be a regex and will match any topic matching the pattern (
       * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
         If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
         parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
    +  * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings)  :
    --- End diff --
    
    That's a very good point, I can move this documentation into metron-writer and link to it.


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199607708
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    What if we unset the value of the topic field after the redirect occurs?  This would force the user to set the field again, if they really want another redirect.
    
    It seems like on day 1 of a user attempting to use this functionality, they are going to fall into this trap.  To really use this functionality, you need to set the value in one step and then unset or change it in the next step.  If you don't, it's going to loop.
    



---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/1082
  
    Ok, I think I addressed the issues here and I also added an integration test that will exercise this particular scenario.  Let me know what you think, @nickwallen et al


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199610213
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    It's unclear to me that we really want to do that.  It could be that users are going to have logic that depends on that field downstream.  It seems wrong to me to remove a field that they're adding or have in the message.  
    
    I mean, in order to create a loop, one of two things would have to happen:
    * the user specifies the input topic as the output topic (a simple loop), in which case removing the Kafka topic field wouldn't help, because the topic is likely to be computed (e.g. in the manual test script)
    * the user creates a non-simple loop whereby sensor A -> B -> ... C -> A, but it's exceedingly unlikely that they're all going to be parsers of the same type, so messages from C will likely fail to parse in A.  In the case that they do parse, it's likely that the Kafka topic will be computed in the first parser, so it'll be recomputed and removing the field after the first parse won't have helped.
    
    TL;DR
    Ultimately, I think removing the field won't appreciably help the situation and puts us into the state of removing data, which makes me uncomfortable.
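A minimal Java sketch of the "simple loop" argument, under stated assumptions: the field name `output_topic`, the `sensor_a` topic, and both helper methods are hypothetical, and messages are modeled as `Map<String, Object>`. It shows that stripping the routing field on write (the proposal under discussion) does not stop the loop when the parser recomputes that field on every pass.

```java
import java.util.HashMap;
import java.util.Map;

class StripFieldSketch {
    // Hypothetical routing field name.
    static final String TOPIC_FIELD = "output_topic";

    // The proposal under discussion: read the routing field and remove it
    // from the outgoing message so a later stage cannot reuse it by accident.
    static String resolveAndStrip(Map<String, Object> message) {
        Object topic = message.remove(TOPIC_FIELD);
        return topic == null ? null : topic.toString();
    }

    // Hypothetical parser that computes the routing field itself on every
    // pass; here it always routes back to its own input topic ("sensor_a").
    static Map<String, Object> parseSensorA(Map<String, Object> in) {
        Map<String, Object> out = new HashMap<>(in);
        out.put(TOPIC_FIELD, "sensor_a");
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        int deliveriesToA = 0;
        for (int pass = 0; pass < 3; pass++) {
            message = parseSensorA(message);          // field recomputed
            String topic = resolveAndStrip(message);  // field stripped on write
            if ("sensor_a".equals(topic)) {
                deliveriesToA++;
            }
        }
        // Stripping did not break the simple loop: every pass routed back to A.
        System.out.println(deliveriesToA); // prints 3
    }
}
```

The cost side of the trade-off is also visible here: after `resolveAndStrip`, the field is gone from the message, so any downstream logic that relied on it would no longer see it.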


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/metron/pull/1082
  
    Ok, the manual test script is ready for this and can be reviewed.


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199611812
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    --- End diff --
    
    I think I see your point.  I think we're ok, though, because we're not defaulting `kafka.topicField`.  If it's not specified AND the `kafka.topic` isn't specified, then we don't send the message anywhere.  I did this specifically so people didn't accidentally forget to unset a field and end up in a loop.  You have to go out of your way (and set the `kafka.topicField`) to make the mistake.


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199291506
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -197,6 +209,15 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura
         return producerConfig;
       }
     
    +  public Optional<String> getKafkaTopic(JSONObject message) {
    +    if(kafkaTopicField != null) {
    +      return Optional.ofNullable((String)message.get(kafkaTopicField));
    +    }
    +    else {
    +      return Optional.ofNullable(kafkaTopic);
    +    }
    --- End diff --
    
    It would make sense to add some debug statements showing which topic was chosen and what the field value is.  This would make debugging a routing issue much simpler for the user.
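The suggestion can be sketched as the resolution logic from the diff plus debug-level logging of which branch was taken and what the field value was. This is a hedged sketch, not the merged code: `TopicResolverSketch` is a hypothetical name, messages are modeled as `Map<String, Object>` instead of `JSONObject`, `java.util.logging` is used only to keep it self-contained, and, unlike the diff's `(String)` cast, non-string field values are converted with `toString()` (an assumption).

```java
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

class TopicResolverSketch {
    private static final Logger LOG = Logger.getLogger(TopicResolverSketch.class.getName());

    private final String kafkaTopic;       // fixed topic (kafka.topic), may be null
    private final String kafkaTopicField;  // routing field (kafka.topicField), may be null

    TopicResolverSketch(String kafkaTopic, String kafkaTopicField) {
        this.kafkaTopic = kafkaTopic;
        this.kafkaTopicField = kafkaTopicField;
    }

    // Same branching as the diff: a configured topic field takes precedence
    // over the fixed topic; an empty result means "do not send this message".
    Optional<String> getKafkaTopic(Map<String, Object> message) {
        if (kafkaTopicField != null) {
            Object value = message.get(kafkaTopicField);
            LOG.fine(() -> "Routing on field '" + kafkaTopicField + "', value=" + value);
            return Optional.ofNullable(value).map(Object::toString);
        }
        LOG.fine(() -> "Routing to fixed topic " + kafkaTopic);
        return Optional.ofNullable(kafkaTopic);
    }
}
```

One consequence of this branching, in the diff as well as the sketch: when `kafka.topicField` is configured but a message lacks that field, the result is empty even if `kafka.topic` is also set; it does not fall back to the fixed topic.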


---

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/metron/pull/1082


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/metron/pull/1082
  
    +1 Nice bit of functionality.  Thanks!


---

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199287458
  
    --- Diff: metron-platform/metron-parsers/README.md ---
    @@ -166,6 +166,15 @@ then it is assumed to be a regex and will match any topic matching the pattern (
       * `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
         If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm
         parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
    +  * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings)  :
    --- End diff --
    
    The ability to route messages based on a message field applies to all topologies that use a `KafkaWriter`, not just the Parsers, right?  That would include Enrichment and Profiler?
    
    This is documented under the Parsers.  Would it be worth mentioning that the same settings would impact any topology that uses a `KafkaWriter` (although it may not be advised)?
    
    It might make sense to put these docs in `metron-writer` (with the `KafkaWriter` class) and then link to those docs from the Parser docs here.  Then in `metron-writer` it would make sense to mention that this functionality could be used by any topology that uses a `KafkaWriter`.


---

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199552556
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -212,10 +233,13 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura
             writerResponse.addError(t, tuple);
             continue;
           }
    -      Future future = kafkaProducer
    -          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
    -      // we want to manage the batching
    -      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
    +      Optional<String> topic = getKafkaTopic(message);
    +      if(topic.isPresent()) {
    --- End diff --
    
    If the topic is not present, then the message is dropped because we don't know where to send it.  It's not necessarily unexpected.
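The drop-if-absent behaviour, together with the future-collecting batching from the diff, can be sketched as below. Assumptions: the Kafka producer is replaced by a hypothetical `send` function returning a `Future`, topic resolution is passed in as a function, and `WriteLoopSketch`, `Outcome`, and the `"payload:topic"` message format in `main` are illustrative inventions. Keeping the dropped messages visible, rather than discarding them silently, is one way to make this case easier to debug, in the spirit of the debug-logging suggestion in this thread.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Function;

class WriteLoopSketch {
    // Result of one batch: futures for sent messages, plus the dropped ones.
    static final class Outcome<T> {
        final List<Map.Entry<T, Future<?>>> sent = new ArrayList<>();
        final List<T> dropped = new ArrayList<>();
    }

    // For each message: resolve its topic; if there is none, drop it (but keep
    // track of it); otherwise send it and keep the future so the caller can
    // manage batching and acknowledgement.
    static <T> Outcome<T> write(List<T> messages,
                                Function<T, Optional<String>> topicOf,
                                BiFunction<String, T, Future<?>> send) {
        Outcome<T> outcome = new Outcome<>();
        for (T message : messages) {
            Optional<String> topic = topicOf.apply(message);
            if (topic.isPresent()) {
                Future<?> future = send.apply(topic.get(), message);
                outcome.sent.add(new AbstractMap.SimpleEntry<T, Future<?>>(message, future));
            } else {
                outcome.dropped.add(message);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        // Hypothetical format: the text after ':' names the topic; no ':' means no topic.
        List<String> batch = Arrays.asList("a:alerts", "b", "c:alerts");
        Outcome<String> outcome = write(batch,
            m -> m.contains(":") ? Optional.of(m.split(":")[1]) : Optional.empty(),
            (topic, m) -> CompletableFuture.completedFuture(m));
        System.out.println(outcome.sent.size() + " sent, dropped " + outcome.dropped); // prints 2 sent, dropped [b]
    }
}
```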


---