Posted to dev@storm.apache.org by supermonk <gi...@git.apache.org> on 2016/07/02 03:47:08 UTC

[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

GitHub user supermonk opened a pull request:

    https://github.com/apache/storm/pull/1538

    Sending the Data to Kafka as a batch, instead of sending one tuple ea…

    Send the data to Kafka as a batch instead of sending one tuple at a time. The 0.10 version of storm-kafka uses the Producer.scala class (kafka-client), which has two send methods; this change switches to the second one, which sends a batch.
    
    Ref : https://github.com/apache/kafka/blob/623ab1e7c6497c000bc9c9978637f20542a3191c/core/src/main/scala/kafka/javaapi/producer/Producer.scala
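
To make the difference between the two call patterns concrete, here is a minimal sketch. It does not use the Kafka classes: CountingProducer is a hypothetical stand-in that only counts calls, and its two overloads merely mirror the shape of the single-message and list-based send methods described above. Tuples are plain strings for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchSendSketch {
    // One send call per tuple, as the code did before this change.
    static CountingProducer sendOneByOne(List<String> tuples) {
        CountingProducer producer = new CountingProducer();
        for (String tuple : tuples) {
            producer.send(tuple);
        }
        return producer;
    }

    // Collect everything first, then make a single send call for the whole list.
    static CountingProducer sendAsBatch(List<String> tuples) {
        CountingProducer producer = new CountingProducer();
        List<String> batch = new ArrayList<>(tuples.size());
        for (String tuple : tuples) {
            batch.add(tuple);
        }
        producer.send(batch);
        return producer;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList("a", "b", "c");
        System.out.println("per-tuple send calls: " + sendOneByOne(tuples).sendCalls);
        System.out.println("batched send calls:   " + sendAsBatch(tuples).sendCalls);
    }
}

// Hypothetical stand-in for the producer; it only counts calls and messages.
final class CountingProducer {
    int sendCalls = 0;
    int messagesSent = 0;

    // Mirrors the shape of the single-message send.
    void send(String message) {
        sendCalls++;
        messagesSent++;
    }

    // Mirrors the shape of the list-based send.
    void send(List<String> messages) {
        sendCalls++;
        messagesSent += messages.size();
    }
}
```

The sketch only shows the number of calls made by the caller. Whether that translates into fewer network requests depends on how the real producer buffers messages internally.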

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Symantec/storm 0.10.x-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1538
    
----
commit ab19a949b1f738a620f5fdc9102c147cc3d05c0f
Author: narendra_bidari <na...@symantec.com>
Date:   2016-07-02T03:44:08Z

    Sending the Data to Kafka as a batch, instead of sending one tuple each. 0.10 version uses Producer.scala class which has two methods, we are using now the second method which sends in batch.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @supermonk 
    I am not quite sure this patch helps a lot. The Kafka producer API already batches internally, controlled by batch.size, so even if you call producer.send for every tuple, it won't send that tuple immediately. It will only send once batch.size is reached.
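
As a rough illustration of the setting mentioned above, the sketch below builds a producer configuration with java.util.Properties. The keys batch.size (bytes) and linger.ms are the names used by the newer Java producer; the values are arbitrary examples. The older Scala producer used by storm-kafka 0.10 may use different keys (producer.type and batch.num.messages, as far as I recall), so check the documentation for the Kafka version in use. The class and method names are hypothetical.

```java
import java.util.Properties;

final class ProducerBatchingConfigSketch {
    // Builds only the batching-related part of a producer configuration.
    // Key names are those of the newer Java producer (an assumption for older clients).
    static Properties newProducerBatching(int batchSizeBytes, long lingerMs) {
        Properties props = new Properties();
        props.setProperty("batch.size", Integer.toString(batchSizeBytes));
        props.setProperty("linger.ms", Long.toString(lingerMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only; tune them for the actual workload.
        Properties props = newProducerBatching(16384, 5L);
        System.out.println(props);
    }
}
```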



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by knusbaum <gi...@git.apache.org>.
Github user knusbaum commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    I'm going to close this PR. It hasn't been touched in a few months, it is still missing a Jira, and we have decided to halt any non-bug work on the 0.10.* branches.
    
    In addition, it looks to me like `batch.size` is available for Kafka at least as far back as 0.8.0, so utilizing that sounds like the correct way to address this issue. 
    
    However, I only spent a brief time looking at the docs, so if you think this is still a valuable change for the 1.x and master branches, please file a Jira and open PRs against those branches.



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by supermonk <gi...@git.apache.org>.
Github user supermonk commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @harshach : I have removed the extra logs. Thanks for the comment!



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69647671
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,36 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    +    @SuppressWarnings({"rawtypes", "unchecked"})
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    +        List<KeyedMessage> batchList = new ArrayList<KeyedMessage>(tuples.size());
    +        // Creating Batch
             for (TridentTuple tuple : tuples) {
                 try {
                     topic = topicSelector.getTopic(tuple);
    -
    -                if(topic != null) {
    -                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
    +                if (topic != null) {
    +                    batchList.add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
                                 mapper.getMessageFromTuple(tuple)));
    +                    LOG.debug("Updated Batch");
                     } else {
    -                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
    +                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple)
    +                            + ", topic selector returned null.");
                     }
                 } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    +                String errorMsg = "Error while filling up List for Batching";
                     LOG.warn(errorMsg, ex);
                     throw new FailedException(errorMsg, ex);
                 }
             }
    +        // Sending Batch
    +        try {
    +            producer.send(batchList);
    +            LOG.debug("Sending the Batch " + batchList);
    --- End diff --
    
    Let's not log the entire batchList. It would probably be more useful to log batchList.size().
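
A minimal sketch of that suggestion, assuming the goal is a short debug line that reports only the number of messages. The helper name describeBatch and the message wording are hypothetical; in the patch the resulting string would go to LOG.debug.

```java
import java.util.Arrays;
import java.util.List;

final class BatchLogMessageSketch {
    // Builds a log line from the batch size instead of the batch contents,
    // so the log stays small no matter how large the messages are.
    static String describeBatch(List<?> batch) {
        return "Sending batch of " + batch.size() + " messages";
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("m1", "m2", "m3");
        System.out.println(describeBatch(batch));
    }
}
```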



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by supermonk <gi...@git.apache.org>.
Github user supermonk commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @abellina : Thanks for the comment. I have fixed it now. Let me know if you have any questions.



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69395057
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,41 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    -    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    -        String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    -                topic = topicSelector.getTopic(tuple);
    +    @SuppressWarnings("rawtypes")
    --- End diff --
    
    @supermonk please use spaces, not tabs.



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by supermonk <gi...@git.apache.org>.
Github user supermonk commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @harshach : Thanks for the update. I know that batch.size is useful in the Java client for the latest Kafka build, but do you think it also holds for the older version?
    
    Storm 0.10 uses the Scala version, and in Trident (storm-kafka) I have not seen batch.size being used anywhere. As per the blog post below, nothing is specified in the config.
    
    **So I believe we are neither passing it in the config, as per the storm-kafka README, nor doing the calculation below and pushing the config internally.**
    
    Kindly correct me if I am wrong!
    
    
    https://gist.github.com/mrflip/5958028 
    spout_batch_kb ~= max_fetch_size * kafka_machines * kpartitions_per_broker / 1024
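
The formula above can be worked through with sample numbers. The sketch assumes max_fetch_size is given in bytes (which the division by 1024 suggests); the values of 1 MiB, 3 Kafka machines, and 4 partitions per broker are made up for illustration, and the class and method names are hypothetical.

```java
final class SpoutBatchSizeSketch {
    // spout_batch_kb ~= max_fetch_size * kafka_machines * kpartitions_per_broker / 1024
    static long spoutBatchKb(long maxFetchSizeBytes, int kafkaMachines, int partitionsPerBroker) {
        return maxFetchSizeBytes * kafkaMachines * partitionsPerBroker / 1024;
    }

    public static void main(String[] args) {
        // 1048576 bytes * 3 machines * 4 partitions / 1024 = 12288 KB
        System.out.println(spoutBatchKb(1048576L, 3, 4));
    }
}
```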




[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69647624
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,36 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    +    @SuppressWarnings({"rawtypes", "unchecked"})
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    +        List<KeyedMessage> batchList = new ArrayList<KeyedMessage>(tuples.size());
    +        // Creating Batch
             for (TridentTuple tuple : tuples) {
                 try {
                     topic = topicSelector.getTopic(tuple);
    -
    -                if(topic != null) {
    -                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
    +                if (topic != null) {
    +                    batchList.add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
                                 mapper.getMessageFromTuple(tuple)));
    +                    LOG.debug("Updated Batch");
    --- End diff --
    
    Let's remove this log; I don't see much use for it.



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by supermonk <gi...@git.apache.org>.
Github user supermonk commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69431654
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,41 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    -    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    -        String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    -                topic = topicSelector.getTopic(tuple);
    +    @SuppressWarnings("rawtypes")
    --- End diff --
    
    @abellina, thanks for the comment. I have fixed it now. Let me know if you have any questions.



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1538



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69455288
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,40 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    +    @SuppressWarnings({"rawtypes", "unchecked", "unused"})
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
             String topic = null;
    +        List<KeyedMessage> batchList = new ArrayList<KeyedMessage>(tuples.size());
    +        // Creating Batch
             for (TridentTuple tuple : tuples) {
                 try {
                     topic = topicSelector.getTopic(tuple);
    -
    -                if(topic != null) {
    -                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
    +                if (topic != null) {
    +                    batchList.add(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
                                 mapper.getMessageFromTuple(tuple)));
    +                    LOG.debug("Updated Batch");
                     } else {
    -                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
    +                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple)
    +                            + ", topic selector returned null.");
                     }
                 } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    +                String errorMsg = "Error while filling up List for Batching";
                     LOG.warn(errorMsg, ex);
                     throw new FailedException(errorMsg, ex);
                 }
             }
    +        // Sending Batch
    +        try {
    +            if (batchList != null) {
    --- End diff --
    
    Will we ever hit this if? It seems batchList will always be non-null.
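
A small sketch of the point being made: a list created with new ArrayList is never null, so a null check on it cannot fail. If a guard is wanted at all, checking for emptiness is one option; that alternative is an assumption added here and was not proposed in the thread. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchNullCheckSketch {
    // Same construction as in the diff: the result is an empty list, never null.
    static List<String> newBatch(int expectedSize) {
        return new ArrayList<>(expectedSize);
    }

    // Possible replacement for the null check: only send when there is something to send.
    static boolean hasMessages(List<String> batch) {
        return !batch.isEmpty();
    }

    public static void main(String[] args) {
        List<String> batch = newBatch(4);
        System.out.println("non-null: " + (batch != null));
        System.out.println("has messages before add: " + hasMessages(batch));
        batch.add("message");
        System.out.println("has messages after add: " + hasMessages(batch));
    }
}
```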



[GitHub] storm pull request #1538: Sending the Data to Kafka as a batch, instead of s...

Posted by abellina <gi...@git.apache.org>.
Github user abellina commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1538#discussion_r69455371
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -76,24 +77,41 @@ public void prepare(Map stormConf) {
             producer = new Producer(config);
         }
     
    -    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    -        String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    -                topic = topicSelector.getTopic(tuple);
    +    @SuppressWarnings("rawtypes")
    --- End diff --
    
    Thanks @supermonk 



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @supermonk 
    Thanks for the work. Since this is not a trivial patch (though the diff is small), could you file an issue in Apache JIRA and prefix the PR title with the JIRA issue ID?
    I'd also recommend opening the pull request against master and leaving a note that you would like to have this in 0.10.x as well.
    Thanks in advance!



[GitHub] storm issue #1538: Sending the Data to Kafka as a batch, instead of sending ...

Posted by supermonk <gi...@git.apache.org>.
Github user supermonk commented on the issue:

    https://github.com/apache/storm/pull/1538
  
    @Parth-Brahmbhatt , @harshach  Could you review my change and let me know whether it is fine to push to Kafka as a batch rather than tuple by tuple?

