Posted to dev@flume.apache.org by Grant Henke <gr...@gmail.com> on 2016/08/30 21:47:12 UTC

Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/
-----------------------------------------------------------

Review request for Flume.


Bugs: FLUME-2983
    https://issues.apache.org/jira/browse/FLUME-2983


Repository: flume-git


Description
-------

Similar to FLUME-2972: when moving from Kafka 0.8.x to 0.9.x, the offsets that track a consumer's position are stored in Kafka instead of Zookeeper.
FLUME-2821 makes the client change in the Kafka Source but does not ensure that existing offsets are migrated so that the source can continue consuming where it left off.
This patch adds automated logic on startup that checks whether Kafka offsets exist. If they do not and migration is enabled (the default), it copies the offsets from Zookeeper and commits them to Kafka.
This patch also fixes the backwards incompatibility caused by removing the zookeeperConnect property. If zookeeperConnect is used, the bootstrap servers are looked up from Zookeeper.
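
For readers skimming the thread, the startup decision described above can be sketched roughly as follows. This is only an illustration and not the code in the patch: the class and method names are hypothetical, and plain maps (keyed by a partition label) stand in for the offsets stored in Kafka and Zookeeper.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetMigrationSketch {

    /**
     * Returns the offsets that should be committed to Kafka on startup.
     * An empty result means there is nothing to migrate.
     * All names here are hypothetical; the maps model stored offsets.
     */
    static Map<String, Long> offsetsToMigrate(Map<String, Long> kafkaOffsets,
                                              Map<String, Long> zookeeperOffsets,
                                              boolean migrationEnabled) {
        // Offsets already stored in Kafka win, and a disabled flag skips migration.
        if (!kafkaOffsets.isEmpty() || !migrationEnabled) {
            return new HashMap<>();
        }
        // No Kafka offsets yet: carry over what the old consumer left in Zookeeper.
        return new HashMap<>(zookeeperOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> zk = new HashMap<>();
        zk.put("topic-0", 42L);
        System.out.println(offsetsToMigrate(new HashMap<>(), zk, true));
    }
}
```

In this sketch, if Zookeeper has no offsets either, the result is empty and nothing is committed.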


Diffs
-----

  flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java e7f1f2e 
  flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java e7ae68f 
  flume-ng-doc/sphinx/FlumeUserGuide.rst 0fd1ec9 
  flume-ng-sources/flume-kafka-source/pom.xml 5f5c2a8 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 86782c3 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java 1f255f9 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java a3a2f92 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java 1598741 

Diff: https://reviews.apache.org/r/51541/diff/


Testing
-------

Unit tests so far.


Thanks,

Grant Henke


Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Mike Percy <mp...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147524
-----------------------------------------------------------


Ship it!




Thanks Grant for the patch and Denes for the review.

- Mike Percy




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Grant Henke <gr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/
-----------------------------------------------------------

(Updated Sept. 1, 2016, 1:17 a.m.)


Review request for Flume.


Bugs: FLUME-2983
    https://issues.apache.org/jira/browse/FLUME-2983


Repository: flume-git


Description
-------

Similar to FLUME-2972: when moving from Kafka 0.8.x to 0.9.x, the offsets that track a consumer's position are stored in Kafka instead of Zookeeper.
FLUME-2821 makes the client change in the Kafka Source but does not ensure that existing offsets are migrated so that the source can continue consuming where it left off.
This patch adds automated logic on startup that checks whether Kafka offsets exist. If they do not and migration is enabled (the default), it copies the offsets from Zookeeper and commits them to Kafka.
This patch also fixes the backwards incompatibility caused by removing the zookeeperConnect property. If zookeeperConnect is used, the bootstrap servers are looked up from Zookeeper.


Diffs (updated)
-----

  flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java e7f1f2e 
  flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java e7ae68f 
  flume-ng-doc/sphinx/FlumeUserGuide.rst 0fd1ec9 
  flume-ng-sources/flume-kafka-source/pom.xml 5f5c2a8 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 86782c3 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java 1f255f9 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java a3a2f92 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java 1598741 

Diff: https://reviews.apache.org/r/51541/diff/


Testing
-------

Unit tests so far.


Thanks,

Grant Henke


Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Grant Henke <gr...@gmail.com>.

> On Sept. 1, 2016, 12:32 a.m., Mike Percy wrote:
> > not very happy about the copy/paste but this is an important fix. we should clean it up later though. just a few comments

Yeah, me neither. No common module existed to hold this code, and I didn't want to add one just for this. I also didn't want to add Kafka dependencies in this change to a module that didn't already have them. There is a jira tracking future consolidation.


> On Sept. 1, 2016, 12:32 a.m., Mike Percy wrote:
> > flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java, line 39
> > <https://reviews.apache.org/r/51541/diff/2/?file=1489155#file1489155line39>
> >
> >     Why this change?

Kafka uses/advertises the hostname, not the IP address. When I test deriving the bootstrap from Zookeeper, I compare the result with this host to validate that it is generated correctly. To be usable for that comparison, it has to be the hostname.
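
To illustrate why the hostname matters for that comparison, here is a rough sketch of assembling a bootstrap string from the host and port each broker advertises. It is not the code in the patch: the `Broker` holder, the method name, and the example hostnames are all hypothetical, and the Zookeeper lookup itself is left out.

```java
import java.util.ArrayList;
import java.util.List;

class BootstrapSketch {

    /** Hypothetical holder for the host and port a broker advertises. */
    static final class Broker {
        final String host;
        final int port;

        Broker(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Joins the advertised endpoints into a comma-separated host:port list. */
    static String bootstrapServers(List<Broker> brokers) {
        List<String> endpoints = new ArrayList<>();
        for (Broker broker : brokers) {
            endpoints.add(broker.host + ":" + broker.port);
        }
        return String.join(",", endpoints);
    }

    public static void main(String[] args) {
        List<Broker> brokers = new ArrayList<>();
        brokers.add(new Broker("kafka1.example.com", 9092));
        brokers.add(new Broker("kafka2.example.com", 9092));
        System.out.println(bootstrapServers(brokers));
    }
}
```

Because the string is built from whatever the broker advertises, a test that expects an IP address would not match a broker that registers its hostname.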


> On Sept. 1, 2016, 12:32 a.m., Mike Percy wrote:
> > flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java, line 83
> > <https://reviews.apache.org/r/51541/diff/2/?file=1489155#file1489155line83>
> >
> >     I liked the old method name better... j/k

Will change back. ;)


- Grant


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147505
-----------------------------------------------------------




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Mike Percy <mp...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147505
-----------------------------------------------------------



not very happy about the copy/paste but this is an important fix. we should clean it up later though. just a few comments


flume-ng-doc/sphinx/FlumeUserGuide.rst (line 1264)
<https://reviews.apache.org/r/51541/#comment214684>

    mind wrapping these extremely long lines more similarly to the existing text? here and on the next line



flume-ng-doc/sphinx/FlumeUserGuide.rst (line 1265)
<https://reviews.apache.org/r/51541/#comment214695>

    How about: "If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled."



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (line 368)
<https://reviews.apache.org/r/51541/#comment214688>

    nit: indent by 4 spaces here



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (line 449)
<https://reviews.apache.org/r/51541/#comment214689>

    Could use a little doc comment or something



flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java (line 39)
<https://reviews.apache.org/r/51541/#comment214694>

    Why this change?



flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java (line 83)
<https://reviews.apache.org/r/51541/#comment214693>

    I liked the old method name better... j/k


- Mike Percy




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Grant Henke <gr...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/
-----------------------------------------------------------

(Updated Aug. 31, 2016, 3:46 p.m.)


Review request for Flume.


Bugs: FLUME-2983
    https://issues.apache.org/jira/browse/FLUME-2983


Repository: flume-git


Description
-------

Similar to FLUME-2972: when moving from Kafka 0.8.x to 0.9.x, the offsets that track a consumer's position are stored in Kafka instead of Zookeeper.
FLUME-2821 makes the client change in the Kafka Source but does not ensure that existing offsets are migrated so that the source can continue consuming where it left off.
This patch adds automated logic on startup that checks whether Kafka offsets exist. If they do not and migration is enabled (the default), it copies the offsets from Zookeeper and commits them to Kafka.
This patch also fixes the backwards incompatibility caused by removing the zookeeperConnect property. If zookeeperConnect is used, the bootstrap servers are looked up from Zookeeper.


Diffs (updated)
-----

  flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java e7f1f2e 
  flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java e7ae68f 
  flume-ng-doc/sphinx/FlumeUserGuide.rst 0fd1ec9 
  flume-ng-sources/flume-kafka-source/pom.xml 5f5c2a8 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 86782c3 
  flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java 1f255f9 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java a3a2f92 
  flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java 1598741 

Diff: https://reviews.apache.org/r/51541/diff/


Testing
-------

Unit tests so far.


Thanks,

Grant Henke


Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Grant Henke <gr...@gmail.com>.

> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, lines 538-606
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line538>
> >
> >     I'm a bit concerned about this copy-paste from the KafkaChannel; it should be extracted to a utility class.
> >     If you don't have time for this now, please file a jira to improve/refactor this later. Thanks.
> 
> Grant Henke wrote:
>     I agree. I felt the same way initially. The reasons I didn't break it out into a utility class were:
>     - It's not clear the correct place it would go
>     - This code is temporary and should be removed when migration support can be dropped
>     - Though very similar, there are some differences in the implementation
>     
>     I can open a jira to track consolidation.

I created https://issues.apache.org/jira/browse/FLUME-2985 to track the consolidation.


- Grant


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147430
-----------------------------------------------------------




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Grant Henke <gr...@gmail.com>.

> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, lines 538-606
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line538>
> >
> >     I'm a bit concerned about this copy-paste from the KafkaChannel; it should be extracted to a utility class.
> >     If you don't have time for this now, please file a jira to improve/refactor this later. Thanks.

I agree. I felt the same way initially. The reasons I didn't break it out into a utility class were:
- It's not clear the correct place it would go
- This code is temporary and should be removed when migration support can be dropped
- Though very similar, there are some differences in the implementation

I can open a jira to track consolidation.


> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, line 372
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line372>
> >
> >     What about using Google Guava's `Strings.isNullOrEmpty()` or `isEmpty()` in Apache Commons `StringUtils`? It's more readable.
> >     There are a couple of null-isEmpty checks; it might be worth replacing all of them.
> >     
> >     Or you might want to consider using the `context.getString`'s default value parameter.

There are a ton of places where this pattern is used, so I purposely didn't deviate from it, for consistency. I think you could change this pattern everywhere in a separate refactoring/clean-up jira if you like.


> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, line 394
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line394>
> >
> >     If a Flume reconfigure occurs and the updated config file doesn't contain the `group.id` (`GROUP_ID_CONFIG`) entry, it should use the default value, shouldn't it?
> >     
> >     I'm not sure though; as per the https://issues.apache.org/jira/browse/FLUME-2857 ticket, this wouldn't be the first place where we introduce this. Wdyt?

I actually didn't change this logic; I just moved it out of setConsumerProps. I think any findings and work on this can be tracked in FLUME-2857 and other jiras.


- Grant


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147430
-----------------------------------------------------------




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Denes Arvay <de...@cloudera.com>.

> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, line 372
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line372>
> >
> >     What about using Google Guava's `Strings.isNullOrEmpty()` or `isEmpty()` in Apache Commons `StringUtils`? It's more readable.
> >     There are a couple of null-isEmpty checks; it might be worth replacing all of them.
> >     
> >     Or you might want to consider using the `context.getString`'s default value parameter.
> 
> Grant Henke wrote:
>     There are a ton of places where this pattern is used, so I purposely didn't deviate from it, for consistency. I think you could change this pattern everywhere in a separate refactoring/clean-up jira if you like.

ok, agree.


> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, line 394
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line394>
> >
> >     If a Flume reconfigure occurs and the updated config file doesn't contain the `group.id` (`GROUP_ID_CONFIG`) entry, it should use the default value, shouldn't it?
> >     
> >     I'm not sure though; as per the https://issues.apache.org/jira/browse/FLUME-2857 ticket, this wouldn't be the first place where we introduce this. Wdyt?
> 
> Grant Henke wrote:
>     I actually didn't change this logic; I just moved it out of setConsumerProps. I think any findings and work on this can be tracked in FLUME-2857 and other jiras.

I see, thanks for the clarification.


> On Aug. 31, 2016, 3:27 p.m., Denes Arvay wrote:
> > flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java, lines 538-606
> > <https://reviews.apache.org/r/51541/diff/1/?file=1488879#file1488879line538>
> >
> >     I'm a bit concerned about this copy-paste from the KafkaChannel; it should be extracted to a utility class.
> >     If you don't have time for this now, please file a jira to improve/refactor this later. Thanks.
> 
> Grant Henke wrote:
>     I agree. I felt the same way initially. The reasons I didn't break it out into a utility class were:
>     - It's not clear the correct place it would go
>     - This code is temporary and should be removed when migration support can be dropped
>     - Though very similar, there are some differences in the implementation
>     
>     I can open a jira to track consolidation.
> 
> Grant Henke wrote:
>     I created https://issues.apache.org/jira/browse/FLUME-2985 to track the consolidation.

"- It's not clear the correct place it would go" - definitely agree, I wanted to come up with some concrete advice on this but I couldn't.
Thanks for the jira!


- Denes


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147430
-----------------------------------------------------------




Re: Review Request 51541: FLUME-2983: Handle offset migration in the new Kafka Source

Posted by Denes Arvay <de...@cloudera.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51541/#review147430
-----------------------------------------------------------




flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (lines 126 - 127)
<https://reviews.apache.org/r/51541/#comment214596>

    nit: no need to initialize with `null`



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (line 372)
<https://reviews.apache.org/r/51541/#comment214597>

    What about using Google Guava's `Strings.isNullOrEmpty()` or `isEmpty()` in Apache Commons `StringUtils`? It's more readable.
    There are a couple of null-isEmpty checks; it might be worth replacing all of them.
    
    Or you might want to consider using the `context.getString`'s default value parameter.
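
As a rough illustration of the two options, the sketch below uses local stand-ins so that it runs without Guava or Flume on the classpath. The helper `isNullOrEmpty` mirrors the contract of Guava's `Strings.isNullOrEmpty(String)`, and the hypothetical `getString` over a plain map only approximates a lookup with a default value; the key names and the default are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

class NullOrEmptySketch {

    // Local stand-in with the same contract as Guava's Strings.isNullOrEmpty(String).
    static boolean isNullOrEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Hypothetical stand-in for a context lookup that takes a default value.
    static String getString(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("topic", "");

        // One call replaces the hand-written "x == null || x.isEmpty()" check.
        System.out.println(isNullOrEmpty(params.get("topic")));
        System.out.println(isNullOrEmpty(params.get("missing")));

        // A default only covers a missing key; an empty value is returned as-is.
        System.out.println(getString(params, "groupId", "flume"));
        System.out.println(getString(params, "topic", "fallback").isEmpty());
    }
}
```

One thing to keep in mind when choosing between them: in this sketch the default-value lookup does not treat an empty string as missing, so the two options are not interchangeable everywhere.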



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (line 394)
<https://reviews.apache.org/r/51541/#comment214598>

    If a Flume reconfigure occurs and the updated config file doesn't contain the `group.id` (`GROUP_ID_CONFIG`) entry, it should use the default value, shouldn't it?
    
    I'm not sure though; as per the https://issues.apache.org/jira/browse/FLUME-2857 ticket, this wouldn't be the first place where we introduce this. Wdyt?
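
To make the concern concrete, here is a minimal sketch of the difference between a configure step that only assigns the field when the key is present and one that falls back to the default on every call. It assumes the field survives between configure calls; the class, the method names, and the default value are hypothetical and are not taken from KafkaSource.

```java
import java.util.HashMap;
import java.util.Map;

class ReconfigureSketch {

    static final String DEFAULT_GROUP_ID = "flume"; // illustrative default only

    private String groupId = DEFAULT_GROUP_ID;

    // "Sticky" variant: a missing key leaves the previously configured value in place.
    void configureSticky(Map<String, String> params) {
        String value = params.get("group.id");
        if (value != null && !value.isEmpty()) {
            groupId = value;
        }
    }

    // Resetting variant: a missing key restores the default on every configure call.
    void configureWithDefault(Map<String, String> params) {
        String value = params.get("group.id");
        groupId = (value == null || value.isEmpty()) ? DEFAULT_GROUP_ID : value;
    }

    String groupId() {
        return groupId;
    }

    public static void main(String[] args) {
        Map<String, String> first = new HashMap<>();
        first.put("group.id", "custom");
        Map<String, String> second = new HashMap<>(); // entry removed from the config

        ReconfigureSketch sticky = new ReconfigureSketch();
        sticky.configureSticky(first);
        sticky.configureSticky(second);
        System.out.println(sticky.groupId());

        ReconfigureSketch resetting = new ReconfigureSketch();
        resetting.configureWithDefault(first);
        resetting.configureWithDefault(second);
        System.out.println(resetting.groupId());
    }
}
```

With the sticky variant the old custom group id is still in effect after the entry is removed, which is the behaviour the question above is about.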



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (line 433)
<https://reviews.apache.org/r/51541/#comment214602>

    Could you please remove the `bootStrapServers` parameter? It's not needed as it was moved to instance level.



flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java (lines 530 - 598)
<https://reviews.apache.org/r/51541/#comment214603>

    I'm a bit concerned about this copy-paste from the KafkaChannel; it should be extracted to a utility class.
    If you don't have time for this now, please file a jira to improve/refactor this later. Thanks.


checkstyle & tests pass

- Denes Arvay

