Posted to dev@apex.apache.org by PramodSSImmaneni <gi...@git.apache.org> on 2015/12/15 13:47:26 UTC

[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

GitHub user PramodSSImmaneni opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132

    MLHR-1947 #comment Ability to restrict number of messages per window by size (Review only)

    @siyuanh please review, but don't merge yet; I still need to provide tests. I just wanted you to check the approach.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/PramodSSImmaneni/incubator-apex-malhar kafka-maxbytes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #132
    
----
commit 7a29ea56cc937f3ade4f0d9dda86832bd9141e49
Author: Pramod Immaneni <pr...@datatorrent.com>
Date:   2015-12-15T12:36:25Z

    MLHR-1947 #comment Ability to restrict number of messages per window by size

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48548257
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
           KafkaConsumer.KafkaMessage message = consumer.pollMessage();
           emitTuple(message.msg);
    +      ++numEmitted;
    --- End diff --
    
    emitCount += numEmitted a few lines later should work, shouldn't it?
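The logic being discussed can be modelled outside the operator. The following is a minimal sketch, not the code from the pull request: SizeBoundedEmitter, offer and emitted are hypothetical names, a plain Deque<byte[]> stands in for the Kafka consumer queue, and the size check is folded into the loop. It keeps a local numEmitted and applies emitCount += numEmitted once after the loop, as proposed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model of a size-bounded emitTuples(). A Deque<byte[]> stands in for the
 * Kafka consumer's message queue, and "emitting" a tuple appends it to a list.
 */
class SizeBoundedEmitter {
  private final Deque<byte[]> queue = new ArrayDeque<>();
  final List<byte[]> emitted = new ArrayList<>();
  private final int maxTuplesPerWindow;
  private final long maxTotalMsgSizePerWindow;
  private int emitCount;          // tuples emitted so far in the current window
  private long emitTotalMsgSize;  // bytes emitted so far in the current window

  SizeBoundedEmitter(int maxTuplesPerWindow, long maxTotalMsgSizePerWindow) {
    this.maxTuplesPerWindow = maxTuplesPerWindow;
    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
  }

  void offer(byte[] msg) {
    queue.add(msg);
  }

  /** Resets the per-window counters, as the operator would at the start of each window. */
  void beginWindow() {
    emitCount = 0;
    emitTotalMsgSize = 0;
  }

  void emitTuples() {
    int count = Math.min(queue.size(), maxTuplesPerWindow - emitCount);
    int numEmitted = 0;
    for (int i = 0; i < count; i++) {
      byte[] next = queue.peek();
      // Stop when the next message would exceed the byte budget, unless nothing has been
      // emitted in this window yet: one message always goes through so that a single
      // oversized message cannot stall processing.
      if (emitCount + numEmitted > 0 && maxTotalMsgSizePerWindow - emitTotalMsgSize < next.length) {
        break;
      }
      emitted.add(queue.poll());
      emitTotalMsgSize += next.length;
      ++numEmitted;
    }
    // Update the tuple count once, after the loop.
    emitCount += numEmitted;
  }
}
```

With a 10-byte budget and three 4-byte messages, the first window emits two messages and the third waits for the next window; a lone 50-byte message is still emitted because it is the first one in its window.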



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48456719
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -140,7 +134,10 @@
     
       @Min(1)
       private int maxTuplesPerWindow = Integer.MAX_VALUE;
    +  @Min(1)
    --- End diff --
    
    That was an oversight: the running counter was a long. Corrected it.
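To illustrate the type question, here is a small hypothetical example (the class and method names are made up for illustration): a running byte total kept in an int wraps around once it passes Integer.MAX_VALUE, while a long total does not, which is one reason to keep both the per-window limit and its counter as long.

```java
/** Hypothetical example: accumulating message sizes in an int versus a long. */
class ByteBudgetTypes {
  /** Running total kept in an int: wraps around silently past Integer.MAX_VALUE. */
  static int sumAsInt(int[] sizes) {
    int total = 0;
    for (int size : sizes) {
      total += size;
    }
    return total;
  }

  /** Running total kept in a long: each int size is widened before the addition. */
  static long sumAsLong(int[] sizes) {
    long total = 0;
    for (int size : sizes) {
      total += size;
    }
    return total;
  }

  /** Budget check with the limit and the running total both held as long. */
  static boolean fits(long maxTotalMsgSizePerWindow, long emitTotalMsgSize, int nextMsgSize) {
    return maxTotalMsgSizePerWindow - emitTotalMsgSize >= nextMsgSize;
  }
}
```

With an int limit, a budget above Integer.MAX_VALUE bytes (roughly 2 GiB) per window cannot be expressed at all.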



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48548217
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    --- End diff --
    
    I had thought about that, but then you could have more outstanding messages than the queue size; you would have queue size + 1, which would violate the limit.
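The concern can be reproduced with any bounded queue. In this hypothetical sketch (the class and method names are illustrative, not from the operator), an ArrayBlockingQueue plays the role of the holding buffer: once the operator removes one message and keeps it aside, the fetcher can refill the freed slot, so capacity + 1 messages are outstanding.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical illustration of a bounded holding buffer plus one message held outside it. */
class OutstandingCount {
  static int outstandingAfterHoldingOne(int capacity) {
    ArrayBlockingQueue<byte[]> holdingBuffer = new ArrayBlockingQueue<>(capacity);
    // The fetcher fills the buffer until offer() reports that it is full.
    while (holdingBuffer.offer(new byte[1])) {
      // keep filling
    }
    // The operator takes one message out and keeps it aside because it does not fit the window.
    byte[] held = holdingBuffer.poll();
    // The fetcher immediately refills the freed slot.
    while (holdingBuffer.offer(new byte[1])) {
      // keep filling
    }
    return holdingBuffer.size() + (held != null ? 1 : 0);
  }
}
```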



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48223182
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -207,6 +204,14 @@ public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
         this.maxTuplesPerWindow = maxTuplesPerWindow;
       }
     
    +  public int getMaxMsgSizePerWindow() {
    +    return maxMsgSizePerWindow;
    +  }
    +
    --- End diff --
    
    Please add descriptions for the setter and getter methods.
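A minimal sketch of documented accessors, assuming the limit is a long field named maxTotalMsgSizePerWindow as in the later revision of the diff. The class name is hypothetical, and the explicit range check only stands in for the @Min(1) constraint, which is normally enforced by bean validation rather than inside the setter.

```java
/** Hypothetical excerpt showing documented accessors for a per-window byte limit. */
class KafkaInputSettings {
  /** Defaults to Long.MAX_VALUE, which effectively means "no size limit". */
  private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;

  /**
   * Returns the maximum total size, in bytes, of the messages emitted in a single window.
   * At least one message is emitted per window even if that message alone exceeds the limit.
   *
   * @return the per-window byte limit
   */
  public long getMaxTotalMsgSizePerWindow() {
    return maxTotalMsgSizePerWindow;
  }

  /**
   * Sets the maximum total size, in bytes, of the messages emitted in a single window.
   *
   * @param maxTotalMsgSizePerWindow the per-window byte limit; must be at least 1
   */
  public void setMaxTotalMsgSizePerWindow(long maxTotalMsgSizePerWindow) {
    if (maxTotalMsgSizePerWindow < 1) {
      throw new IllegalArgumentException("maxTotalMsgSizePerWindow must be >= 1");
    }
    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
  }
}
```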



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48222945
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -363,12 +369,21 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    +    if (count > 0) {
    +      // Transmit at least one message in the window even if the message size is greater than max message size
    +      if ((emitCount > 0) && ((maxMsgSizePerWindow - emitMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    --- End diff --
    
    You can replace the variable "i" with numEmitted in the for loop.



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48581225
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
    --- End diff --
    
    Fixed it



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48523654
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    --- End diff --
    
    You could use only pollMessage and keep the message in a field. If it exceeds the limit, wait until the first emitTuples call in the next window.
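A minimal sketch of this suggestion, under stated assumptions: the class and the pendingMessage field are hypothetical, a Deque<byte[]> again stands in for the consumer queue, and the tuple-count limit is left out for brevity. Each message is taken with a single poll() and no peek(); a message that does not fit is parked in a field and emitted first in the next window.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical emitter that polls once per message and defers a message that does not fit. */
class PendingMessageEmitter {
  private final Deque<byte[]> queue = new ArrayDeque<>();  // stand-in for the consumer queue
  final List<byte[]> emitted = new ArrayList<>();
  private final long maxTotalMsgSizePerWindow;
  private byte[] pendingMessage;  // polled but not yet emitted because it did not fit
  private int emitCount;          // tuples emitted in the current window
  private long emitTotalMsgSize;  // bytes emitted in the current window

  PendingMessageEmitter(long maxTotalMsgSizePerWindow) {
    this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
  }

  void offer(byte[] msg) {
    queue.add(msg);
  }

  /** Resets the per-window counters; a pending message carries over into the new window. */
  void beginWindow() {
    emitCount = 0;
    emitTotalMsgSize = 0;
  }

  void emitTuples() {
    while (true) {
      // Use the parked message first; otherwise take the next one with a single poll().
      byte[] msg = (pendingMessage != null) ? pendingMessage : queue.poll();
      if (msg == null) {
        return;  // nothing available right now
      }
      // Always allow the first message of a window, even if it alone exceeds the limit.
      if (emitCount > 0 && maxTotalMsgSizePerWindow - emitTotalMsgSize < msg.length) {
        pendingMessage = msg;  // hold it until the next window
        return;
      }
      pendingMessage = null;
      emitted.add(msg);
      emitCount++;
      emitTotalMsgSize += msg.length;
    }
  }
}
```

With a 10-byte budget and messages of 6, 6 and 3 bytes, the first window emits only the first message; the second 6-byte message is parked and then emitted at the start of the next window, followed by the 3-byte one.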



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48456725
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -207,6 +204,14 @@ public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
         this.maxTuplesPerWindow = maxTuplesPerWindow;
       }
     
    +  public int getMaxMsgSizePerWindow() {
    +    return maxMsgSizePerWindow;
    +  }
    +
    --- End diff --
    
    This was originally just for the initial review. Added.



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48456731
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -363,12 +369,21 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    +    if (count > 0) {
    +      // Transmit at least one message in the window even if the message size is greater than max message size
    +      if ((emitCount > 0) && ((maxMsgSizePerWindow - emitMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    --- End diff --
    
    There are two exit conditions from the loop, one in which i is incremented and one in which it is not. It is more readable to just use a separate variable than to normalize the value of i on exit.
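One plausible shape of such a loop, written as a hypothetical helper rather than the operator's actual code: a message is emitted and the loop may then break before i is incremented, so the value of i on exit depends on which condition ended the loop, while a separate numEmitted counter is always exact.

```java
import java.util.List;

/** Hypothetical helper contrasting the loop index with a dedicated emitted-message counter. */
class LoopExitCounting {
  /**
   * "Emits" messages (represented by their sizes) in order until count messages are
   * emitted or the next message would not fit in the remaining budget.
   * Returns the number of messages emitted.
   */
  static int emitWithCounter(List<Integer> sizes, int count, long budget) {
    int limit = Math.min(count, sizes.size());
    long used = 0;
    int numEmitted = 0;
    for (int i = 0; i < limit; i++) {
      used += sizes.get(i);  // emit message i
      ++numEmitted;
      // Second exit condition: the next message does not fit. Breaking here skips i++,
      // so i would equal numEmitted - 1, whereas running to the end would leave i == limit.
      if (i + 1 < limit && budget - used < sizes.get(i + 1)) {
        break;
      }
    }
    return numEmitted;
  }
}
```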



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48223198
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -140,7 +134,10 @@
     
       @Min(1)
       private int maxTuplesPerWindow = Integer.MAX_VALUE;
    +  @Min(1)
    --- End diff --
    
    Why is the maxMsgSizePerWindow variable an int?



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48555633
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
           KafkaConsumer.KafkaMessage message = consumer.pollMessage();
           emitTuple(message.msg);
    +      ++numEmitted;
    --- End diff --
    
    It's OK, I just thought you didn't need this counter.



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48523662
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
           KafkaConsumer.KafkaMessage message = consumer.pollMessage();
           emitTuple(message.msg);
    +      ++numEmitted;
    --- End diff --
    
    emitCount++ ?



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48581219
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
           KafkaConsumer.KafkaMessage message = consumer.pollMessage();
           emitTuple(message.msg);
    +      ++numEmitted;
    --- End diff --
    
    Oh, OK. On second thought, I think it is better to poll the message and keep it locally, to avoid two locked reads from the queue and the associated performance impact. In any case, the other approach also has a scenario where a message has been taken out of the queue in emitTuples while new messages are added to the holdingBuffer until it is full, leaving one more message outstanding than the limit allows. Made those changes.



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/132#discussion_r48555599
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -392,9 +412,20 @@ public void emitTuples()
         if (maxTuplesPerWindow > 0) {
           count = Math.min(count, maxTuplesPerWindow - emitCount);
         }
    -    for (int i = 0; i < count; i++) {
    +    if (count > 0) {
    +      // If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
    +      // Make an exception for the case when no message has been transmitted in the window and transmit at least one
    +      // message even if the message size is greater than max message size so that the processing doesn't get stuck
    +      if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < consumer.peekMessage().msg.size())) {
    +        return;
    +      }
    +    }
    +    int numEmitted = 0;
    +   for (int i = 0; i < count; i++) {
    --- End diff --
    
    wrong indentation?



[GitHub] incubator-apex-malhar pull request: MLHR-1947 #comment Ability to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/132

