Posted to dev@samza.apache.org by Navina Ramesh <nr...@linkedin.com> on 2015/08/11 21:17:53 UTC
Re: Review Request 34974: SAMZA-676: implement broadcast stream
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34974/#review93553
-----------------------------------------------------------
LGTM overall! I was able to test it as well.
Some feedback: would it be possible to mark the SSP of the IncomingMessageEnvelope with a boolean flag indicating whether or not it comes from a broadcast stream? When I tried writing some sample code, I realized that the StreamTask has to know the exact topic name in order to tell whether a message came from a broadcast system stream. An API such as "envelope.getSystemStreamPartition().isBroadcast()" would be more convenient. What do you think?
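To illustrate the current workaround, here is a minimal, self-contained Java sketch of the name-based check a task has to do today. `Ssp` and `isFromBroadcastStream` are hypothetical stand-ins (not Samza's SystemStreamPartition or any existing API), and the task is assumed to be handed the set of broadcast "system.stream" names somehow, for example parsed from its config.

```java
import java.util.Set;

public class BroadcastCheck {
    // Hypothetical stand-in for a SystemStreamPartition: system, stream, partition.
    static final class Ssp {
        final String system;
        final String stream;
        final int partition;

        Ssp(String system, String stream, int partition) {
            this.system = system;
            this.stream = stream;
            this.partition = partition;
        }
    }

    // Without an isBroadcast() flag, the task must know which streams were
    // configured as broadcast inputs and compare names on every message.
    // The partition plays no role in this check.
    static boolean isFromBroadcastStream(Ssp ssp, Set<String> broadcastStreams) {
        return broadcastStreams.contains(ssp.system + "." + ssp.stream);
    }

    public static void main(String[] args) {
        Set<String> broadcast = Set.of("kafka.config-updates");
        System.out.println(isFromBroadcastStream(new Ssp("kafka", "config-updates", 0), broadcast)); // true
        System.out.println(isFromBroadcastStream(new Ssp("kafka", "page-views", 3), broadcast));     // false
    }
}
```

An isBroadcast() accessor on the SSP would let the task drop both the name set and the string comparison.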
docs/learn/documentation/versioned/jobs/configuration-table.html (line 453)
<https://reviews.apache.org/r/34974/#comment147930>
Rephrase - "This property specifies the partitions that all tasks should consume."
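For context, the effect of this property (presumably "task.global.inputs" from the patch description) can be sketched as follows: the listed partitions are added to every task, so one SSP can belong to several tasks (items 3 and 6 of the patch description), and offsets then have to be kept per task so that a task can skip messages it has already processed (items 5 and 7). This is a hypothetical Java illustration only: SSPs are modelled as "system.stream#partition" strings, offsets are assumed to be numeric longs, and `group`, `TaskOffsets` and the "Partition N" task names are made up for the example rather than taken from the patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BroadcastGroupingSketch {
    // Groups regular SSPs ("system.stream#partition") into one task per partition,
    // then adds every broadcast SSP to every task. The caller is assumed to have
    // removed broadcast SSPs from the regular list beforehand.
    static Map<String, List<String>> group(List<String> ssps, List<String> broadcastSsps) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (String ssp : ssps) {
            String partition = ssp.substring(ssp.lastIndexOf('#') + 1);
            tasks.computeIfAbsent("Partition " + partition, k -> new ArrayList<>()).add(ssp);
        }
        for (List<String> taskSsps : tasks.values()) {
            taskSsps.addAll(broadcastSsps); // the same SSP is shared by all tasks
        }
        return tasks;
    }

    // Last processed offset per (task, SSP): a shared SSP can be at a different
    // offset in each task that consumes it. Offsets are assumed numeric here; a
    // system-agnostic version would need an offset comparison supplied by the system.
    static final class TaskOffsets {
        private final Map<String, Map<String, Long>> byTask = new HashMap<>();

        boolean shouldProcess(String task, String ssp, long offset) {
            Long last = byTask.getOrDefault(task, Collections.emptyMap()).get(ssp);
            // Skip anything at or before what this task has already processed.
            return last == null || offset > last;
        }

        void markProcessed(String task, String ssp, long offset) {
            byTask.computeIfAbsent(task, k -> new HashMap<>()).put(ssp, offset);
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> tasks =
            group(List.of("kafka.page-views#0", "kafka.page-views#1"), List.of("kafka.config#0"));
        // {Partition 0=[kafka.page-views#0, kafka.config#0], Partition 1=[kafka.page-views#1, kafka.config#0]}
        System.out.println(tasks);

        TaskOffsets offsets = new TaskOffsets();
        offsets.markProcessed("Partition 0", "kafka.config#0", 41L);
        System.out.println(offsets.shouldProcess("Partition 0", "kafka.config#0", 41L)); // false
        System.out.println(offsets.shouldProcess("Partition 1", "kafka.config#0", 41L)); // true
    }
}
```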
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java (line 73)
<https://reviews.apache.org/r/34974/#comment147929>
Malformed HTML error in the Javadoc for '<' and '>'.
The same issue occurs on line 79.
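One common fix is to wrap the comparison operators in {@code ...} (or write them as &lt; and &gt;), because Javadoc's HTML checking reads a bare '<' as the start of a tag. A minimal sketch on a hypothetical interface that mirrors the offsetComparator(String, String) signature from the compiler error below; the Javadoc wording and the numeric implementation are illustrative assumptions, not the patch's actual text.

```java
public class OffsetComparatorDoc {
    // Hypothetical interface mirroring the offsetComparator signature.
    interface OffsetComparing {
        /**
         * Compares two offsets of the same system stream partition.
         *
         * @param offset1 the first offset
         * @param offset2 the second offset
         * @return -1, 0 or 1 if {@code offset1 < offset2}, {@code offset1 == offset2}
         *         or {@code offset1 > offset2} respectively, or {@code null} if the
         *         two offsets cannot be compared
         */
        Integer offsetComparator(String offset1, String offset2);
    }

    // Example implementation, assuming offsets are decimal longs.
    static final class NumericOffsets implements OffsetComparing {
        @Override
        public Integer offsetComparator(String offset1, String offset2) {
            try {
                return Integer.signum(Long.compare(Long.parseLong(offset1), Long.parseLong(offset2)));
            } catch (NumberFormatException e) {
                return null; // not numeric, so not comparable here
            }
        }
    }
}
```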
I think the delay in my review let a few more commits go in first, so you will have to rebase and update your patch again.
I found this error:
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala:27: error: class HdfsSystemAdmin needs to be abstract, since method offsetComparator in trait SystemAdmin of type (x$1: String, x$2: String)Integer is not defined
class HdfsSystemAdmin extends SystemAdmin with Logging {
^
one error found
I guess you can just throw an UnsupportedOperationException for now.
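For illustration, the stop-gap would look like this in Java (HdfsSystemAdmin itself is Scala, where the method body would be the same throw). The interface and class names below are hypothetical stand-ins for SystemAdmin and HdfsSystemAdmin, and only the offsetComparator method is shown. The standard JDK exception for this purpose is java.lang.UnsupportedOperationException.

```java
public class UnsupportedComparatorStub {
    // Hypothetical stand-in for the part of SystemAdmin that gained offsetComparator.
    interface OffsetComparingAdmin {
        Integer offsetComparator(String offset1, String offset2);
    }

    // Stop-gap for a system whose offsets cannot be compared yet:
    // implement the method so the class compiles, but fail loudly if it is called.
    static final class HdfsLikeAdmin implements OffsetComparingAdmin {
        @Override
        public Integer offsetComparator(String offset1, String offset2) {
            throw new UnsupportedOperationException(
                "offsetComparator is not implemented for this system yet");
        }
    }
}
```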
- Navina Ramesh
On July 29, 2015, 10:49 p.m., Yan Fang wrote:
>
>
> (Updated July 29, 2015, 10:49 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-676
> https://issues.apache.org/jira/browse/SAMZA-676
>
>
> Repository: samza
>
>
> Description
> -------
>
> 1. added an offsetComparator method to the SystemAdmin interface
>
> 2. added "task.global.inputs" config
>
> 3. rewrote the Grouper classes in Java, allowing global streams to be assigned during grouping
>
> 4. used LinkedHashSet instead of HashSet in CoordinatorStreamSystemConsumer to preserve message order
>
> 5. added taskNames to the offsets in OffsetManager
>
> 6. allowed one SSP to be assigned to multiple TaskInstances
>
> 7. skipped already-processed messages in RunLoop
>
> 8. added unit tests for all changes
>
>
> Diffs
> -----
>
> checkstyle/import-control.xml 6654319
> docs/learn/documentation/versioned/container/samza-container.md 9f46414
> docs/learn/documentation/versioned/jobs/configuration-table.html ea73b40
> samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588eb
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae
> samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartitionFactory.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartitionFactory.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 20e5d26
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala e4b14f4
> samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala c292ae4
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 27b2517
> samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala c5a5ea5
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 9dc7051
> samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupByPartition.scala 44e95fc
> samza-core/src/main/scala/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.scala 3c0acad
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f410
> samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java PRE-CREATION
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c46
> samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 64a5844
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 84fdeaa
> samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 7caad28
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/GroupByTestBase.scala a14169b
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupByPartition.scala 74daf72
> samza-core/src/test/scala/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.scala deb3895
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7
> samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd3
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f5
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala de00320
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala 1629035
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 2a84328
> samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b4
>
> Diff: https://reviews.apache.org/r/34974/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yan Fang
>
>