You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by siyuanh <gi...@git.apache.org> on 2016/02/29 08:46:30 UTC

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

GitHub user siyuanh opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205

    Add Idempotent support the new kafka input operator

    @chaithu14  @tweise  Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/siyuanh/incubator-apex-malhar APEXMALHAR-1969

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #205
    
----
commit 2e22a1094b6ed1767597662795f5db5e51ddfa10
Author: Siyuan Hua <hs...@apache.org>
Date:   2016-02-29T07:45:33Z

    Add Idempotent support the new kafka input operator

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54615024
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---
    @@ -137,6 +189,9 @@ public void run()
     
     
             while (wrapper.isAlive) {
    +          if (wrapper.waitForReplay) {
    +            Thread.sleep(100);
    +          }
    --- End diff --
    
    And also this will be an issue in old kafka operator that the thread could receive duplicated messages because it start consuming data immediately when the operator is activated. The offset it starts from will have overlap with offsets in the recovery window, correct?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54710624
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
    @@ -339,6 +394,11 @@ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
         return assignment;
       }
     
    +  private boolean isIdempotent()
    +  {
    +    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    +  }
    --- End diff --
    
    I think HighLevelKafkaConsumer doesn't support emitImmediately. So, this check needs  to be added in isIdempotent() API. Please correct it, if I am wrong.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54615107
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
    @@ -339,6 +394,11 @@ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
         return assignment;
       }
     
    +  private boolean isIdempotent()
    +  {
    +    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    +  }
    --- End diff --
    
    It's good to have a check so it is not set to null by accident


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54614634
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---
    @@ -137,6 +189,9 @@ public void run()
     
     
             while (wrapper.isAlive) {
    +          if (wrapper.waitForReplay) {
    +            Thread.sleep(100);
    +          }
    --- End diff --
    
    Oh then the test code might be wrong, I'll check to find out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
GitHub user siyuanh reopened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205

    Add Idempotent support the new kafka input operator

    @chaithu14  @tweise  Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/siyuanh/incubator-apex-malhar APEXMALHAR-1969

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #205
    
----
commit ebcd7293b63ec404279f635a825442f6fa72475c
Author: Siyuan Hua <hs...@apache.org>
Date:   2016-03-17T21:16:56Z

    Add idempotent support for new kafka input operator

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54403243
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
    @@ -339,6 +394,11 @@ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
         return assignment;
       }
     
    +  private boolean isIdempotent()
    +  {
    +    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    +  }
    --- End diff --
    
    Here, windowDataManager can't be null. By Default, the value of windowDataManager is  WindowDataManager.NoopWindowDataManager. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
GitHub user siyuanh reopened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205

    Add Idempotent support the new kafka input operator

    @chaithu14  @tweise  Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/siyuanh/incubator-apex-malhar APEXMALHAR-1969

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #205
    
----
commit ebcd7293b63ec404279f635a825442f6fa72475c
Author: Siyuan Hua <hs...@apache.org>
Date:   2016-03-17T21:16:56Z

    Add idempotent support for new kafka input operator

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54710284
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
    @@ -339,6 +394,11 @@ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
         return assignment;
       }
     
    +  private boolean isIdempotent()
    +  {
    +    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    +  }
    --- End diff --
    
    Why can't we add null check to setter method?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/205


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/205


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54749020
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---
    @@ -339,6 +394,11 @@ public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
         return assignment;
       }
     
    +  private boolean isIdempotent()
    +  {
    +    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    +  }
    --- End diff --
    
    Null check: that's what validation constraints are there for.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/205


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54403245
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---
    @@ -137,6 +189,9 @@ public void run()
     
     
             while (wrapper.isAlive) {
    +          if (wrapper.waitForReplay) {
    +            Thread.sleep(100);
    +          }
    --- End diff --
    
    After thread sleep, I think need to continue the loop. Otherwise, consume messages from the consumer. Code snippet for if condition be as follows:
    if (wrapper.waitForReplay) {
      Thread.sleep(100);
      continue;
    }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: Add Idempotent support the new...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/205#discussion_r54709695
  
    --- Diff: kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---
    @@ -137,6 +189,9 @@ public void run()
     
     
             while (wrapper.isAlive) {
    +          if (wrapper.waitForReplay) {
    +            Thread.sleep(100);
    +          }
    --- End diff --
    
    No, After replay all the recovery windows completed, then consumer will start.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---