Posted to dev@nifi.apache.org by brianghig <gi...@git.apache.org> on 2015/05/14 14:59:41 UTC

[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

GitHub user brianghig opened a pull request:

    https://github.com/apache/incubator-nifi/pull/55

    [NIFI-413] PutKafka Compression and Batching Support

    Adding properties to PutKafka processor to support asynchronous producing with configurable batches.
    
    Also added user-defined control over compression codec and compressed topics per NIFI-413.
    
    Producer Type remains synchronous by default.
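    The sketch below illustrates what these settings correspond to on the client side. It is not the code from this PR. It assumes the processor hands its values to the Kafka 0.8 (Scala) producer under that producer's configuration keys: producer.type, batch.num.messages, queue.buffering.max.ms, queue.buffering.max.messages, compression.codec and compressed.topics. The class and method names are hypothetical. For simplicity, the batching keys are only set in async mode.

```java
import java.util.Properties;

// Hypothetical helper (not part of NiFi or this PR): maps PutKafka-style settings
// onto Kafka 0.8 producer configuration keys.
final class ProducerConfigSketch {

    private ProducerConfigSketch() {
    }

    static Properties buildProducerConfig(boolean async,
                                          int batchNumMessages,
                                          long queueBufferingMaxMs,
                                          int queueBufferingMaxMessages,
                                          String compressionCodec,
                                          String compressedTopics) {
        Properties props = new Properties();
        // "sync" is used unless async is explicitly requested, mirroring the PR's default.
        props.setProperty("producer.type", async ? "async" : "sync");
        if (async) {
            // The batching/queueing keys only matter for the async producer,
            // so this sketch omits them in sync mode.
            props.setProperty("batch.num.messages", Integer.toString(batchNumMessages));
            props.setProperty("queue.buffering.max.ms", Long.toString(queueBufferingMaxMs));
            props.setProperty("queue.buffering.max.messages", Integer.toString(queueBufferingMaxMessages));
        }
        // Expected values for this producer generation: "none", "gzip" or "snappy".
        props.setProperty("compression.codec", compressionCodec);
        if (compressedTopics != null && !compressedTopics.trim().isEmpty()) {
            // Comma-separated topic list; if left unset, a non-"none" codec
            // applies to every topic.
            props.setProperty("compressed.topics", compressedTopics.trim());
        }
        return props;
    }
}
```

    With the PR's defaults (batch of 200, 5000 ms buffering, 10000 queued messages), an async producer with gzip compression would receive producer.type=async, batch.num.messages=200, queue.buffering.max.ms=5000 and so on. A sync producer would receive only producer.type and compression.codec.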

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brianghig/incubator-nifi NIFI-413-PutKafka-Compression-and-Batching

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-nifi/pull/55.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #55
    
----
commit 29b4e56a61dc7125e10d7e5d70a0123485717b4d
Author: Brian Ghigiarelli <br...@gmail.com>
Date:   2015-04-22T15:32:36Z

    Merge pull request #1 from apache/develop
    
    Merging latest NiFi to my fork

commit 421ad8fb133d3bab32bc20d98011e9dfa0caff99
Author: Brian Ghigiarelli <br...@gmail.com>
Date:   2015-05-14T00:32:23Z

    Merge pull request #2 from apache/develop
    
    Merging latest Apache NiFi to fork

commit 9653770ac4a050f4439c07f7ae6e34658f08e5a0
Author: Brian Ghigiarelli <br...@gmail.com>
Date:   2015-05-14T12:55:04Z

    [NIFI-413] Adding properties to PutKafka to support asynchronous production with configurable batching. Also added user-defined control over compression codec and compressed topics. Producer type remains synchronous by default.

commit b71b51976eb6e5afba636c8dd16e33a7436e3999
Author: Brian Ghigiarelli <br...@gmail.com>
Date:   2015-05-14T12:57:25Z

    Merge pull request #3 from apache/develop
    
    Merging in latest develop branch

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32749706
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    +                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
    +                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
    +                    + " but open the possibility of a failure of the client machine dropping unsent data.")
    +            .required(true)
    +            .allowableValues("sync", "async")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .defaultValue("sync")
    +            .build();
    +        public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Async Message Batch Size (batch.num.messages)")
    +            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
    +                    + " The producer will wait until either this number of messages are ready"
    +                    + " to send or queue.buffer.max.ms is reached.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("200").build();
    +        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
    +            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
    +            .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
    +                    + " will try to batch together 100ms of messages to send at once. This will improve"
    +                    + " throughput but adds message delivery latency due to the buffering.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000").build();
    +        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
    +            .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
    +                    + " using async mode before either the producer must be blocked or data must be dropped.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10000").build();
    +        public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
    +            .name("Queue Enqueue Timeout (queue.enqueue.timeout.ms)")
    +            .description("Used only if Producer Type is set to \"async\". The amount of time to block before dropping messages when running in async mode"
    +                    + " and the buffer has reached queue.buffering.max.messages. If set to 0 events will"
    +                    + " be enqueued immediately or dropped if the queue is full (the producer send call will"
    +                    + " never block). If set to -1 the producer will block indefinitely and never willingly"
    +                    + " drop a send.")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("-1").build();
    +        public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
    +            .name("Compression Codec (compression.codec)")
    +            .description("This parameter allows you to specify the compression codec for all"
    +                    + " data generated by this producer. Valid values are \"none\", \"gzip\" and \"snappy\".")
    --- End diff --
    
    I would not recommend calling out the valid values in the description, since they are already provided via .allowableValues(). Standard convention is also to capitalize the first letters (this is not always adhered to, but we should clean that up).



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/55#issuecomment-114165694
  
    Happy to provide feedback! Tested it out and all looks good. There were some checkstyle failures, so I formatted the code to fix those. Otherwise, all is working well. Pushed to develop.
    
    Thanks for catching and correcting this!



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32748767
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    +                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
    +                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
    +                    + " but open the possibility of a failure of the client machine dropping unsent data.")
    +            .required(true)
    +            .allowableValues("sync", "async")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .defaultValue("sync")
    +            .build();
    +        public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Async Message Batch Size (batch.num.messages)")
    +            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
    +                    + " The producer will wait until either this number of messages are ready"
    +                    + " to send or queue.buffer.max.ms is reached.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("200").build();
    +        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
    --- End diff --
    
    I would recommend we change this property to a Time Period (StandardValidators.TIME_PERIOD_VALIDATOR) with a default value of "5 secs".
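    If the property becomes a Time Period, the user enters something like "5 secs", but Kafka's queue.buffering.max.ms still expects a plain millisecond count, so the value has to be converted. The sketch below is a simplified, hypothetical parser. It accepts only a few unit spellings and shows the conversion in isolation. In a real processor, one would more likely read the value with PropertyValue.asTimePeriod(TimeUnit.MILLISECONDS) rather than parse it by hand.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical stand-in for NiFi's time-period handling:
// converts "<number> <unit>" into milliseconds for queue.buffering.max.ms.
final class TimePeriodSketch {

    private static final Pattern PERIOD = Pattern.compile("\\s*(\\d+)\\s*([a-zA-Z]+)\\s*");

    private TimePeriodSketch() {
    }

    static long toMillis(String period) {
        Matcher m = PERIOD.matcher(period);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a time period: " + period);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2).toLowerCase(Locale.ROOT);
        TimeUnit timeUnit;
        switch (unit) {
            case "ms": case "millis": case "msec": case "msecs":
                timeUnit = TimeUnit.MILLISECONDS;
                break;
            case "sec": case "secs": case "second": case "seconds":
                timeUnit = TimeUnit.SECONDS;
                break;
            case "min": case "mins": case "minute": case "minutes":
                timeUnit = TimeUnit.MINUTES;
                break;
            default:
                // Only the units above are supported in this sketch.
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
        return timeUnit.toMillis(amount);
    }
}
```

    With the suggested default, toMillis("5 secs") yields 5000, which matches the original integer default of "5000" for queue.buffering.max.ms.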



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/55#issuecomment-113211256
  
    Brian,
    
    Thanks for the update! I left several comments inline - generally just small tweaks in order to keep the property naming scheme consistent with the typical conventions that we use in NiFi.
    
    Thanks!
    -Mark



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by brianghig <gi...@git.apache.org>.
Github user brianghig commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/55#issuecomment-113664426
  
    @markap14 PR is now updated based on your feedback. Thanks for guiding this processor toward a more user-friendly experience.



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32749088
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    --- End diff --
    
    I would put the descriptions of these two values in the Allowable Values themselves, rather than describing the possible options in the Description of the property.
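    To illustrate the idea, the sketch below attaches a display name and description to each option and keeps the property description short. In NiFi itself, this would likely be done with AllowableValue instances such as new AllowableValue("async", "Asynchronous", "...") passed to .allowableValues(...). The Option class here is a hypothetical stand-in so that the example compiles without the NiFi API. The option wording is adapted from the original property description.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch only: Option is a hypothetical stand-in for NiFi's AllowableValue
// (value, display name, description) so this compiles without the NiFi API.
final class ProducerTypeOptions {

    static final class Option {
        final String value;       // stored in the flow and handed to Kafka
        final String displayName; // label shown in the UI
        final String description; // help text for this specific option

        Option(String value, String displayName, String description) {
            this.value = value;
            this.displayName = displayName;
            this.description = description;
        }
    }

    static final Option SYNC = new Option("sync", "Synchronous",
            "Send each message synchronously.");
    static final Option ASYNC = new Option("async", "Asynchronous",
            "Send messages in a background thread, batching requests for throughput;"
            + " unsent data may be lost if the client machine fails.");

    static final List<Option> ALL = Collections.unmodifiableList(Arrays.asList(SYNC, ASYNC));

    // The property-level description no longer has to enumerate the options.
    static final String PROPERTY_DESCRIPTION =
            "Specifies whether messages are sent to Kafka synchronously or asynchronously.";

    private ProducerTypeOptions() {
    }

    // Renders per-option help the way a UI might list allowable values.
    static String renderHelp() {
        StringBuilder sb = new StringBuilder(PROPERTY_DESCRIPTION);
        for (Option o : ALL) {
            sb.append('\n').append(o.displayName).append(": ").append(o.description);
        }
        return sb.toString();
    }
}
```

    The stored values stay "sync" and "async", so whatever is passed to Kafka is unchanged. Only the user-facing labels and help text move into the options.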



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32749373
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    +                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
    +                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
    +                    + " but open the possibility of a failure of the client machine dropping unsent data.")
    +            .required(true)
    +            .allowableValues("sync", "async")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .defaultValue("sync")
    +            .build();
    +        public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Async Message Batch Size (batch.num.messages)")
    --- End diff --
    
    I would avoid adding the Kafka property names in parentheses. This results in very long property names that are likely to be cut off in the UI unless the user expands the column. They are also relevant only to those who are used to writing Kafka clients, since only they will understand the meaning of those property names. For those who don't write clients (especially non-developers), I think it makes the property names confusing.



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by brianghig <gi...@git.apache.org>.
Github user brianghig commented on the pull request:

    https://github.com/apache/incubator-nifi/pull/55#issuecomment-113654321
  
    Thanks for the feedback, @markap14 !  Updating now...



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32748886
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    +                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
    +                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
    +                    + " but open the possibility of a failure of the client machine dropping unsent data.")
    +            .required(true)
    +            .allowableValues("sync", "async")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .defaultValue("sync")
    +            .build();
    +        public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Async Message Batch Size (batch.num.messages)")
    +            .description("Used only if Producer Type is set to \"async\". The number of messages to send in one batch when using async mode."
    +                    + " The producer will wait until either this number of messages are ready"
    +                    + " to send or queue.buffer.max.ms is reached.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("200").build();
    +        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MS = new PropertyDescriptor.Builder()
    +            .name("Queue Buffering Max Time (queue.buffering.max.ms)")
    +            .description("Used only if Producer Type is set to \"async\". Maximum time to buffer data when using async mode. For example a setting of 100"
    +                    + " will try to batch together 100ms of messages to send at once. This will improve"
    +                    + " throughput but adds message delivery latency due to the buffering.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000").build();
    +        public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
    +            .name("Queue Buffer Max Count (queue.buffering.max.messages)")
    +            .description("Used only if Producer Type is set to \"async\". The maximum number of unsent messages that can be queued up the producer when"
    +                    + " using async mode before either the producer must be blocked or data must be dropped.")
    +            .required(true)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("10000").build();
    +        public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT_MS = new PropertyDescriptor.Builder()
    --- End diff --
    
    Rather than making this property required with a default of -1, I suggest we make the property optional.
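    One way to honor this suggestion is to copy the value into the producer configuration only when the user actually sets it. Otherwise, the key is left out, so the Kafka 0.8 producer falls back to its own default of -1 (block indefinitely, as the diff's description explains). This is a hypothetical sketch, not the merged implementation. The class name is illustrative, and it assumes the value arrives as a raw string.

```java
import java.util.Properties;

// Hypothetical sketch: an optional property is forwarded to Kafka only when set,
// so the producer's built-in default applies otherwise.
final class OptionalTimeoutSketch {

    private OptionalTimeoutSketch() {
    }

    static Properties applyEnqueueTimeout(Properties props, String userValue) {
        if (userValue == null || userValue.trim().isEmpty()) {
            // Property left unset: omit the key entirely.
            return props;
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad input.
        int timeout = Integer.parseInt(userValue.trim());
        if (timeout < -1) {
            // -1 = block indefinitely, 0 = never block, > 0 = wait that many ms.
            throw new IllegalArgumentException("queue.enqueue.timeout.ms must be >= -1");
        }
        props.setProperty("queue.enqueue.timeout.ms", Integer.toString(timeout));
        return props;
    }
}
```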



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-nifi/pull/55



[GitHub] incubator-nifi pull request: [NIFI-413] PutKafka Compression and B...

Posted by markap14 <gi...@git.apache.org>.
Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/55#discussion_r32749012
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
    @@ -136,6 +136,68 @@
                 .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                 .expressionLanguageSupported(false)
                 .build();
    +    public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
    +            .name("Producer Type")
    +            .description("This parameter specifies whether the messages are sent asynchronously in a background thread."
    +                    + " Valid values are (1) async for asynchronous send and (2) sync for synchronous send."
    +                    + " By setting the producer to async we allow batching together of requests (which is great for throughput)"
    +                    + " but open the possibility of a failure of the client machine dropping unsent data.")
    +            .required(true)
    +            .allowableValues("sync", "async")
    --- End diff --
    
    We should probably change these to Synchronous and Asynchronous.

