Posted to dev@samza.apache.org by Robert Crim <rj...@gmail.com> on 2016/06/24 19:45:04 UTC

Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

This is a WIP for updating the Kafka client libraries to 0.10+. So far, I've updated the dependency and simply worked to get all existing tests passing. The next steps are to further test/verify backwards compatibility with older brokers, move the current `KafkaSystemFactory`, etc., to `OldKafkaSystemFactory`, and implement the new clients.


Diffs
-----

  build.gradle ba4a9d1 
  gradle/dependency-versions.gradle 47c71bf 
  samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ea10cae 
  samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 4e97376 
  samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala 78467bf 
  samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala 5e8cc65 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ba8de5c 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b373753 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176 
  samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala a25ba62 
  samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java 6f498de 
  samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java 2fa743f 
  samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala e6815da 
  samza-kafka/src/test/scala/org/apache/samza/migration/TestKafkaCheckpointMigration.scala 504fc89 
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala f00405d 
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ece0359 
  samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala 8e32bba 
  samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala 8d7e3fe 

Diff: https://reviews.apache.org/r/49212/diff/


Testing
-------

Got `./gradlew clean check` passing. I've not been able to run the integration tests (on any branch) but will do that next!


Thanks,

Robert Crim


Re: flushing changelog & checkpointing

Posted by Jacob Maes <ja...@gmail.com>.
Hey Ramanan,

Confirmed. It all happens in commit (at the "checkpoint interval").

The operations are executed serially for each task. The order is exactly as
Yi described. The order was chosen with crashes in mind. That is, the
checkpoint is not written until state has been updated and output messages
have been sent. Note that this can cause state and/or output messages to be
written again in failure scenarios: after a restart the task resumes from the
previous checkpoint and reprocesses any incoming messages that it had
already processed since that checkpoint.

If you'd like to see for yourself, take a look at the commit() method here:
https://github.com/apache/samza/blob/f02386464d31b5a496bb0578838f51a0331bfffa/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
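
The ordering described above can be pictured with a small toy model. This is only a sketch and not Samza code: the `ToyTask` class, its fields, and the `crash_before_checkpoint` flag are made up for illustration. It flushes state, then output, then writes the checkpoint last, and shows how a crash between the second and third step leads to reprocessing after a restart.

```python
class ToyTask:
    """Toy model of a single task. Hypothetical, not part of Samza."""

    def __init__(self, inputs):
        self.inputs = inputs        # input messages, indexed by offset
        self.store = {}             # in-memory state, not yet flushed
        self.pending_output = []    # output messages, not yet flushed
        self.durable_store = {}     # state as of the last flush
        self.durable_output = []    # output messages actually sent
        self.checkpoint = 0         # offset to resume from after a restart
        self.position = 0           # current read position

    def process_one(self):
        msg = self.inputs[self.position]
        self.store[msg] = self.store.get(msg, 0) + 1  # count occurrences
        self.pending_output.append(msg)
        self.position += 1

    def commit(self, crash_before_checkpoint=False):
        # Step 1: flush pending state writes.
        self.durable_store = dict(self.store)
        # Step 2: flush pending output messages.
        self.durable_output.extend(self.pending_output)
        self.pending_output.clear()
        if crash_before_checkpoint:
            raise RuntimeError("simulated crash before checkpoint write")
        # Step 3: only now record the input offset.
        self.checkpoint = self.position

    def restart(self):
        # State comes back as last flushed; input resumes at the checkpoint.
        self.store = dict(self.durable_store)
        self.pending_output = []
        self.position = self.checkpoint


task = ToyTask(["a", "b"])
task.process_one()
task.process_one()
try:
    task.commit(crash_before_checkpoint=True)
except RuntimeError:
    task.restart()

# The checkpoint was never written, so both messages are processed again.
task.process_one()
task.process_one()
task.commit()

print(task.durable_output)  # ['a', 'b', 'a', 'b']
print(task.durable_store)   # {'a': 2, 'b': 2}
print(task.checkpoint)      # 2
```

In this toy run nothing is lost, but both the output and the counts in the store reflect each message twice, which is the "rewritten" case mentioned above.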

Jake




RE: flushing changelog & checkpointing

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Thank you Yi! Please also confirm that all this is done at the exact same interval, that is, the checkpoint interval.

Are the 3 operations you listed done in sequence or in parallel? If sequential, then what is the order? Can you also comment on server failure scenarios while this is being carried out, that is, a failure that results in only a subset of these operations completing, in which case the newly spawned Samza container may have stale state? How likely is that? Have you faced it?


Re: flushing changelog & checkpointing

Posted by Yi Pan <ni...@gmail.com>.
Hi, Buvana,

Please see answers below.

On Tue, Jul 5, 2016 at 11:47 AM, Ramanan, Buvana (Nokia - US) <
buvana.ramanan@nokia-bell-labs.com> wrote:

>
> Does this mean that all writes to the disk for state store purposes will
> be done at the checkpointing time (which is also the time Samza checkpoints
> the incoming stream offsets)? Does this also mean new data to the changelog
> stream will be emitted at checkpointing time?
>
>
Yes, the commit workflow in Samza guarantees that a) pending writes to
on-disk KV-stores are flushed; b) all pending writes to output streams are
flushed, including the writes to changelog streams; c) all input offsets
are flushed to checkpoint.

Hope the above answers your questions.

-Yi

flushing changelog & checkpointing

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Hello,

I am looking for more details on the point in the processing cycle at which the state store in Samza is written to disk. I noted the following statement in the Samza Stateful Processing section:

"Samza includes an additional in-memory caching layer in front of RocksDB, which avoids the cost of deserialization for frequently-accessed objects and batches writes. If the same key is updated multiple times in quick succession, the batching coalesces those updates into a single write. The writes are flushed to the changelog when a task commits."

Here, "commits" is actually a hyperlink, and clicking on it takes me to the checkpointing section, which I have read many times from the point of view of input stream offset commits.

Does this mean that all writes to the disk for state store purposes will be done at the checkpointing time (which is also the time Samza checkpoints the incoming stream offsets)? Does this also mean new data to the changelog stream will be emitted at checkpointing time?

Thanks,
Buvana
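
The write batching described in the quoted documentation passage can be pictured with a toy model. This is a sketch under stated assumptions, not Samza's cache implementation: the `WriteBatchingCache` class is hypothetical, a plain dict stands in for RocksDB, and a list stands in for the changelog stream. It shows only the coalescing idea: repeated updates to one key become a single write when the cache is flushed.

```python
class WriteBatchingCache:
    """Toy write-batching cache. Hypothetical, not part of Samza."""

    def __init__(self, backing_store, changelog):
        self.backing_store = backing_store  # dict standing in for RocksDB
        self.changelog = changelog          # list standing in for the changelog
        self.dirty = {}                     # latest unflushed value per key

    def put(self, key, value):
        # A later write to the same key replaces the earlier pending one.
        self.dirty[key] = value

    def get(self, key):
        # Serve pending writes first, then fall back to the backing store.
        if key in self.dirty:
            return self.dirty[key]
        return self.backing_store.get(key)

    def flush(self):
        # Called at commit time in this model: one write per dirty key.
        for key, value in self.dirty.items():
            self.backing_store[key] = value
            self.changelog.append((key, value))
        self.dirty.clear()


store, changelog = {}, []
cache = WriteBatchingCache(store, changelog)
cache.put("k", 1)
cache.put("k", 2)
cache.put("k", 3)
cache.put("j", 9)

print(changelog)       # [] (nothing is emitted before the flush)
cache.flush()
print(changelog)       # [('k', 3), ('j', 9)]
```

Four `put` calls produce two changelog entries here, because the three updates to "k" collapse into the last value before the flush.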

Re: Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

Posted by Robert Crim <rj...@gmail.com>.

> On June 29, 2016, 2:16 a.m., Navina Ramesh wrote:
> > Robert,
> > Are you expecting more changes for the 0.10.0 upgrade? Just checking on your progress. Thanks!

Yes, this just updates the client version but does not use the 0.9+ producer and consumer.


- Robert


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/#review139920
-----------------------------------------------------------



Re: Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/#review139920
-----------------------------------------------------------



Robert,
Are you expecting more changes for the 0.10.0 upgrade? Just checking on your progress. Thanks!

- Navina Ramesh



Re: Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

Posted by Nicolas Colomer <co...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/#review150178
-----------------------------------------------------------



I reworked Robert Crim's patch and rebased it on current Samza master (commit [49dac97](https://github.com/apache/samza/commit/49dac97) @ 2016-09-21T18:52:54Z).
See the [SAMZA-855](https://issues.apache.org/jira/browse/SAMZA-855) issue and the [apache/samza#15](https://github.com/apache/samza/pull/15) PR for details.

- Nicolas Colomer



Re: Review Request 49212: RFC: SAMZA-855: Update kafka client to 0.10.0.0

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/49212/#review146701
-----------------------------------------------------------




samza-kafka/src/main/scala/org/apache/samza/migration/KafkaCheckpointMigration.scala (line 37)
<https://reviews.apache.org/r/49212/#comment213264>

    This whole class is deleted in another RB. Could you rebase w/ latest master?


- Yi Pan (Data Infrastructure)

