Posted to dev@samza.apache.org by Robert Zuljevic <r....@levi9.com> on 2015/07/03 11:42:29 UTC

Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

Review request for samza.


Repository: samza


Description
-------

Moved the creation of change log stream from SamzaContainer to JobCoordinator


Diffs
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
-------


Thanks,

Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.

> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, line 126
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line126>
> >
> >     Add logs for the case where the topic already exists. Log the metadata information (as the original createStream code does).

This is already done via KafkaSystemAdmin's createChangelogStream method. Do you want me not to call this method there, but rather call it in JobCoordinator, right after creation?


- Robert


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
-----------------------------------------------------------


On July 9, 2015, 2:39 p.m., Robert Zuljevic wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> -----------------------------------------------------------
> 
> (Updated July 9, 2015, 2:39 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Removed trailing whitespaces
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> -------
> 
> I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Yan Fang <ya...@gmail.com>.

> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, lines 121-125
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121>
> >
> >     this can be simplified a little:
> >     
> >     for ((storeName, systemStream) <- changeLogSystemStreams) {
> >           val systemAdmin = config
> >             .getSystemFactory(systemStream.getName)
> >             .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
> >               throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
> >               
> >               
> >     Then lines 104-109 and lines 117-119 are no longer needed.
> 
> Robert Zuljevic wrote:
>     Did you mean something like this?
>     
>     for ((storeName, systemStream) <- changeLogSystemStreams) {
>           val systemAdmin = Util.getObj[SystemFactory](config
>             .getSystemFactory(systemStream.getSystem)
>             .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
>             ).getAdmin(systemStream.getSystem, config)
>         
>           systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
>         }
>     
>     
>     This is the only way I could think of to simplify this. I don't think what you posted would work, because you're using String's map function, but it did steer me in the right direction. Do you agree?

Yes, you are right. This is what I was thinking. I have not tested the code, though. :)


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
-----------------------------------------------------------




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.

> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, lines 121-125
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121>
> >
> >     this can be simplified a little:
> >     
> >     for ((storeName, systemStream) <- changeLogSystemStreams) {
> >           val systemAdmin = config
> >             .getSystemFactory(systemStream.getName)
> >             .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
> >               throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
> >               
> >               
> >     Then lines 104-109 and lines 117-119 are no longer needed.

Did you mean something like this?

for ((storeName, systemStream) <- changeLogSystemStreams) {
  val systemAdmin = Util.getObj[SystemFactory](config
    .getSystemFactory(systemStream.getSystem)
    .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
  ).getAdmin(systemStream.getSystem, config)

  systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
}


This is the only way I could think of to simplify this. I don't think what you posted would work, because you're using String's map function, but it did steer me in the right direction. Do you agree?
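
To illustrate the point about String's map function, here is a minimal, self-contained sketch. It assumes only that the config lookup returns an Option holding a factory class name. The names getSystemFactory, FakeFactory and the sample class name below are hypothetical stand-ins for illustration, not the real Samza API. Once getOrElse has unwrapped the Option, any following map call runs over the characters of the String rather than over the Option, so the factory has to be built either before unwrapping or from the unwrapped value.

```scala
object OptionVersusStringMap {
  // Hypothetical stand-in for the config lookup: an Option of a factory class name.
  def getSystemFactory(systemName: String): Option[String] =
    Map("kafka" -> "org.apache.samza.system.kafka.KafkaSystemFactory").get(systemName)

  // Hypothetical stand-in for the object that Util.getObj would instantiate.
  final case class FakeFactory(className: String)

  // Unwrap first, then construct: the order used in the revised loop above.
  def factoryFor(systemName: String): FakeFactory =
    FakeFactory(getSystemFactory(systemName).getOrElse(
      throw new IllegalArgumentException(
        "A stream uses system %s, which is missing from the configuration.".format(systemName))))

  // Equivalent alternative: map over the Option, then unwrap.
  def factoryViaOptionMap(systemName: String): FakeFactory =
    getSystemFactory(systemName)
      .map(FakeFactory(_))
      .getOrElse(throw new IllegalArgumentException("Missing system " + systemName))

  // What goes wrong: after getOrElse the value is a String,
  // so map visits each Char instead of the class name as a whole.
  def mapAfterUnwrap(systemName: String): Seq[Int] =
    getSystemFactory(systemName).getOrElse("").map(_.toInt)

  def main(args: Array[String]): Unit = {
    println(factoryFor("kafka"))
    println(factoryViaOptionMap("kafka"))
    println(mapAfterUnwrap("kafka").take(3))
  }
}
```

Both factoryFor and factoryViaOptionMap yield the same factory; the choice between them is mostly a matter of taste.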


- Robert


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
-----------------------------------------------------------




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Yan Fang <ya...@gmail.com>.

> On July 25, 2015, 1:33 a.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, line 126
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line126>
> >
> >     Add logs for the case where the topic already exists. Log the metadata information (as the original createStream code does).
> 
> Robert Zuljevic wrote:
>     This is already done via KafkaSystemAdmin's createChangelogStream method. Do you want me not to call this method there, but rather call it in JobCoordinator, right after creation?

No, they are different. What I mean is to add an "additional" log here, something like:

    val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
    info("Got change log stream metadata: %s" format changeLogMetadata)

The goal is to make sure we create the correct changelog in the AM.
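
A rough sketch of the create-then-log idea, for illustration only. Every type below (SystemStream, InMemoryAdmin, createAndDescribe) is a hypothetical in-memory stand-in and not Samza's real classes. It assumes that creating a changelog that already exists leaves it untouched, which is why reading the metadata back and logging it is useful: the log shows what actually exists rather than what was requested.

```scala
object ChangelogCreationSketch {
  // Hypothetical stand-in for a (system, stream) pair.
  final case class SystemStream(system: String, stream: String)

  // Hypothetical admin that remembers created streams and their partition counts.
  final class InMemoryAdmin {
    private val streams = scala.collection.mutable.Map.empty[String, Int]

    // Assumed idempotent: an existing stream keeps its original partition count.
    def createChangelogStream(stream: String, partitions: Int): Unit =
      if (!streams.contains(stream)) streams(stream) = partitions

    def partitionCount(stream: String): Option[Int] = streams.get(stream)
  }

  // Create every changelog, then read the metadata back and log it.
  def createAndDescribe(
      admin: InMemoryAdmin,
      changeLogSystemStreams: Map[String, SystemStream],
      changeLogPartitions: Int): Map[String, Option[Int]] = {
    changeLogSystemStreams.values.foreach { systemStream =>
      admin.createChangelogStream(systemStream.stream, changeLogPartitions)
    }
    val metadata = changeLogSystemStreams.map { case (storeName, systemStream) =>
      storeName -> admin.partitionCount(systemStream.stream)
    }
    println("Got change log stream metadata: %s".format(metadata))
    metadata
  }
}
```

If the logged partition count differs from the requested one, the stream existed before the job started, which is the case the extra log is meant to surface.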


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
-----------------------------------------------------------




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Yan Fang <ya...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93019
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 80)
<https://reviews.apache.org/r/36163/#comment147281>

    it should be config, not coordinatorSystemConfig because we need to update the config from the stream.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 101)
<https://reviews.apache.org/r/36163/#comment147282>

    Make this private, since it is only used by this class.
    
    Also move this to the end of the class because it is good to put all the private methods together.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (lines 121 - 125)
<https://reviews.apache.org/r/36163/#comment147283>

    this can be simplified a little:
    
    for ((storeName, systemStream) <- changeLogSystemStreams) {
          val systemAdmin = config
            .getSystemFactory(systemStream.getName)
            .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)).map(Util.getObj[SystemFactory](_)).getOrElse(systemStream.getSystem,
              throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
              
              
    Then lines 104-109 and lines 117-119 are no longer needed.



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 126)
<https://reviews.apache.org/r/36163/#comment147284>

    Add logs for the case where the topic already exists. Log the metadata information (as the original createStream code does).


- Yan Fang




Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review94687
-----------------------------------------------------------


@Robert, thanks! Code LGTM but needs a rebase. I will run the tests after you rebase the change. Thanks!


samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (line 77)
<https://reviews.apache.org/r/36163/#comment149295>

    nit: it would look a bit cleaner if this were wrapped in a function like getStreamMetadataCache()


- Yi Pan (Data Infrastructure)


On Aug. 6, 2015, 6:55 a.m., Robert Zuljevic wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36163/
> -----------------------------------------------------------
> 
> (Updated Aug. 6, 2015, 6:55 a.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Implemented suggestions received in the review. Most changes are for sake of simplicity and clarity.
> 
> 
> Diffs
> -----
> 
>   samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
>   samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala f62161108f19ad29f40318331419438be5fb97b7 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16 
>   samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
>   samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
>   samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
>   samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
>   samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 
> 
> Diff: https://reviews.apache.org/r/36163/diff/
> 
> 
> Testing
> -------
> 
> I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.
> 
> 
> Thanks,
> 
> Robert Zuljevic
> 
>


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

(Updated Aug. 10, 2015, 7:50 a.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

Removed unnecessary whitespaces.


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ea2eaa5f663e717414b8a4b42123842105df150d 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7fa1f2b70c13bf8b618808db1f688a99e6 
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3 
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala f6d53e084771cfbfd1d1ca68f1ac54db0bd91067 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
-------

I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.


Thanks,

Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

(Updated Aug. 10, 2015, 7:47 a.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

Rebased branch on latest master branch.


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ea2eaa5f663e717414b8a4b42123842105df150d 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala 4097ac7fa1f2b70c13bf8b618808db1f688a99e6 
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3 
  samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala f6d53e084771cfbfd1d1ca68f1ac54db0bd91067 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
-------

I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.


Thanks,

Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

(Updated Aug. 6, 2015, 6:55 a.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

Implemented suggestions received in the review. Most changes are for sake of simplicity and clarity.


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala f62161108f19ad29f40318331419438be5fb97b7 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala c39cdc7ed86b591f379eda24005df25928bd8f16 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
  samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemAdmin.java 1fd5dd34f70cb701b4c134b27464863d445b35f3 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
-------

I wasn't really sure what kind of test (unit test / integration test) I should make here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.


Thanks,

Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Yan Fang <ya...@gmail.com>.

> On July 29, 2015, 2:45 p.m., Robert Zuljevic wrote:
> > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala, lines 121-125
> > <https://reviews.apache.org/r/36163/diff/2/?file=1003380#file1003380line121>
> >
> >     Did you mean something like this?
> >     
> >     for ((storeName, systemStream) <- changeLogSystemStreams) {
> >           val systemAdmin = Util.getObj[SystemFactory](config
> >             .getSystemFactory(systemStream.getSystem)
> >             .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
> >             ).getAdmin(systemStream.getSystem, config)
> >         
> >           systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
> >         }
> >         
> >     This is the only way I could think of to simplify this. I don't think what you posted would work, because you're using String's map function, but it did steer me in the right direction. Do you agree?

Yes, this is correct. :) Thanks.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
-----------------------------------------------------------


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93454
-----------------------------------------------------------



samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala (lines 121 - 125)
<https://reviews.apache.org/r/36163/#comment147833>

    Did you mean something like this?
    
    for ((storeName, systemStream) <- changeLogSystemStreams) {
      val systemAdmin = Util.getObj[SystemFactory](config
        .getSystemFactory(systemStream.getSystem)
        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
      ).getAdmin(systemStream.getSystem, config)

      systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions)
    }

    This is the only way I could think of to simplify this. I don't think what you posted would work, because you're using String's map function, but it did steer me in the right direction. Do you agree?
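
    For anyone who wants to try the lookup-then-create logic in isolation, here is a minimal, self-contained Java sketch of the same loop. Everything in it (Admin, RecordingAdmin, SystemStream, createChangelogs) is a hypothetical stand-in rather than the real Samza classes, and it assumes a plain map of admins instead of resolving a SystemFactory from the config.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are not the real Samza classes.
class ChangelogCreationSketch {

    /** Minimal admin interface exposing only the call used in the loop. */
    interface Admin {
        void createChangelogStream(String streamName, int numPartitions);
    }

    /** Records each create request so the result can be inspected. */
    static final class RecordingAdmin implements Admin {
        final Map<String, Integer> created = new LinkedHashMap<>();

        @Override
        public void createChangelogStream(String streamName, int numPartitions) {
            created.put(streamName, numPartitions);
        }
    }

    /** A (system, stream) pair, mirroring the shape used in the loop. */
    static final class SystemStream {
        final String system;
        final String stream;

        SystemStream(String system, String stream) {
            this.system = system;
            this.stream = stream;
        }
    }

    /** For every store, look up the admin of its changelog system and create the stream. */
    static void createChangelogs(Map<String, SystemStream> changelogsByStore,
                                 Map<String, Admin> adminsBySystem,
                                 int changelogPartitions) {
        for (SystemStream systemStream : changelogsByStore.values()) {
            // Fail fast when the changelog names a system that has no admin configured.
            Admin admin = Optional.ofNullable(adminsBySystem.get(systemStream.system))
                .orElseThrow(() -> new IllegalStateException(String.format(
                    "A stream uses system %s, which is missing from the configuration.",
                    systemStream.system)));
            admin.createChangelogStream(systemStream.stream, changelogPartitions);
        }
    }

    public static void main(String[] args) {
        RecordingAdmin kafkaAdmin = new RecordingAdmin();
        Map<String, Admin> admins = new LinkedHashMap<>();
        admins.put("kafka", kafkaAdmin);

        Map<String, SystemStream> changelogs = new LinkedHashMap<>();
        changelogs.put("my-store", new SystemStream("kafka", "my-store-changelog"));

        createChangelogs(changelogs, admins, 2);
        System.out.println(kafkaAdmin.created);
    }
}
```

    The sketch keeps the two properties of the loop above: a missing system is reported with the same message, and every changelog is created with the one shared partition count.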


- Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.

> On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote:
> > The code LGTM. For testing, it would be good to verify this fix w/ a stateful StreamTask w/ changelog enabled and a partition count that differs from the default auto-creation partition number (i.e. 8) in Kafka. The integration test suite in samza-test should be a good place to add the test. Try following the steps in samza-test/src/main/config/join/README and run the integration test. The joiner task has a changelog configured with a partition count of 2. You can verify that the test passes w/ your fix.

Hi Yi, sorry for bothering you so much with this task : ) I'll just write down what I managed to do regarding integration tests:

1. I ran integration tests via Zopkio and they all finished successfully.
2. I ran the integration tests per the guide in samza-test/src/main/config/join/README and I suspect they ran successfully, since none of them had an abnormal final status. I also ran the failure tests (albeit after some limited fiddling with the Python scripts involved).
3. I ran "./gradlew clean build" (which runs TestStatefulTask). It finished with a STANDARD_ERROR, which I assume is a good thing, but here is the output, just in case: http://pastebin.com/aLT5jRdd

What I suspect are the next (possible) steps:

1. Create integration tests to be used with Zopkio. Here I am uncertain how I would kill/stop a Samza task to verify that the changelog stream is being consumed properly.
2. Create another set of tasks similar to Checker/Emitter/Joiner/Watcher. I believe this is unnecessary since they have their changelogs and their restartability is being tested. Of course, I might be wrong.
3. Add another test similar to TestStatefulTask.
   a. Or add num.partitions param to TestStatefulTask.
4. None of the above : )
   
Again, I am very sorry for relying on you this much, but I'm really unclear on how to proceed regarding this.


- Robert


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93093
-----------------------------------------------------------


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On July 27, 2015, 7:36 a.m., Yi Pan (Data Infrastructure) wrote:
> > The code LGTM. For testing, it would be good to verify this fix w/ a stateful StreamTask w/ changelog enabled and a partition count that differs from the default auto-creation partition number (i.e. 8) in Kafka. The integration test suite in samza-test should be a good place to add the test. Try following the steps in samza-test/src/main/config/join/README and run the integration test. The joiner task has a changelog configured with a partition count of 2. You can verify that the test passes w/ your fix.
> 
> Robert Zuljevic wrote:
>     Hi Yi, sorry for bothering you so much with this task : ) I'll just write down what I managed to do regarding integration tests:
>     
>     1. I ran integration tests via Zopkio and they all finished successfully.
>     2. I ran the integration tests per the guide in samza-test/src/main/config/join/README and I suspect they ran successfully, since none of them had an abnormal final status. I also ran the failure tests (albeit after some limited fiddling with the Python scripts involved).
>     3. I ran "./gradlew clean build" (which runs TestStatefulTask). It finished with a STANDARD_ERROR, which I assume is a good thing, but here is the output, just in case: http://pastebin.com/aLT5jRdd
>     
>     What I suspect are the next (possible) steps:
>     
>     1. Create integration tests to be used with Zopkio. Here I am uncertain how I would kill/stop a Samza task to verify that the changelog stream is being consumed properly.
>     2. Create another set of tasks similar to Checker/Emitter/Joiner/Watcher. I believe this is unnecessary since they have their changelogs and their restartability is being tested. Of course, I might be wrong.
>     3. Add another test similar to TestStatefulTask.
>        a. Or add num.partitions param to TestStatefulTask.
>     4. None of the above : )
>        
>     Again, I am very sorry for relying on you this much, but I'm really unclear on how to proceed regarding this.

Hi, @Robert, sorry that I was not too specific in the comment before. If you have successfully run the integration tests via the steps in samza-test/src/main/config/join/README, it should be good to go. Thanks!


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93093
-----------------------------------------------------------


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review93093
-----------------------------------------------------------


The code LGTM. For testing, it would be good to verify this fix w/ a stateful StreamTask w/ changelog enabled and a partition count that differs from the default auto-creation partition number (i.e. 8) in Kafka. The integration test suite in samza-test should be a good place to add the test. Try following the steps in samza-test/src/main/config/join/README and run the integration test. The joiner task has a changelog configured with a partition count of 2. You can verify that the test passes w/ your fix.


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala (line 132)
<https://reviews.apache.org/r/36163/#comment147349>

    Just want to make a note here. When the Kafka admin API allows a read-only flag in fetchMetadata(), we should use it in validateChangelogStream() to avoid auto-creation of the topic w/ an unwanted number of partitions.



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala (line 400)
<https://reviews.apache.org/r/36163/#comment147350>

    Nit: It would be good to put a note here: validateChangelogStream() should not be called before createChangelogStream() until Kafka fixes the admin API to add a read-only flag in fetchMetadata().
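
    To illustrate why the ordering matters, here is a toy in-memory Java model. It assumes, as the notes above imply, that a metadata fetch without a read-only flag auto-creates a missing topic with the broker's default partition count. ToyBroker and the two static helpers are hypothetical stand-ins; they are not the Kafka or Samza API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model; names and behaviour are illustrative assumptions,
// not the Kafka or Samza API.
class ChangelogOrderingSketch {

    // Assumed broker default for auto-created topics (8 in the setup discussed here).
    static final int DEFAULT_AUTO_CREATE_PARTITIONS = 8;

    static final class ToyBroker {
        private final Map<String, Integer> partitionsByTopic = new HashMap<>();

        /** Mimics a metadata fetch with no read-only flag: a missing topic is auto-created. */
        int fetchPartitionCount(String topic) {
            return partitionsByTopic.computeIfAbsent(topic, t -> DEFAULT_AUTO_CREATE_PARTITIONS);
        }

        /** Explicit creation; a topic that already exists keeps its partition count. */
        void createTopic(String topic, int partitions) {
            partitionsByTopic.putIfAbsent(topic, partitions);
        }
    }

    static void createChangelogStream(ToyBroker broker, String topic, int partitions) {
        broker.createTopic(topic, partitions);
    }

    static void validateChangelogStream(ToyBroker broker, String topic, int expectedPartitions) {
        int actual = broker.fetchPartitionCount(topic);
        if (actual != expectedPartitions) {
            throw new IllegalStateException("Changelog " + topic + " has " + actual
                + " partitions, expected " + expectedPartitions);
        }
    }

    public static void main(String[] args) {
        // Safe order: create first, then validate.
        ToyBroker broker = new ToyBroker();
        createChangelogStream(broker, "store-changelog", 2);
        validateChangelogStream(broker, "store-changelog", 2);

        // Unsafe order: validating first auto-creates the topic with the default count.
        ToyBroker fresh = new ToyBroker();
        try {
            validateChangelogStream(fresh, "store-changelog", 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    In this model the damage from the wrong order is permanent: once the validation call has auto-created the topic, a later createChangelogStream() cannot change its partition count.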


- Yi Pan (Data Infrastructure)


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

(Updated July 9, 2015, 2:39 p.m.)


Review request for samza.


Repository: samza


Description (updated)
-------

Removed trailing whitespaces


Diffs (updated)
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing
-------

I wasn't really sure what kind of test (unit test / integration test) I should write here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.


Thanks,

Robert Zuljevic


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by "Gustavo Anatoly F. V. Solís" <gu...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/#review90906
-----------------------------------------------------------



samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java (line 62)
<https://reviews.apache.org/r/36163/#comment144063>

    Hi, Robert.
    
    There is trailing whitespace that should be removed. You can run git diff before submitting the patch to check for stray spaces.
    
    Cheers.


- Gustavo Anatoly F. V. Solís


Re: Review Request 36163: SAMZA-690: changelog topic creation should not be in the container code

Posted by Robert Zuljevic <r....@levi9.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36163/
-----------------------------------------------------------

(Updated July 3, 2015, 9:58 a.m.)


Review request for samza.


Repository: samza


Description
-------

Moved the creation of change log stream from SamzaContainer to JobCoordinator


Diffs
-----

  samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 7a588ebc99b5f07d533e48e10061a3075a63665a 
  samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 249b8ae3a904716ea51a2b27c7701ac30d13b854 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 8ee034a642a13af1b0fdb4ebbb3b2592bb8e2be1 
  samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala aeba61a95371faaba23c97d896321b8d95467f87 
  samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala 097f41062f3432ae9dc9a9737b48ed7b2f709f20 
  samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala 8d54c4639fc226b34e64915935c1d90e5917af2e 
  samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala d9ae187c7707673fe15c8cb7ea854e02c4a89a54 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala 35086f54f526d5d88ad3bc312b71fce40260e7c6 
  samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b063366f0f60e401765a000fa265c59dee4a461e 
  samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 1e936b42a5b9a4bfb43766c17847b2947ebdb21d 

Diff: https://reviews.apache.org/r/36163/diff/


Testing (updated)
-------

I wasn't really sure what kind of test (unit test / integration test) I should write here, so any pointers would be greatly appreciated! I tested the change with the unit/integration tests already available.


Thanks,

Robert Zuljevic