Posted to dev@eagle.apache.org by garrettlish <gi...@git.apache.org> on 2016/10/24 06:12:19 UTC

[GitHub] incubator-eagle pull request #556: EAGLE-670: make kafka publisher configura...

GitHub user garrettlish opened a pull request:

    https://github.com/apache/incubator-eagle/pull/556

    EAGLE-670: make kafka publisher configurable and default is async

    Sending alerts to Kafka synchronously limits throughput. Make this configurable, and use async by default.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/garrettlish/incubator-eagle eagle670

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-eagle/pull/556.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #556
    
----
commit 78d0d81ea8726471b6f1ed71ad5ad00d872bc875
Author: Xiancheng Li <xi...@ebay.com>
Date:   2016-10-24T05:26:51Z

    EAGLE-670: make kafka publisher configurable and default is async

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by RalphSu <gi...@git.apache.org>.
Github user RalphSu commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    The convention here:
    
    We will have these properties as part of the publishment, and we need to avoid "." in keys because the Mongo store doesn't support "." in keys; use "_" instead.
    
    There could be many Kafka configurations; let us focus on the properties above, plus request.required.acks, in this PR.
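The "_" convention can be illustrated with a small Java sketch. It assumes, hypothetically, that a stored key maps to a Kafka key by swapping every "_" for "." and back; `MongoSafeKeys`, `toStoredKey` and `toKafkaKey` are illustrative names, not Eagle APIs. The round trip only works while no real Kafka key contains a literal "_", which holds for the dotted 0.8 producer keys discussed in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MongoSafeKeys {
    // Hypothetical convention: Mongo rejects "." in keys, so "." is stored as "_".
    static String toStoredKey(String kafkaKey) {
        return kafkaKey.replace('.', '_');
    }

    // Reverse mapping applied before handing the settings to the Kafka producer.
    // Only safe while no real Kafka key contains a literal "_".
    static String toKafkaKey(String storedKey) {
        return storedKey.replace('_', '.');
    }

    public static void main(String[] args) {
        Map<String, String> stored = new LinkedHashMap<>();
        stored.put(toStoredKey("request.required.acks"), "1");
        stored.put(toStoredKey("producer.type"), "async");
        for (Map.Entry<String, String> e : stored.entrySet()) {
            // prints "request_required_acks -> request.required.acks = 1", then the producer.type line
            System.out.println(e.getKey() + " -> " + toKafkaKey(e.getKey()) + " = " + e.getValue());
        }
    }
}
```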
    




[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    The default value is just for example :-).
    
    And when using the Kafka producer in `async` mode and throughput becomes extremely large, batch size is one of the most important configurations to tune.
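Why batch size matters can be seen with simple arithmetic: in async mode the old producer groups up to batch.num.messages records per batch, so the number of batches needed for a burst is at least the ceiling of messages / batch size. A minimal Java sketch; `BatchMath`, `minBatches` and the 10,000-record burst are hypothetical, and the result is only a lower bound because a queue.buffering.max.ms timeout can flush a partial batch earlier.

```java
public class BatchMath {
    // Minimum number of batches needed to ship `messages` records when each batch
    // holds at most `batchNumMessages` records (ceiling division). Time-based flushes
    // (queue.buffering.max.ms) can only add more, smaller batches.
    static long minBatches(long messages, long batchNumMessages) {
        if (batchNumMessages <= 0) {
            throw new IllegalArgumentException("batchNumMessages must be positive");
        }
        return (messages + batchNumMessages - 1) / batchNumMessages;
    }

    public static void main(String[] args) {
        long burst = 10_000; // hypothetical burst of alerts
        System.out.println("batch.num.messages=200  -> " + minBatches(burst, 200) + " batches");  // 50
        System.out.println("batch.num.messages=3000 -> " + minBatches(burst, 3000) + " batches"); // 4
    }
}
```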



[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    @garrettlish thanks.



[GitHub] incubator-eagle pull request #556: EAGLE-670: make kafka publisher configura...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/556#discussion_r84639463
  
    --- Diff: eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---
    @@ -96,44 +103,63 @@ public void close() {
         }
     
         @SuppressWarnings( {"rawtypes", "unchecked"})
    -    protected PublishStatus emit(String topic, List<AlertStreamEvent> outputEvents) {
    +    protected void emit(String topic, List<AlertStreamEvent> outputEvents) {
             // we need to check producer here since the producer is invisable to extended kafka publisher
             if (producer == null) {
                 LOG.warn("KafkaProducer is null due to the incorrect configurations");
    -            return null;
    +            return;
             }
             if (outputEvents == null) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Alert stream events list in publishment is empty");
                 }
    -            return null;
    +            return;
             }
             this.status = new PublishStatus();
             try {
                 for (AlertStreamEvent outputEvent : outputEvents) {
                     ProducerRecord record = createRecord(outputEvent, topic);
                     if (record == null) {
                         LOG.error("Alert serialize return null, ignored message! ");
    -                    return null;
    +                    return;
    +                }
    +                if (mode == KafkaWriteMode.sync) {
    +                    Future<?> future = producer.send(record);
    +                    future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    +                    succeed(mode, "");
    +                } else {
    +                    producer.send(record, new Callback() {
    --- End diff --
    
    Oh... I see what you did now, but what we mean by `async` is "Kafka sends messages in batches from an async thread".
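To make that meaning of `async` concrete, here is a toy Java model: callers only enqueue, and a background thread drains the queue and hands off batches, flushing when a batch reaches a size limit or a time limit expires. It is a hypothetical illustration (`ToyAsyncBatcher` is not a Kafka or Eagle class), with constructor parameters loosely mirroring batch.num.messages, queue.buffering.max.ms and queue.buffering.max.messages; with the 0.8 producer this behavior comes from setting producer.type=async rather than from hand-written threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model of an async, batching producer (hypothetical; not Kafka's implementation).
public class ToyAsyncBatcher<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;        // bounded, like queue.buffering.max.messages
    private final int batchNumMessages;          // like batch.num.messages
    private final long maxBufferingMs;           // like queue.buffering.max.ms
    private final Consumer<List<T>> sendBatch;   // stands in for one network send
    private final Thread worker;
    private volatile boolean running = true;

    public ToyAsyncBatcher(int batchNumMessages, long maxBufferingMs, int maxQueued,
                           Consumer<List<T>> sendBatch) {
        this.queue = new LinkedBlockingQueue<>(maxQueued);
        this.batchNumMessages = batchNumMessages;
        this.maxBufferingMs = maxBufferingMs;
        this.sendBatch = sendBatch;
        this.worker = new Thread(this::drainLoop, "toy-async-sender");
        this.worker.start();
    }

    // Caller side: enqueue and return; blocks only if the bounded queue is full.
    public void send(T record) throws InterruptedException {
        queue.put(record);
    }

    // Worker side: collect records and flush when the batch is full or the time limit expires.
    private void drainLoop() {
        List<T> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + maxBufferingMs;
        try {
            while (running || !queue.isEmpty()) {
                long waitMs = Math.max(0, deadline - System.currentTimeMillis());
                T next = queue.poll(waitMs, TimeUnit.MILLISECONDS);
                if (next != null) {
                    batch.add(next);
                }
                boolean full = batch.size() >= batchNumMessages;
                boolean expired = System.currentTimeMillis() >= deadline;
                if (full || expired) {
                    if (!batch.isEmpty()) {
                        sendBatch.accept(batch);
                        batch = new ArrayList<>();
                    }
                    deadline = System.currentTimeMillis() + maxBufferingMs;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!batch.isEmpty()) {
            sendBatch.accept(batch); // flush whatever is left on shutdown
        }
    }

    // Signal shutdown, let the worker drain the queue, and wait for it to finish.
    @Override
    public void close() throws InterruptedException {
        running = false;
        worker.join();
    }

    public static void main(String[] args) throws Exception {
        try (ToyAsyncBatcher<String> batcher = new ToyAsyncBatcher<>(3, 500, 100,
                batch -> System.out.println("sending batch of " + batch.size() + ": " + batch))) {
            for (int i = 0; i < 7; i++) {
                batcher.send("alert-" + i); // returns immediately; the worker does the sending
            }
        }
    }
}
```

Callers never wait on the network here; the trade-off is that send failures would surface on the worker thread rather than at the call site.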



[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    BTW: these selected config keys are the ones we have evaluated in real cases and that most affect Kafka throughput; we are not asking you to include many additional Kafka configurations.
    
    ~~~
    producer.type = async
    batch.num.messages = 3000
    queue.buffering.max.ms  = 5000
    queue.buffering.max.messages = 10000
    ~~~
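The suggested values can be compared against the Kafka 0.8 producer defaults (producer.type=sync, batch.num.messages=200, queue.buffering.max.ms=5000, queue.buffering.max.messages=10000, as listed in the 0.8 client configuration documentation) with a short Java sketch. `ProducerTuning` and its methods are illustrative names only; the defaults are hard-coded from that documentation and should be rechecked against the client version actually deployed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class ProducerTuning {
    // 0.8.x producer defaults for the four keys discussed here.
    static Map<String, String> kafka08Defaults() {
        Map<String, String> d = new LinkedHashMap<>();
        d.put("producer.type", "sync");
        d.put("batch.num.messages", "200");
        d.put("queue.buffering.max.ms", "5000");
        d.put("queue.buffering.max.messages", "10000");
        return d;
    }

    // The values suggested in this thread.
    static Properties suggested() {
        Properties p = new Properties();
        p.setProperty("producer.type", "async");
        p.setProperty("batch.num.messages", "3000");
        p.setProperty("queue.buffering.max.ms", "5000");
        p.setProperty("queue.buffering.max.messages", "10000");
        return p;
    }

    // Keys whose suggested value differs from the default, in default-map order.
    static Map<String, String> overrides(Properties suggested, Map<String, String> defaults) {
        Map<String, String> diff = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : defaults.entrySet()) {
            String v = suggested.getProperty(e.getKey());
            if (v != null && !v.equals(e.getValue())) {
                diff.put(e.getKey(), v);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        // prints {producer.type=async, batch.num.messages=3000}
        System.out.println(overrides(suggested(), kafka08Defaults()));
    }
}
```

Apart from producer.type, which the PR sets explicitly, batch.num.messages is the only suggested value that differs from its default.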




[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by garrettlish <gi...@git.apache.org>.
Github user garrettlish commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    Thanks @haoch @RalphSu. I thought we had already set async on the Kafka producer and that what we needed to change was to use a callback rather than waiting on the future. That was wrong; thanks for pointing it out.
    
    I have updated the code to add kafka_client_config (a list of name/value maps) to specify Kafka producer configurations.
    
    By default, I only set producer.type=async. 
    For batch.num.messages, queue.buffering.max.ms and queue.buffering.max.messages, I think we can use the Kafka producer default values. 
    The only one whose default differs from your suggestion is batch.num.messages, which is 200 if not specified. Could you please share why you set it to 3000? 
    
    The Kafka producer properties could be defined in the publish properties as follows:
        {
                "name": "***",
                "properties" : {
                        "kafka_broker": "***",
                        "topics": "***",
                        "kafka_client_config" : [
                                {
                                        "name" : "request.required.acks",
                                        "value": 1
                                },
                                {
                                        "name" : "producer.type",
                                        "value": "async"
                                },
                                ...
                        ]
                }
        }
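A minimal sketch of how the kafka_client_config list above might be turned into producer `Properties`, assuming each entry is a map with "name" and "value" keys as in the JSON example and that producer.type defaults to async unless an entry overrides it. `KafkaClientConfig`, `toProducerProperties` and `entry` are hypothetical names, not the code in the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaClientConfig {
    // Hypothetical parser: turns [{"name": ..., "value": ...}, ...] into producer Properties.
    static Properties toProducerProperties(List<Map<String, Object>> clientConfig) {
        Properties props = new Properties();
        props.setProperty("producer.type", "async"); // default; an explicit entry overrides it
        if (clientConfig != null) {
            for (Map<String, Object> entry : clientConfig) {
                Object name = entry.get("name");
                Object value = entry.get("value");
                if (name == null || value == null) {
                    continue; // skip malformed entries
                }
                props.setProperty(name.toString(), value.toString()); // e.g. 1 -> "1"
            }
        }
        return props;
    }

    // Helper to build one name/value entry, mirroring the JSON shape.
    static Map<String, Object> entry(String name, Object value) {
        Map<String, Object> m = new HashMap<>();
        m.put("name", name);
        m.put("value", value);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> config = Arrays.asList(
                entry("request.required.acks", 1),
                entry("batch.num.messages", 3000));
        Properties props = toProducerProperties(config);
        System.out.println(props.getProperty("producer.type"));          // async (default)
        System.out.println(props.getProperty("request.required.acks")); // 1
    }
}
```

Passing every entry through unchanged keeps the publisher from interpreting Kafka settings itself, so the producer's native `async` mode does the batching.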



[GitHub] incubator-eagle pull request #556: EAGLE-670: make kafka publisher configura...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-eagle/pull/556



[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by garrettlish <gi...@git.apache.org>.
Github user garrettlish commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    Yes, cool. Then we can keep the current implementation and override the batch size configuration in the kafka_client_config of the publish properties :-)



[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    @garrettlish the implementation looks a little confusing.
    
    Please simply use the Kafka configs (https://kafka.apache.org/08/documentation.html#clientconfig), and for the async part, use the async Kafka producer as in https://github.com/apache/incubator-eagle/blob/master/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java#L53-L57
    
    # kafka properties
    
        producer.type = async
        batch.num.messages = 3000
        queue.buffering.max.ms = 5000
        queue.buffering.max.messages = 10000



[GitHub] incubator-eagle pull request #556: EAGLE-670: make kafka publisher configura...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on a diff in the pull request:

    https://github.com/apache/incubator-eagle/pull/556#discussion_r84639212
  
    --- Diff: eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---
    @@ -96,44 +103,63 @@ public void close() {
         }
     
         @SuppressWarnings( {"rawtypes", "unchecked"})
    -    protected PublishStatus emit(String topic, List<AlertStreamEvent> outputEvents) {
    +    protected void emit(String topic, List<AlertStreamEvent> outputEvents) {
             // we need to check producer here since the producer is invisable to extended kafka publisher
             if (producer == null) {
                 LOG.warn("KafkaProducer is null due to the incorrect configurations");
    -            return null;
    +            return;
             }
             if (outputEvents == null) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Alert stream events list in publishment is empty");
                 }
    -            return null;
    +            return;
             }
             this.status = new PublishStatus();
             try {
                 for (AlertStreamEvent outputEvent : outputEvents) {
                     ProducerRecord record = createRecord(outputEvent, topic);
                     if (record == null) {
                         LOG.error("Alert serialize return null, ignored message! ");
    -                    return null;
    +                    return;
    +                }
    +                if (mode == KafkaWriteMode.sync) {
    +                    Future<?> future = producer.send(record);
    +                    future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    +                    succeed(mode, "");
    +                } else {
    +                    producer.send(record, new Callback() {
    --- End diff --
    
    This is not `async` mode in Kafka terms.



[GitHub] incubator-eagle issue #556: EAGLE-670: make kafka publisher configurable and...

Posted by haoch <gi...@git.apache.org>.
Github user haoch commented on the issue:

    https://github.com/apache/incubator-eagle/pull/556
  
    The configuration is OK. The primary concern here is the `async` implementation: the Kafka producer natively supports `async` mode, so you just need to pass that setting through instead of handling an `async` thread manually, which does not look very clean.

