You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@samza.apache.org by Chris Riccomini <cr...@apache.org> on 2015/02/27 02:51:10 UTC
Review Request 31520: SAMZA-579
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/
-----------------------------------------------------------
Review request for samza.
Bugs: SAMZA-579
https://issues.apache.org/jira/browse/SAMZA-579
Repository: samza
Description
-------
working patch
Diffs
-----
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
Diff: https://reviews.apache.org/r/31520/diff/
Testing
-------
Thanks,
Chris Riccomini
Re: Review Request 31520: SAMZA-579
Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/
-----------------------------------------------------------
(Updated Feb. 27, 2015, 8:59 p.m.)
Review request for samza.
Bugs: SAMZA-579
https://issues.apache.org/jira/browse/SAMZA-579
Repository: samza
Description (updated)
-------
make refresh brokers thread safe
fix test
clean up droppedTopicAndPartitions slightly
misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
make refreshBrokers thread safe
temporarily drop ssps without metadata, and retry later.
Diffs (updated)
-----
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2c0f80380eeca00c556662122856823c297ac40d
samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
Diff: https://reviews.apache.org/r/31520/diff/
Testing
-------
Thanks,
Chris Riccomini
Re: Review Request 31520: SAMZA-579
Posted by Yan Fang <ya...@gmail.com>.
> On Feb. 27, 2015, 7:40 p.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 151
> > <https://reviews.apache.org/r/31520/diff/3/?file=880457#file880457line151>
> >
> > 1. besides "abdicate", refreshBrokers is also called by "messageSink.refreshDropped" in the BrokerProxy thread.
> >
> > 2. in terms of *"creating multiple objects for the same broker"*, even though we "synchronized" this part, it seems still possible to have multiple objects for the same broker:
> > * thread 1 runs until "val nextoffset = topicPartitionsAndOffsets.get(head).get", so the "head" already has topic "t1". Then it is blocked by thread 2.
> > * thread 2 runs, synchronized, creates the broker for topic "t1", and remove it from "droopedTopicAndPartitions". release the lock.
> > * thread 1 goes into the "synchronized" part, it will still create broker for topic "t1" because "val head" already has the value.
Sorry, the format is a little confusing... "3,4,5" is bullet list for "2". :)
- Yan
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/#review74558
-----------------------------------------------------------
On Feb. 27, 2015, 5:59 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31520/
> -----------------------------------------------------------
>
> (Updated Feb. 27, 2015, 5:59 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-579
> https://issues.apache.org/jira/browse/SAMZA-579
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix test
>
>
> clean up droppedTopicAndPartitions slightly
>
>
> misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
>
>
> make refreshBrokers thread safe
>
>
> temporarily drop ssps without metadata, and retry later.
>
>
> Diffs
> -----
>
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
> samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
> samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
>
> Diff: https://reviews.apache.org/r/31520/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
Re: Review Request 31520: SAMZA-579
Posted by Chris Riccomini <cr...@apache.org>.
> On Feb. 27, 2015, 7:40 p.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 151
> > <https://reviews.apache.org/r/31520/diff/3/?file=880457#file880457line151>
> >
> > 1. besides "abdicate", refreshBrokers is also called by "messageSink.refreshDropped" in the BrokerProxy thread.
> >
> > 2. in terms of *"creating multiple objects for the same broker"*, even though we "synchronized" this part, it seems still possible to have multiple objects for the same broker:
> > * thread 1 runs until "val nextoffset = topicPartitionsAndOffsets.get(head).get", so the "head" already has topic "t1". Then it is blocked by thread 2.
> > * thread 2 runs, synchronized, creates the broker for topic "t1", and remove it from "droopedTopicAndPartitions". release the lock.
> > * thread 1 goes into the "synchronized" part, it will still create broker for topic "t1" because "val head" already has the value.
>
> Yan Fang wrote:
> Sorry, the format is a little confusing... "3,4,5" is bullet list for "2". :)
Good catch. Let me think about this a bit.
- Chris
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/#review74558
-----------------------------------------------------------
On Feb. 27, 2015, 5:59 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31520/
> -----------------------------------------------------------
>
> (Updated Feb. 27, 2015, 5:59 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-579
> https://issues.apache.org/jira/browse/SAMZA-579
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix test
>
>
> clean up droppedTopicAndPartitions slightly
>
>
> misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
>
>
> make refreshBrokers thread safe
>
>
> temporarily drop ssps without metadata, and retry later.
>
>
> Diffs
> -----
>
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
> samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
> samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
>
> Diff: https://reviews.apache.org/r/31520/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
Re: Review Request 31520: SAMZA-579
Posted by Yan Fang <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/#review74558
-----------------------------------------------------------
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
<https://reviews.apache.org/r/31520/#comment121160>
1. besides "abdicate", refreshBrokers is also called by "messageSink.refreshDropped" in the BrokerProxy thread.
2. in terms of *"creating multiple objects for the same broker"*, even though we "synchronized" this part, it seems still possible to have multiple objects for the same broker:
* thread 1 runs until "val nextoffset = topicPartitionsAndOffsets.get(head).get", so the "head" already has topic "t1". Then it is blocked by thread 2.
* thread 2 runs, synchronized, creates the broker for topic "t1", and remove it from "droopedTopicAndPartitions". release the lock.
* thread 1 goes into the "synchronized" part, it will still create broker for topic "t1" because "val head" already has the value.
- Yan Fang
On Feb. 27, 2015, 5:59 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31520/
> -----------------------------------------------------------
>
> (Updated Feb. 27, 2015, 5:59 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-579
> https://issues.apache.org/jira/browse/SAMZA-579
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix test
>
>
> clean up droppedTopicAndPartitions slightly
>
>
> misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
>
>
> make refreshBrokers thread safe
>
>
> temporarily drop ssps without metadata, and retry later.
>
>
> Diffs
> -----
>
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
> samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
> samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
> samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
>
> Diff: https://reviews.apache.org/r/31520/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>
Re: Review Request 31520: SAMZA-579
Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/
-----------------------------------------------------------
(Updated Feb. 27, 2015, 5:59 p.m.)
Review request for samza.
Bugs: SAMZA-579
https://issues.apache.org/jira/browse/SAMZA-579
Repository: samza
Description (updated)
-------
fix test
clean up droppedTopicAndPartitions slightly
misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
make refreshBrokers thread safe
temporarily drop ssps without metadata, and retry later.
Diffs (updated)
-----
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
Diff: https://reviews.apache.org/r/31520/diff/
Testing
-------
Thanks,
Chris Riccomini
Re: Review Request 31520: SAMZA-579
Posted by Chris Riccomini <cr...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/
-----------------------------------------------------------
(Updated Feb. 27, 2015, 5:34 p.m.)
Review request for samza.
Bugs: SAMZA-579
https://issues.apache.org/jira/browse/SAMZA-579
Repository: samza
Description (updated)
-------
misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
make refreshBrokers thread safe
temporarily drop ssps without metadata, and retry later.
Diffs (updated)
-----
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
Diff: https://reviews.apache.org/r/31520/diff/
Testing
-------
Thanks,
Chris Riccomini