Posted to dev@samza.apache.org by Navina Ramesh <nr...@linkedin.com> on 2016/07/11 22:27:49 UTC

Re: Review Request 48356: RFC: Samza as a library

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 11, 2016, 10:27 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Changes
-------

Waiting for the container to stop before shutting down the JobCoordinator and executor. I chose the timeout as an arbitrary value. I think we can reuse the existing container.shutdown.ms config.
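
A minimal sketch of the shutdown ordering described in this change, assuming the container runs on an executor thread and signals its exit through a latch. ShutdownOrderSketch and all of its members are hypothetical stand-ins, not the classes in this patch, and the timeout is passed in as a plain parameter rather than read from any config.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a processor that runs a container on an executor.
class ShutdownOrderSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final CountDownLatch containerStopped = new CountDownLatch(1);
  private volatile boolean running = true;
  private volatile boolean coordinatorStopped = false;

  void start() {
    executor.execute(() -> {
      try {
        while (running) {
          Thread.sleep(5); // placeholder for the container run loop
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        containerStopped.countDown(); // signal that the container has exited
      }
    });
  }

  // Returns true if the container stopped within the timeout.
  boolean stop(long shutdownTimeoutMs) {
    running = false; // ask the container to stop
    boolean stoppedInTime = false;
    try {
      stoppedInTime = containerStopped.await(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    coordinatorStopped = true; // placeholder for stopping the job coordinator
    executor.shutdownNow();    // interrupts the container thread if it is still running
    return stoppedInTime;
  }

  boolean isCoordinatorStopped() {
    return coordinatorStopped;
  }
}
```

The coordinator and executor are only touched after the wait on the latch returns, whether the container exited or the timeout expired.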


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java, line 70
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441901#file1441901line70>
> >
> >     I don't fully understand the "unfortunate" part of this comment. A fully fledged JobCoordinator needs to perform:
> >     - Leader election
> >     - Partition assignment (which includes SystemStreamPartitionGrouper)
> >     - Physical resource allocation (which includes TaskNameGrouper that groups tasks into a physical process (i.e. ContainerModel), and locality manager to report the physical location of the tasks)
> >     
> >     Hence, LocalityManager and TaskNameGrouper are coupled via JobCoordinator, anyway.
> >     
> >     Also, not sure what you meant by "groupers should be a property... rather the component that handles task distribution". There are two different types of groupers that perform two different types of functions (partition assignment and resource allocation). And what do you mean by "not that of the job anymore"?

The "unfortunate" part was probably just my frustration when making the changes :P Totally not necessary. 

I meant pretty much what you have listed as the different functionalities of the JobCoordinator. By saying "not that of the job anymore", I was merely referring to the hierarchical namespaces of the configs. Currently, the grouper is specified as a job-level property (job.systemstreampartition.grouper.factory). It makes sense to move this to the "job.coordinator" namespace, as it is used only by the JobCoordinator. Does that make sense?
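
A rough sketch of what such a namespace move could look like on the reading side, assuming the coordinator-level key is preferred and the existing job-level key is kept as a fallback. The coordinator-level key name below is hypothetical; the thread does not settle on one. GrouperConfigSketch is an illustrative class, not part of the patch.

```java
import java.util.HashMap;
import java.util.Map;

class GrouperConfigSketch {
  // Existing job-level key mentioned in the thread.
  static final String JOB_LEVEL_KEY = "job.systemstreampartition.grouper.factory";
  // Hypothetical key under the "job.coordinator" namespace (name is an assumption).
  static final String COORDINATOR_LEVEL_KEY = "job.coordinator.systemstreampartition.grouper.factory";

  // Prefer the coordinator-level key; fall back to the job-level key.
  static String grouperFactory(Map<String, String> config) {
    String value = config.get(COORDINATOR_LEVEL_KEY);
    return value != null ? value : config.get(JOB_LEVEL_KEY);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(JOB_LEVEL_KEY, "ExampleGrouperFactory"); // placeholder class name
    System.out.println(grouperFactory(config));
  }
}
```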


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------


On July 13, 2016, 9:58 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 13, 2016, 9:58 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 107
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line107>
> >
> >     How would it work w/ RegexConfigRewriter? We have a use case where the user leaves this empty and configures RegexConfigRewriter to fill it in at runtime. Are we making the case that this builder has to be called after RegexConfigRewriter is invoked?
> 
> Navina Ramesh wrote:
>     Since we are operating as a library, the user should be able to invoke re-writers before passing the config to the StreamProcessor. The user should invoke this builder and then invoke any re-writers.
>     
>     ConfigBuilder builder = ConfigBuilder.getGenericConfigBuilder(...). ...
>     Config initialconfig = builder.build();
>     Config finalConfig = new RegExTopicGenerator().rewrite("regex-rewriter", initialconfig);
>     
>     Do you think this is not a suitable model? I wanted to make all the config-related user actions independent of the processor lifecycle itself. This means that the config rewrite is left up to the user. They can use the regex rewriter class provided by the Samza APIs. 
>     
>     This does remind me to allow the user to set properties for rewriters in the ConfigBuilder. Now that does seem confusing and also makes the validation tricky. Let's talk about this offline.

Actually, I felt that the config rewriter is a pretty hacky way to achieve the dynamic input topic discovery at first. And the further extended usage to rewrite other configuration is even more confusing. If possible, I would rather hide it from the user. Would it be possible to roll the rewriter() invocations into the build() method, instead of relying on the user to invoke the rewriter explicitly? That seems to be a better option to me.
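
A minimal sketch of that option, assuming the builder keeps an ordered list of named rewriters and applies them inside build(). RewriterSketch and RewritingConfigBuilderSketch are hypothetical stand-ins that operate on plain maps; they are not the Samza ConfigRewriter or the ConfigBuilder in this patch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a config rewriter; not the Samza interface itself.
interface RewriterSketch {
  Map<String, String> rewrite(String name, Map<String, String> config);
}

class RewritingConfigBuilderSketch {
  private final Map<String, String> values = new HashMap<>();
  // LinkedHashMap keeps the rewriters in registration order.
  private final Map<String, RewriterSketch> rewriters = new LinkedHashMap<>();

  RewritingConfigBuilderSketch put(String key, String value) {
    values.put(key, value);
    return this;
  }

  RewritingConfigBuilderSketch addRewriter(String name, RewriterSketch rewriter) {
    rewriters.put(name, rewriter);
    return this;
  }

  // Applies every registered rewriter, so the caller never invokes one directly.
  Map<String, String> build() {
    Map<String, String> config = new HashMap<>(values);
    for (Map.Entry<String, RewriterSketch> entry : rewriters.entrySet()) {
      config = entry.getValue().rewrite(entry.getKey(), config);
    }
    return config;
  }
}
```

Any validation would then have to run on the rewritten config, since a rewriter may fill in values (such as the task inputs) that were left empty by the user.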


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java, line 23
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441891#file1441891line23>
> >
> >     Is it true that all standalone Samza processors use this wildcard grouper? I would prefer to have a specialized standalone config builder for each different type of standalone usage. Same for the TaskNameGrouperFactory configuration variable.
> >     
> >     P.S. also saw Chris' comment on not creating a base class w/o knowing what the common base is. Totally agree. Hence, it would make sense to name this "standalone" correctly s.t. it is clear that it only implements a specific type of Samza job. The name "standalone" creates some confusion since we also call the ZK-based implementation "standalone".
> 
> Navina Ramesh wrote:
>     Technically speaking, the groupers are tied to which JobCoordinatorFactory we use, right? I think the grouper should reflect what kind of grouping it does, as opposed to which environment it ties into. 
>     I agree. Standalone is over-used. But I have been unable to come up with a more sensible one. The English language doesn't have enough words.

Actually, my question was more about whether "StandaloneConfigBuilder" is for all possible "standalone" Samza use cases, or specifically for a standalone, stateless, noop-partition-assignment Samza-as-a-lib use case. If it is the latter, that's exactly what I want.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 86
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line86>
> >
> >     In addition, I don't see a use case where the life-cycle of JobCoordinator is outside of the life-cycle of StreamProcessor. It seems better to instantiate the JobCoordinator object within StreamProcessor, instead of passing in an already created object in the public constructor.
> 
> Navina Ramesh wrote:
>     Yes. But we could have more than one JobCoordinator implementation. I should ideally be inspecting the config object for JobCoordinatorFactory and using that. But it wasn't clear what kind of parameters should be passed to the method in JobCoordinatorFactory. If we pass in Config (as we do with all other factory classes), it seems too generic. Also, right now, we have only one implementation. So, I decided not to use the JobCoordinatorFactory until the interface is finalized.

Aren't we already passing in the Config object anyway? Also, if we only have one implementation of JobCoordinatorFactory, wouldn't it be easier to instantiate the JobCoordinator object now? What I am more concerned about is the management of the life cycle of the JobCoordinator object. If it is created outside the StreamProcessor, who manages the shutdown/close of the JobCoordinator instance? Shouldn't it be the StreamProcessor anyway? In principle, whoever creates the object should be responsible for closing / shutting it down. Also, JobCoordinator is an internal object in Samza and, ideally, shouldn't be accessible outside the StreamProcessor object.
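
A small sketch of the ownership rule argued for here, assuming the processor constructs its coordinator from the config and is the only object that starts or stops it. OwnedCoordinatorSketch and OwningProcessorSketch are hypothetical stand-ins, not the StreamProcessor or JobCoordinator classes in the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an internal coordinator.
class OwnedCoordinatorSketch {
  private boolean running;

  void start() { running = true; }
  void stop() { running = false; }
  boolean isRunning() { return running; }
}

// Hypothetical stand-in for a processor that owns its coordinator.
class OwningProcessorSketch {
  private final Map<String, String> config;
  private final OwnedCoordinatorSketch coordinator; // never handed out to callers

  OwningProcessorSketch(Map<String, String> config) {
    this.config = new HashMap<>(config);
    // Created here, so this class is also responsible for stopping it.
    this.coordinator = new OwnedCoordinatorSketch();
  }

  void start() { coordinator.start(); }
  void stop() { coordinator.stop(); }

  // Exposed only so the sketch can be checked; a real API might not need it.
  boolean isCoordinatorRunning() { return coordinator.isRunning(); }
}
```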


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 118
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line118>
> >
> >     Seems like this should be a generic constructor, not just for standalone. Also, isn't it true that we should be able to figure out what JobCoordinatorFactory we should use to instantiate the JobCoordinator instance from the Config object? Something like:
> >     
> >     JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) should be more generic.
> 
> Navina Ramesh wrote:
>     Same response as above. 
>     It is indeed more generic. Should it remain getJobCoordinator(config) or getJobCoordinator(grouper, jobCoordinatorProperties) ?

As long as the instantiation of the JobCoordinator object is within StreamProcessor itself, I am less concerned about the actual implementation method. :)
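
A hedged sketch of how the factory could be resolved from the config, assuming the factory class name is stored under a config key and loaded reflectively. The key name "job.coordinator.factory" is an assumption here, and every type below is a hypothetical stand-in for the interfaces under review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the interfaces discussed in this review.
interface JobCoordinatorSketch {
  String describe();
}

interface JobCoordinatorFactorySketch {
  JobCoordinatorSketch getJobCoordinator(Map<String, String> config);
}

class StandaloneFactorySketch implements JobCoordinatorFactorySketch {
  @Override
  public JobCoordinatorSketch getJobCoordinator(Map<String, String> config) {
    return () -> "standalone coordinator for " + config.get("job.name");
  }
}

class FactoryLookupSketch {
  // Assumed config key; the thread does not name one.
  static final String FACTORY_KEY = "job.coordinator.factory";

  // Loads the factory class named in the config through its no-arg constructor.
  static JobCoordinatorFactorySketch getFactory(Map<String, String> config) {
    String className = config.get(FACTORY_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + FACTORY_KEY);
    }
    try {
      return (JobCoordinatorFactorySketch) Class.forName(className)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}
```

With this shape, the processor would only need the config to create its coordinator: FactoryLookupSketch.getFactory(config).getJobCoordinator(config).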


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------


On Aug. 11, 2016, 1:23 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 11, 2016, 1:23 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java, line 24
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441885#file1441885line24>
> >
> >     So, what's the plan to make a customized CheckpointConfig for each different checkpointManagerFactory? Derive a sub-class from this one?
> >     
> >     Generally, I noticed that there is a set of configuration variables in this category:
> >     - configure a factory
> >     - a set of config variables that are only meaningful if factory = x
> >     
> >     It would be good to write up some documentation here to showcase how we are dealing w/ the type of configurations above.

Yes. That's what I had in mind. We should use derived classes with customized configuration for a specific checkpoint manager factory. 

You are right. That pattern is precisely why I chose this typed model. That way, the configurations are clearer to the user and implicitly indicate the namespace to which a configuration belongs. 

I will add some documentation. I think it is time to put out a proper design document as well.
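
A minimal sketch of the typed model described above, assuming a base class carries the factory setting and a derived class adds the settings that only make sense for that factory. Both classes are hypothetical stand-ins for the ones in the patch, and the key and factory class names are written as I recall them from the Samza configuration docs, so they should be checked against the real table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base: only knows which checkpoint manager factory to use.
class CheckpointConfigSketch {
  private final String factoryClass;

  CheckpointConfigSketch(String factoryClass) {
    this.factoryClass = factoryClass;
  }

  Map<String, String> toMap() {
    Map<String, String> map = new HashMap<>();
    map.put("task.checkpoint.factory", factoryClass);
    return map;
  }
}

// Hypothetical derived class: fixes the factory and adds a factory-specific setting.
class KafkaCheckpointConfigSketch extends CheckpointConfigSketch {
  private final int replicationFactor;

  KafkaCheckpointConfigSketch(int replicationFactor) {
    super("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
    this.replicationFactor = replicationFactor;
  }

  @Override
  Map<String, String> toMap() {
    Map<String, String> map = super.toMap();
    map.put("task.checkpoint.replication.factor", Integer.toString(replicationFactor));
    return map;
  }
}
```

The user never types the factory class name or the factory-specific key; choosing the derived type implies both.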


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 31
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line31>
> >
> >     Just wondering, can we have an interface class ConfigBuilder as a common interface for all modular config builders, such as JobConfig, TaskConfig, etc.? 
> >     And make SamzaConfigBuilder an implementation of ConfigBuilder that holds all sub-builders?

I think that's how I started in the first draft. The ConfigBuilder interface has a "build()" method that returns a Map<String, String>. Chris made a valid point that having an interface with this build() method wasn't really providing any additional benefit, other than making it composable. Did you have anything else in mind for creating the common interface?


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 32
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line32>
> >
> >     Add some javadoc here.

Yep. Documentation is pending everywhere  :|


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 58
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line58>
> >
> >     Here it seems that you are mandating that any config should have jobName and taskClass. However, I believe those two are not the only mandatory configuration variables. I would suggest that we remove these from the constructor of ConfigBuilder. Instead, make it more modular s.t. we can call: ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem).taskConfig(taskClass, taskInputs, ...)
> >     
> >     Then, finally, we can call build() to compile the complete configuration, in which we can call validate() for each modular config class (e.g. JobConfig.validate()) and also validate the inter-dependencies between the modular config classes (e.g. systems defined in JobConfig, StoreConfig, etc. must have a corresponding SystemConfig).
> >     
> >     P.S. just saw Chris' comment earlier. I think that if we can separate the ConfigBuilder into smaller modular sections, we can make the mandatory config variables constructor parameters.

Ok. I see the model you are proposing. 

1. It is pretty ideal for how our configs are structured today. However, it ends up being a less intuitive user API. 

It seems more natural for the user to say: ConfigBuilder.getGenericBuilder().addTaskInput(systemconfigs, serdeconfigs), instead of addTaskConfig, as it usually ends up being about more than just the input. 

2. It is hard to tell which sub-configs are required, unless they are mandated in the constructor. For example, in ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem), jobId is optional. That means we should provide overloaded constructors. If there are too many configurations within a namespace, we may end up creating "sub-builders". I didn't want to overdo this builder pattern without solid motivation. :) 

To your point on having a validate() method, there isn't a lot of validation we can do in most of our configs (at the modular level), except for type and value range. Most validations seem inter-dependent. That's how I ended up with a very monolithic validation in the build() method of ConfigBuilder.
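
To make the trade-off concrete, here is a minimal sketch of a modular builder whose build() does both kinds of checks: simple per-module checks and one inter-dependent check (every task input must refer to a configured system). ModularConfigBuilderSketch and its method names are hypothetical and only loosely follow the calls quoted above; IllegalArgumentException stands in for the ConfigException used in the patch, and the key names are written as I recall them from the Samza configuration docs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ModularConfigBuilderSketch {
  private String jobName;
  private String taskClass;
  private final List<String> taskInputs = new ArrayList<>();
  private final Map<String, String> systemFactories = new HashMap<>();

  ModularConfigBuilderSketch jobConfig(String jobName) {
    this.jobName = jobName;
    return this;
  }

  // Inputs are given as "system.stream".
  ModularConfigBuilderSketch taskConfig(String taskClass, String... inputs) {
    this.taskClass = taskClass;
    taskInputs.addAll(Arrays.asList(inputs));
    return this;
  }

  ModularConfigBuilderSketch systemConfig(String systemName, String factoryClass) {
    systemFactories.put(systemName, factoryClass);
    return this;
  }

  Map<String, String> build() {
    // Module-level checks: required values are present.
    if (jobName == null || jobName.isEmpty()) {
      throw new IllegalArgumentException("job.name is required");
    }
    if (taskClass == null || taskClass.isEmpty()) {
      throw new IllegalArgumentException("task.class is required");
    }
    // Cross-module check: every input must refer to a configured system.
    for (String input : taskInputs) {
      int dot = input.indexOf('.');
      if (dot <= 0) {
        throw new IllegalArgumentException("Input must look like system.stream: " + input);
      }
      String system = input.substring(0, dot);
      if (!systemFactories.containsKey(system)) {
        throw new IllegalArgumentException("No system config for input: " + input);
      }
    }
    Map<String, String> config = new HashMap<>();
    config.put("job.name", jobName);
    config.put("task.class", taskClass);
    config.put("task.inputs", String.join(",", taskInputs));
    for (Map.Entry<String, String> entry : systemFactories.entrySet()) {
      config.put("systems." + entry.getKey() + ".samza.factory", entry.getValue());
    }
    return config;
  }
}
```

Even in this small version, the only check that needs more than one module is the one in the loop, which is why it ends up in build() rather than in a per-module validate().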


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 107
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line107>
> >
> >     How would it work w/ RegexConfigRewriter? We have a use case where the user leaves this empty and configures RegexConfigRewriter to fill it in at runtime. Are we making the case that this builder has to be called after RegexConfigRewriter is invoked?

Since we are operating as a library, the user should be able to invoke re-writers before passing the config to the StreamProcessor. The user should invoke this builder and then invoke any re-writers.

ConfigBuilder builder = ConfigBuilder.getGenericConfigBuilder(...). ...
Config initialconfig = builder.build();
Config finalConfig = new RegExTopicGenerator().rewrite("regex-rewriter", initialconfig);

Do you think this is not a suitable model? I wanted to make all the config-related user actions independent of the processor lifecycle itself. This means that the config rewrite is left up to the user. They can use the regex rewriter class provided by the Samza APIs. 

This does remind me to allow the user to set properties for rewriters in the ConfigBuilder. Now that does seem confusing and also makes the validation tricky. Let's talk about this offline.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 110
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line110>
> >
> >     We have another config pattern here:
> >     - configuration variables on an entity (i.e. SystemStream in this case) are dispersed across multiple config modules (i.e. TaskConfig has task.inputs, while SystemConfig has the serde names and the replication factors, and SerdeConfig has the actual serde class name, etc.). There are more like this: the changelog in StoreConfig also has a SystemStreamConfig, which would be generated via StoreConfig.{key,msg}.serde and the corresponding SystemConfig. It would be really nice to call out those usage patterns of configuration variables and describe how the new modular design handles:
> >     - encapsulation of the variables
> >     - relationship between the config modules

I thought I removed the SystemStreamConfig from the user-facing API. Right now, it is private to the config builder.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 146
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line146>
> >
> >     Actually, CheckpointConfig only has the factory name in it. I assume that only KafkaCheckpointConfig would have the task.checkpoint.system set?

Actually no. Any custom checkpoint system that is used should have a system name. This configuration is at the task level in the current config. But in reality, it only matters when checkpointing is enabled.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 240
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line240>
> >
> >     Can we make this package private? As a private class within ConfigBuilder, this method shouldn't be called outside ConfigBuilder.build().

Ok


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, line 19
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441890#file1441890line19>
> >
> >     I think that it would be OK to keep all config builders in the same org.apache.samza.config package.

I was hoping not to pollute the existing config files. These are used internally within the Samza code base and already seem overpopulated with various Scala and Java versions :)


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, line 30
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441890#file1441890line30>
> >
> >     Recently, Jon added a double serde as well.

Gotcha! Will add it


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java, line 23
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441891#file1441891line23>
> >
> >     Is it true that all standalone Samza processors use this wildcard grouper? I would prefer to have a specialized standalone config builder for each different type of standalone usage. Same for the TaskNameGrouperFactory configuration variable.
> >     
> >     P.S. also saw Chris' comment on not creating a base class w/o knowing what the common base is. Totally agree. Hence, it would make sense to name this "standalone" correctly s.t. it is clear that it only implements a specific type of Samza job. The name "standalone" creates some confusion since we also call the ZK-based implementation "standalone".

Technically speaking, the groupers are tied to which JobCoordinatorFactory we use, right? I think the grouper should reflect what kind of grouping it does, as opposed to which environment it ties into. 
I agree. Standalone is over-used. But I have been unable to come up with a more sensible one. The English language doesn't have enough words.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java, line 43
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441893#file1441893line43>
> >
> >     Do we handle broadcast streams in this grouper? If not, we should make it clear in the javadoc that this grouper won't work w/ broadcast streams. Or, better, fail the config validation if we find out this is the case.

Makes sense. I will document that.
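
A minimal sketch of the "fail the config validation" option, in case it helps the discussion. It assumes broadcast inputs are declared under the task.broadcast.inputs key and uses a plain Map and IllegalArgumentException in place of Samza's Config and ConfigException; BroadcastInputCheck is a hypothetical name and is not part of this patch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the patch under review. */
final class BroadcastInputCheck {
  // Assumption: broadcast inputs are declared under this config key.
  static final String BROADCAST_INPUTS = "task.broadcast.inputs";

  /** Rejects configs that declare broadcast inputs, which this grouper would not handle. */
  static void validate(Map<String, String> config) {
    String broadcast = config.get(BROADCAST_INPUTS);
    if (broadcast != null && !broadcast.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "AllSspToSingleTaskGrouper does not support broadcast streams, but "
              + BROADCAST_INPUTS + " is set to: " + broadcast);
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("task.inputs", "kafka.page-views");
    validate(config); // passes: no broadcast inputs configured

    config.put(BROADCAST_INPUTS, "kafka.control#0");
    try {
      validate(config);
    } catch (IllegalArgumentException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

Failing early like this would surface the limitation at config-build time rather than in the javadoc only.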


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 86
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line86>
> >
> >     In addition, I don't see a use case where the life-cycle of JobCoordinator is outside of the life-cycle of StreamProcessor. It seems better to instantiate the JobCoordinator object within StreamProcessor, instead of passing in an already created object in the public constructor.

Yes. But we could have more than one JobCoordinator implementation. Ideally, I should be inspecting the config object for the JobCoordinatorFactory and using that. But it wasn't clear what kind of parameters should be passed to the method in JobCoordinatorFactory. If we pass in Config (as we do with all other factory classes), it seems too generic. Also, right now, we have only one implementation. So, I decided not to use the JobCoordinatorFactory until the interface is finalized.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 118
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line118>
> >
> >     Seems like this should be a generic constructor, not just for standalone. Also, isn't it true that we should be able to figure out what JobCoordinatorFactory we should use to instantiate the JobCoordinator instance from the Config object? Something like:
> >     
> >     JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) should be more generic.

Same response as above. 
It is indeed more generic. Should it remain getJobCoordinator(config) or getJobCoordinator(grouper, jobCoordinatorProperties) ?
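
For reference, a rough sketch of the config-driven lookup being discussed, using the getJobCoordinator(config) variant. Everything here is an assumption for illustration: the job.coordinator.factory key, the single-method interfaces, and the ExampleJobCoordinatorFactory and JobCoordinators names are hypothetical stand-ins, and a plain Map replaces Samza's Config.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the JobCoordinator interface under discussion. */
interface JobCoordinator {
  String describe();
}

/** Hypothetical stand-in for the factory interface; takes the whole config. */
interface JobCoordinatorFactory {
  JobCoordinator getJobCoordinator(Map<String, String> config);
}

/** Example factory; a real one would construct e.g. a StandaloneJobCoordinator. */
final class ExampleJobCoordinatorFactory implements JobCoordinatorFactory {
  public ExampleJobCoordinatorFactory() {}

  @Override
  public JobCoordinator getJobCoordinator(Map<String, String> config) {
    return () -> "example coordinator for job " + config.get("job.name");
  }
}

final class JobCoordinators {
  // Assumption: the factory class name is read from this config key.
  static final String FACTORY_KEY = "job.coordinator.factory";

  /** Instantiates the configured factory reflectively and asks it for a coordinator. */
  static JobCoordinator fromConfig(Map<String, String> config) {
    String className = config.get(FACTORY_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + FACTORY_KEY);
    }
    try {
      JobCoordinatorFactory factory = (JobCoordinatorFactory)
          Class.forName(className).getDeclaredConstructor().newInstance();
      return factory.getJobCoordinator(config);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "demo");
    config.put(FACTORY_KEY, ExampleJobCoordinatorFactory.class.getName());
    System.out.println(fromConfig(config).describe());
  }
}
```

With this shape, StreamProcessor would only need the config in its constructor. The getJobCoordinator(grouper, jobCoordinatorProperties) variant would make the dependencies explicit instead, at the cost of a less uniform factory signature.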


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 130
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line130>
> >
> >     Not sure whether we want to keep JmxServer life-cycle within the StreamProcessor life-cycle. This actually could be one thing that is shared w/ the whole JVM process and it can be passed in to the StreamProcessor.

Oh I see. So, will the users create an instance of JmxServer in samza library? We could make it default, I guess and not have the user configure anything.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java, line 44
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441901#file1441901line44>
> >
> >     I think that we should make the javadoc more clear. If this StandaloneJobCoordinator is an implementation that defines:
> >     - config based JobModel generation (i.e. configure via wildcard groupers)
> >     - no leader election
> >     
> >     We should put these definitions in the javadoc here as well.

Ok. Makes sense.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 80
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441903#file1441903line80>
> >
> >     Can we just name it job.coordinator.host-affinity.enabled? Not particularly a fan of creating a new config that is to be deprecated. ;)
> >     For non-YARN JobCoordinator, we can just fail the config validation stating that it is not supported yet, if that is the case.

Ah.. This is not a new config. I just moved it from the YarnConfig class to the JobConfig class. Since it is an existing config, we should deprecate it before rolling out the new config name.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 104
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441912#file1441912line104>
> >
> >     I think that we should deprecate this one in favor of job.coordinator.host-affinity.enabled. Maybe copy over this value and print a warning for now, and remove it completely later.

Yes.
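
A small sketch of the copy-over-and-warn approach, to pin down the intended behaviour. It assumes the existing key is yarn.samza.host-affinity.enabled, that the new key is job.coordinator.host-affinity.enabled as proposed above, and that the default is false; HostAffinityConfig is a hypothetical name and a plain Map stands in for Samza's Config.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating a deprecation fallback between two config keys. */
final class HostAffinityConfig {
  // Proposed new key from the review comment.
  static final String NEW_KEY = "job.coordinator.host-affinity.enabled";
  // Assumption: this is the existing YARN-scoped key being deprecated.
  static final String OLD_KEY = "yarn.samza.host-affinity.enabled";

  /** Prefers the new key, falls back to the deprecated key with a warning, else false. */
  static boolean isHostAffinityEnabled(Map<String, String> config) {
    if (config.containsKey(NEW_KEY)) {
      return Boolean.parseBoolean(config.get(NEW_KEY));
    }
    if (config.containsKey(OLD_KEY)) {
      System.err.println("WARN: " + OLD_KEY + " is deprecated; use " + NEW_KEY + " instead");
      return Boolean.parseBoolean(config.get(OLD_KEY));
    }
    return false; // assumed default
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(OLD_KEY, "true");
    System.out.println(isHostAffinityEnabled(config)); // falls back to the old key and warns
  }
}
```

Once the old key is removed, only the first branch and the default would remain.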


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------


On July 13, 2016, 9:58 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 13, 2016, 9:58 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 31
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line31>
> >
> >     Just wonder, can we have an interface class ConfigBuilder as a common interface for all modular config builders, s.t. JobConfig, TaskConfig, etc. 
> >     And make SamzaConfigBuilder as an implementation of ConfigBuilder to hold all sub-builders?
> 
> Navina Ramesh wrote:
>     I think that's how I started in the first draft. The ConfigBuilder interface has a "build()" method that returns a Map<String, String>. Chris made a valid point that having an interface with this build() method wasn't really  providing any additional benefit, other than making it composable. Did you have anything else in mind for creating the common interface?

Since we are planning to remove this interface altogether, I am going to drop this requirement for a common interface :) *So glad to get rid of it* !


- Navina




On Aug. 18, 2016, 5:39 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 18, 2016, 5:39 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 130
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line130>
> >
> >     Not sure whether we want to keep JmxServer life-cycle within the StreamProcessor life-cycle. This actually could be one thing that is shared w/ the whole JVM process and it can be passed in to the StreamProcessor.
> 
> Navina Ramesh wrote:
>     Oh I see. So, will the users create an instance of JmxServer in samza library? We could make it default, I guess and not have the user configure anything.

Yi, I tried making the JmxServer a single entity shared with the whole JVM process. It looks like the "JmxServer" class is defined in org.apache.samza.metrics. It serves as a wrapper around the regular JMX server. So, are you suggesting that the users pass in their own JmxServer instance, or that we should create an instance of JmxServer within a StreamProcessor and share it between the container and JobCoordinator?


- Navina




Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 130
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line130>
> >
> >     Not sure whether we want to keep JmxServer life-cycle within the StreamProcessor life-cycle. This actually could be one thing that is shared w/ the whole JVM process and it can be passed in to the StreamProcessor.
> 
> Navina Ramesh wrote:
>     Oh I see. So, will the users create an instance of JmxServer in samza library? We could make it default, I guess and not have the user configure anything.
> 
> Navina Ramesh wrote:
>     Yi, I tried making the JmxServer a single entity shared with the whole JVM process. It looks like the "JmxServer" class is defined in org.apache.samza.metrics. It serves as a wrapper around the regular JMX server. So, are you suggesting that the users pass in their own JmxServer instance, or that we should create an instance of JmxServer within a StreamProcessor and share it between the container and JobCoordinator?

Hi, Navina, I see. If the JmxServer is a Samza-specific metrics server, let's keep it within StreamProcessor. My point was that I don't see a reason for Samza to open and use a different JMX connector if the user process has already opened one. But if the whole JmxServer is Samza-specific, let's keep it within Samza.


- Yi




On Aug. 11, 2016, 1:23 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 11, 2016, 1:23 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 58
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line58>
> >
> >     Here it seems that you are mandating that any config should have jobName and taskClass. However, I believe those two are not the only mandatory configuration variables. I would suggest that we remove these from the constructor of ConfigBuilder. Instead, make it more modular s.t. we can call: ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem).taskConfig(taskClass, taskInputs, ...)
> >     
> >     Then, finally, we can call build() to compile the complete configuration, in which we can call validate() for each modular config class (e.g. JobConfig.validate()) and also validate the inter-dependencies between the modular config classes (e.g. systems defined in JobConfig, StoreConfig, etc. must have a corresponding SystemConfig).
> >     
> >     P.S. just saw Chris' comment earlier. I think that if we can separate the ConfigBuilder into smaller modular sections, we can make the mandatory config variables constructor arguments of those sections.
> 
> Navina Ramesh wrote:
>     Ok. I see the model you are proposing. 
>     
>     1. It is pretty ideal for how our configs are structured today. However, it ends up being a less intuitive user API. 
>     
>     It seems more natural for the user to say: ConfigBuilder.getGenericBuilder().addTaskInput(systemconfigs, serdeconfigs), instead of addTaskConfig, as it usually ends up being about more than just the input. 
>     
>     2. It is hard to tell which sub-configs are required, unless they are mandated in the constructor. For example, in ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem), jobId is optional. That means we should provide overloaded constructors. If there are too many configurations within a namespace, we may end up creating "sub-builders". I didn't want to overdo this builder pattern without solid motivation. :) 
>     
>     To your point on having a validate() method, there isn't a lot of validation we can do in most of our configs (at the modular level), except for type and value range. Most validations seem inter-dependent. That's how I ended up with a very monolithic validation in the build() method of ConfigBuilder.

I agree that the builder APIs have to be user-friendly. But the implementation should be structured w/ sub-modules. Let's talk in person tomorrow.
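
To make the sub-module idea above concrete before we meet, here is a rough sketch of a builder with per-section methods, per-module checks and one cross-module check in build(). It is only an illustration under assumptions: ModularConfigBuilder and its method signatures are hypothetical, jobId is treated as optional, and inputs are assumed to be written as "system.stream".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical modular builder; names and signatures are illustrative only. */
final class ModularConfigBuilder {
  private final Map<String, String> config = new HashMap<>();
  private final List<String> inputSystems = new ArrayList<>();

  /** Job section: jobName is mandatory, jobId may be null. */
  ModularConfigBuilder jobConfig(String jobName, String jobId) {
    config.put("job.name", jobName);
    if (jobId != null) {
      config.put("job.id", jobId);
    }
    return this;
  }

  /** Task section: inputs are assumed to be of the form "system.stream". */
  ModularConfigBuilder taskConfig(String taskClass, String... inputs) {
    config.put("task.class", taskClass);
    config.put("task.inputs", String.join(",", inputs));
    for (String input : inputs) {
      int dot = input.indexOf('.');
      if (dot <= 0) {
        throw new IllegalArgumentException("Expected system.stream but got: " + input);
      }
      inputSystems.add(input.substring(0, dot));
    }
    return this;
  }

  /** System section: registers the factory class for one system. */
  ModularConfigBuilder systemConfig(String systemName, String factoryClass) {
    config.put("systems." + systemName + ".samza.factory", factoryClass);
    return this;
  }

  /** Validates each section, then the dependency between task inputs and systems. */
  Map<String, String> build() {
    require("job.name");
    require("task.class");
    for (String system : inputSystems) {
      require("systems." + system + ".samza.factory");
    }
    return new HashMap<>(config);
  }

  private void require(String key) {
    String value = config.get(key);
    if (value == null || value.isEmpty()) {
      throw new IllegalStateException("Missing required config: " + key);
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new ModularConfigBuilder()
        .jobConfig("demo-job", null)
        .taskConfig("org.example.MyStreamTask", "kafka.page-views")
        .systemConfig("kafka", "org.apache.samza.system.kafka.KafkaSystemFactory")
        .build();
    System.out.println(config.get("task.inputs"));
  }
}
```

The user-facing chain stays short, while the cross-module rule (every input system needs a system section) lives in one place inside build().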


- Yi




On Aug. 11, 2016, 1:23 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 11, 2016, 1:23 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 107
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line107>
> >
> >     How would it work w/ RegexConfigRewriter? We have a use case where the user will leave this empty and configure RegexConfigRewriter to fill it in at runtime. Are we making the case that this builder has to be called after RegexConfigRewriter is invoked?
> 
> Navina Ramesh wrote:
>     Since we are operating as a library, the user should be able to invoke re-writers before passing the config to the StreamProcessor. The user should invoke this builder and then invoke any re-writers.
>     
>     ConfigBuilder builder = ConfigBuilder.getGenericConfigBuilder(...). ...
>     Config initialconfig = builder.build();
>     Config finalConfig = new RegExTopicGenerator().rewrite("regex-rewriter", initialconfig);
>     
>     Do you think this is not a suitable model? I wanted to make all the config-related user actions independent of the processor lifecycle itself. This means that config rewriting is left up to the user. They can use the regex rewriter class provided by the Samza APIs.
>     
>     This does remind me to allow the user to set properties for rewriters in the ConfigBuilder. That does seem confusing, though, and it also makes the validation tricky. Let's talk about this offline.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Actually, I felt that the config rewriter is a pretty hacky way to achieve the dynamic input topic discovery at first. And the further extended usage to rewrite other configuration is even more confusing. If possible, I would rather remove it from the user's eyes. Would it be possible to roll the invocation of rewriter() calls within the build() method, instead of relying on the user to invoke the rewriter explicitly? That seems to be a better option to me.

Spoke with Yi offline. In order to maintain parity with the features currently supported in Samza, I am going to invoke the rewriters inside the build() method. Eventually, we should remove the concept of Config Rewriters from Samza!
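
A rough sketch of what invoking rewriters inside build() could look like. This is only an illustration under stated assumptions: RewritingBuilderSketch, its Rewriter interface, put() and addRewriter() are hypothetical names, and a plain Map stands in for Samza's Config. It is not the code in this patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; not the Samza Config or rewriter classes.
final class RewritingBuilderSketch {
    /** Takes the config built so far and returns a rewritten copy. */
    interface Rewriter {
        Map<String, String> rewrite(String rewriterName, Map<String, String> config);
    }

    private final Map<String, String> entries = new HashMap<>();
    // LinkedHashMap keeps registration order, so rewriters run in a predictable sequence.
    private final Map<String, Rewriter> rewriters = new LinkedHashMap<>();

    RewritingBuilderSketch put(String key, String value) {
        entries.put(key, value);
        return this;
    }

    RewritingBuilderSketch addRewriter(String name, Rewriter rewriter) {
        rewriters.put(name, rewriter);
        return this;
    }

    /** Applies every registered rewriter in order, then returns a read-only config. */
    Map<String, String> build() {
        Map<String, String> current = new HashMap<>(entries);
        for (Map.Entry<String, Rewriter> e : rewriters.entrySet()) {
            current = new HashMap<>(e.getValue().rewrite(e.getKey(), current));
        }
        return Collections.unmodifiableMap(current);
    }
}
```

With this shape the user never calls a rewriter directly; an empty task.inputs can be filled in by a registered rewriter before the config is handed to the processor.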


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------




Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------



Thanks for pulling it off! I did the first round of review. If there is anything unclear, please let me know.


samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java (line 24)
<https://reviews.apache.org/r/48356/#comment207749>

    So, what's the plan to make customized CheckpointConfig for each different checkpointManagerFactory? Derive a sub-class from this one?
    
    Generally, I noticed that a set of configuration variables falls into this category:
    - configure a factory
    - a set of configuration variables that are only meaningful if factory = x
    
    It would be good to write up some documentation here to showcase how we are dealing w/ this type of configuration.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 31)
<https://reviews.apache.org/r/48356/#comment207823>

    Just wondering: can we have a ConfigBuilder interface as a common interface for all modular config builders, such as JobConfig, TaskConfig, etc.?
    And make SamzaConfigBuilder an implementation of ConfigBuilder that holds all sub-builders?



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 32)
<https://reviews.apache.org/r/48356/#comment207750>

    Add some javadoc here.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 49)
<https://reviews.apache.org/r/48356/#comment207751>

    It is great that you are creating modular configuration objects here now! I would take the chance to also create JobConfig, TaskConfig, SerdeConfig and StoreConfig.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 58)
<https://reviews.apache.org/r/48356/#comment207799>

    Here it seems that you are mandating that any config have jobName and taskClass. However, I believe those two are not the only mandatory configuration variables. I would suggest that we remove these from the constructor of ConfigBuilder. Instead, make it more modular s.t. we can call: ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId, coordinatorSystem).taskConfig(taskClass, taskInputs, ...)
    
    Then, finally, we can call build() to compile the complete configuration, in which we can call validate() for each modular config class (e.g. JobConfig.validate()) and also validate the inter-dependencies between the modular config classes (e.g. systems defined in JobConfig, StoreConfig, etc. must have a corresponding SystemConfig).
    
    P.S. just saw Chris' comment earlier. I think that if we can separate the ConfigBuilder into smaller modular sections, we can make the mandatory config variables constructor arguments of each section.
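
    To make the suggestion concrete, here is a minimal sketch, not a proposal for the final API. ModularConfigSketch, JobSection, TaskSection, systemConfig() and require() are hypothetical names, a plain Map stands in for Config, and the jobConfig()/taskConfig() argument lists are shortened for brevity. Each section validates its own fields, and build() adds the cross-module check that every task input names a configured system.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; these are not the classes under review.
final class ModularConfigSketch {
    static void require(boolean ok, String message) {
        if (!ok) {
            throw new IllegalStateException(message);
        }
    }

    /** Job-level settings; validates only its own fields. */
    static final class JobSection {
        final String name;
        final String id;

        JobSection(String name, String id) {
            this.name = name;
            this.id = id;
        }

        void validate() {
            require(name != null && !name.isEmpty(), "job name is required");
        }
    }

    /** Task-level settings; inputs use the "system.stream" form. */
    static final class TaskSection {
        final String taskClass;
        final List<String> inputs;

        TaskSection(String taskClass, List<String> inputs) {
            this.taskClass = taskClass;
            this.inputs = inputs;
        }

        void validate() {
            require(taskClass != null && !taskClass.isEmpty(), "task class is required");
            for (String input : inputs) {
                require(input.indexOf('.') > 0, "input must look like system.stream: " + input);
            }
        }
    }

    static final class Builder {
        private JobSection job;
        private TaskSection task;
        private final Map<String, String> systemFactories = new HashMap<>();

        Builder jobConfig(String name, String id) {
            job = new JobSection(name, id);
            return this;
        }

        Builder taskConfig(String taskClass, String... inputs) {
            task = new TaskSection(taskClass, new ArrayList<>(Arrays.asList(inputs)));
            return this;
        }

        Builder systemConfig(String systemName, String factoryClass) {
            systemFactories.put(systemName, factoryClass);
            return this;
        }

        Map<String, String> build() {
            require(job != null, "jobConfig(...) was not called");
            require(task != null, "taskConfig(...) was not called");
            job.validate();
            task.validate();
            // Cross-module check: every input must name a configured system.
            for (String input : task.inputs) {
                String system = input.substring(0, input.indexOf('.'));
                require(systemFactories.containsKey(system), "no system config for: " + system);
            }
            Map<String, String> out = new HashMap<>();
            out.put("job.name", job.name);
            out.put("job.id", job.id);
            out.put("task.class", task.taskClass);
            out.put("task.inputs", String.join(",", task.inputs));
            for (Map.Entry<String, String> e : systemFactories.entrySet()) {
                out.put("systems." + e.getKey() + ".samza.factory", e.getValue());
            }
            return out;
        }
    }
}
```

    In this sketch, a builder that declares the input kafka.input without a matching systemConfig("kafka", ...) call fails in build() rather than at container start.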



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 71)
<https://reviews.apache.org/r/48356/#comment207800>

    



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 107)
<https://reviews.apache.org/r/48356/#comment207801>

    How would it work w/ RegexConfigRewriter? We have a use case where the user will leave this empty and configure RegexConfigRewriter to fill it in at runtime. Are we making the case that this builder has to be called after RegexConfigRewriter is invoked?



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 109)
<https://reviews.apache.org/r/48356/#comment207802>

    It seems that here you are trying to create a holistic configuration to describe each stream in task.inputs. I am curious what you have in mind for the organization of all the configs. Is it something like:
    TaskConfig -> task.inputs -> stream -> SystemStreamConfig? It would be good to have some documentation to show the modules and relationship between the modules in the config.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 110)
<https://reviews.apache.org/r/48356/#comment207804>

    We have another config pattern here:
    - configuration variables on an entity (i.e. SystemStream in this case) are dispersed across multiple config modules (i.e. TaskConfig has task.inputs, while SystemConfig has the serde names and the replication factors, and SerdeConfig has the actual serde class name, etc.). There are more cases like this: the changelog in StoreConfig also has a SystemStreamConfig, which would be generated via StoreConfig.{key,msg}.serde and the corresponding SystemConfig. It would be really nice to call out those usage patterns of configuration variables and describe how the new modular design handles:
    - encapsulation of the variables
    - relationship between the config modules



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 146)
<https://reviews.apache.org/r/48356/#comment207808>

    Actually, CheckpointConfig only has the factory name in it. I assume that only KafkaCheckpointConfig would have the task.checkpoint.system set?



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 169)
<https://reviews.apache.org/r/48356/#comment207809>

    I would prefer to encapsulate the validation in JobConfig.validate(), TaskConfig.validate(), etc.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 240)
<https://reviews.apache.org/r/48356/#comment207813>

    Can we make this package-private? As a private class within ConfigBuilder, this method shouldn't be called outside ConfigBuilder.build().



samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java (line 55)
<https://reviews.apache.org/r/48356/#comment207816>

    



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 19)
<https://reviews.apache.org/r/48356/#comment207846>

    I think that it would be OK to keep all config builders in the same org.apache.samza.config package.



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 30)
<https://reviews.apache.org/r/48356/#comment207847>

    Recently, Jon added a double serde as well.



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 65)
<https://reviews.apache.org/r/48356/#comment207848>

    



samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java (line 23)
<https://reviews.apache.org/r/48356/#comment207849>

    Is it true that all standalone Samza processors use this wildcard grouper? I would prefer to have a specialized standalone config builder for each different type of standalone usage. Same for the TaskNameGrouperFactory configuration variable.
    
    P.S. also saw Chris' comment on not creating a base class w/o knowing what the common base is. Totally agree. Hence, it would make sense to name this "standalone" correctly s.t. it is clear that it only implements a specific type of Samza job. The name "standalone" creates some confusion since we also call the ZK-based implementation "standalone".



samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java (line 38)
<https://reviews.apache.org/r/48356/#comment207850>

    nit: It would be good to document here what setting the key and msg serdes to null means. From the code, it seems that it means pass-through (no serde).



samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java (line 43)
<https://reviews.apache.org/r/48356/#comment207851>

    Do we handle broadcast streams in this grouper? If not, we should make it clear in the javadoc that this grouper won't work w/ broadcast streams. Or, better, fail the config validation if we find that this is the case.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 86)
<https://reviews.apache.org/r/48356/#comment207862>

    In addition, I don't see a use case where the life-cycle of JobCoordinator is outside of the life-cycle of StreamProcessor. It seems better to instantiate the JobCoordinator object within StreamProcessor, instead of passing in an already created object in the public constructor.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 118)
<https://reviews.apache.org/r/48356/#comment207861>

    Seems like this should be a generic constructor, not just for standalone. Also, isn't it true that we should be able to figure out what JobCoordinatorFactory we should use to instantiate the JobCoordinator instance from the Config object? Something like:
    
    JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) should be more generic.
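
    A minimal sketch of that lookup, under stated assumptions: the key name job.coordinator.factory is assumed here for illustration, the JobCoordinator and JobCoordinatorFactory interfaces below are simplified stand-ins rather than the ones in this patch, and NoOpFactory exists only to make the example self-contained.

```java
import java.util.Map;

// Hypothetical sketch; interfaces and key name are assumptions, not the patch's API.
final class CoordinatorLookupSketch {
    interface JobCoordinator {
        String describe();
    }

    interface JobCoordinatorFactory {
        JobCoordinator getJobCoordinator(Map<String, String> config);
    }

    /** Trivial factory used only to demonstrate the lookup. */
    static final class NoOpFactory implements JobCoordinatorFactory {
        NoOpFactory() {
        }

        @Override
        public JobCoordinator getJobCoordinator(Map<String, String> config) {
            return () -> "no-op coordinator";
        }
    }

    // Assumed config key naming the factory class.
    static final String FACTORY_KEY = "job.coordinator.factory";

    /** Reads the factory class name from the config and instantiates it reflectively. */
    static JobCoordinator fromConfig(Map<String, String> config) {
        String className = config.get(FACTORY_KEY);
        if (className == null || className.isEmpty()) {
            throw new IllegalArgumentException("missing config: " + FACTORY_KEY);
        }
        try {
            Object factory = Class.forName(className).getDeclaredConstructor().newInstance();
            if (!(factory instanceof JobCoordinatorFactory)) {
                throw new IllegalArgumentException(className + " is not a JobCoordinatorFactory");
            }
            return ((JobCoordinatorFactory) factory).getJobCoordinator(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot instantiate " + className, e);
        }
    }
}
```

    With something like this, StreamProcessor would need only the Config in its public constructor, and the choice of coordinator becomes a configuration concern.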



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 130)
<https://reviews.apache.org/r/48356/#comment207865>

    Not sure whether we want to keep JmxServer life-cycle within the StreamProcessor life-cycle. This actually could be one thing that is shared w/ the whole JVM process and it can be passed in to the StreamProcessor.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 138)
<https://reviews.apache.org/r/48356/#comment207866>

    nit: the order of operations in the comments is not consistent w/ the order of operations in the code.



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 176)
<https://reviews.apache.org/r/48356/#comment207868>

    



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java (line 44)
<https://reviews.apache.org/r/48356/#comment207870>

    I think that we should make the javadoc clearer. If this StandaloneJobCoordinator is an implementation that defines:
    - config based JobModel generation (i.e. configure via wildcard groupers)
    - no leader election
    
    We should put these definitions in the javadoc here as well.



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java (line 70)
<https://reviews.apache.org/r/48356/#comment207873>

    I don't fully understand the "unfortunate" part of this comment. In a fully fledged JobCoordinator, it needs to perform:
    - Leader election
    - Partition assignment (which includes SystemStreamPartitionGrouper)
    - Physical resource allocation (which includes TaskNameGrouper that groups tasks into a physical process (i.e. ContainerModel), and locality manager to report the physical location of the tasks)
    
    Hence, LocalityManager and TaskNameGrouper are coupled via JobCoordinator anyway.
    
    Also, not sure what you meant by "groupers should be a property... rather the component that handles task distribution". There are two different types of groupers that perform two different types of functions (partition assignment and resource allocation). And what do you mean by "not that of the job anymore"?



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java (line 28)
<https://reviews.apache.org/r/48356/#comment207875>

    nit: it would be nice to perform a validation of config to see whether it truly is a standalone config.



samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala (line 80)
<https://reviews.apache.org/r/48356/#comment207877>

    Can we just name it job.coordinator.host-affinity.enabled? Not particularly a fan of creating a new config that is to be deprecated. ;)
    For non-YARN JobCoordinator, we can just fail the config validation stating that it is not supported yet, if that is the case.



samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
<https://reviews.apache.org/r/48356/#comment207880>

    I think that we should deprecate this one in favor of job.coordinator.host-affinity.enabled. Maybe copy over this value and print a warning for now, and remove it completely later.
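
    Roughly what I have in mind for the copy-over, as a sketch only. The legacy key name yarn.samza.host-affinity.enabled is an assumption written from memory and should be checked against YarnConfig; HostAffinityKeyMigrationSketch and migrate() are hypothetical names, and a plain Map stands in for Config.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch of a deprecation shim; not part of the patch.
final class HostAffinityKeyMigrationSketch {
    // Assumed legacy key; verify against YarnConfig before relying on it.
    static final String OLD_KEY = "yarn.samza.host-affinity.enabled";
    static final String NEW_KEY = "job.coordinator.host-affinity.enabled";

    private static final Logger LOG =
        Logger.getLogger(HostAffinityKeyMigrationSketch.class.getName());

    /** Returns a copy of the config in which the new key is populated from the old one. */
    static Map<String, String> migrate(Map<String, String> config) {
        Map<String, String> out = new HashMap<>(config);
        String legacyValue = out.get(OLD_KEY);
        // An explicitly set new key always wins over the deprecated one.
        if (legacyValue != null && !out.containsKey(NEW_KEY)) {
            LOG.warning(OLD_KEY + " is deprecated; use " + NEW_KEY + " instead");
            out.put(NEW_KEY, legacyValue);
        }
        return out;
    }
}
```

    The old key is left in place in the copy, so existing readers keep working until it is removed completely.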


- Yi Pan (Data Infrastructure)


On July 13, 2016, 9:58 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 13, 2016, 9:58 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On Aug. 17, 2016, 8:36 a.m., Yi Pan (Data Infrastructure) wrote:
> > A quick question: w/ the recent discussion, are you planning to remove the ConfigBuilder interface and add a StreamProcessorFactory?

Yes, Yi. As we discussed online, I think we should commit this RB (as is) in order to unblock the datastream folks so that they can start testing. After this, I will put up an updated patch without the ConfigBuilder APIs and add support for offspring.


> On Aug. 17, 2016, 8:36 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java, line 24
> > <https://reviews.apache.org/r/48356/diff/13/?file=1471812#file1471812line24>
> >
> >     What's the use case of this rewriter class? Some javadoc would be nice.

Yeah. This apparently puts all system environment variables into the config. Not sure why it was introduced. We have always had it :)


> On Aug. 17, 2016, 8:36 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java, line 41
> > <https://reviews.apache.org/r/48356/diff/13/?file=1471817#file1471817line41>
> >
> >     Can we keep it package-private? Any external reference to this method?

Sure


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review145967
-----------------------------------------------------------


On Aug. 12, 2016, 3:29 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2016, 3:29 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review145967
-----------------------------------------------------------



A quick question: given the recent discussion, are you planning to remove the ConfigBuilder interface and add a StreamProcessorFactory?


samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java (line 24)
<https://reviews.apache.org/r/48356/#comment212324>

    What's the use case of this rewriter class? Some javadoc would be nice.
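
To illustrate the kind of javadoc requested above, here is a minimal, self-contained sketch. It is not the EnvironmentConfigRewriter from this diff, whose behavior is not shown in this thread; the class name EnvRewriterSketch, the SAMZA_ prefix, and the key-mapping rule are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch (not the class under review): overlays values taken
 * from environment variables on top of an existing configuration map.
 *
 * Assumed convention: a variable named PREFIX + "JOB_NAME" overrides the
 * config key "job.name" (prefix stripped, lower-cased, '_' replaced by '.').
 * Variables without the prefix are ignored.
 */
final class EnvRewriterSketch {
  /** Assumed prefix, chosen for illustration only. */
  static final String PREFIX = "SAMZA_";

  private EnvRewriterSketch() { }

  /**
   * Returns a new map containing {@code config} plus any overrides found in
   * {@code env}. Neither input map is modified.
   */
  static Map<String, String> rewrite(Map<String, String> config, Map<String, String> env) {
    Map<String, String> result = new HashMap<>(config);
    for (Map.Entry<String, String> entry : env.entrySet()) {
      String name = entry.getKey();
      if (name.startsWith(PREFIX)) {
        String key = name.substring(PREFIX.length())
            .toLowerCase(Locale.ROOT)
            .replace('_', '.');
        result.put(key, entry.getValue());
      }
    }
    return result;
  }
}
```

A caller could pass System.getenv() as the second argument; taking the environment as a parameter keeps the sketch easy to unit test.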



samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java (line 41)
<https://reviews.apache.org/r/48356/#comment212325>

    Can we keep it package-private? Are there any external references to this method?


- Yi Pan (Data Infrastructure)


On Aug. 12, 2016, 3:29 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2016, 3:29 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review146232
-----------------------------------------------------------


Ship it!

- Yi Pan (Data Infrastructure)


On Aug. 18, 2016, 5:39 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated Aug. 18, 2016, 5:39 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated Aug. 18, 2016, 5:39 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated Aug. 12, 2016, 3:29 a.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Changes
-------

Removing JobCoordinator from the user-facing APIs


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/EnvironmentConfigRewriter.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/RegExTopicRewriter.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/RewriterConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
  samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 383bb13b18ace639607541c1bf6d0f42569cd4ff 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/main/scala/org/apache/samza/util/Util.scala 95a5aa0a23db2a890c19166b6031b2a3b96689f2 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated Aug. 11, 2016, 1:23 a.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
  checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 13, 2016, 9:58 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 12, 2016, 10:02 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Changes
-------

Removing JobModelUpdateHandler for now and not exposing ExecutorService to the user in StreamProcessor
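
One way to read the change above, sketched under assumptions: the processor creates and owns its executor, and callers only see start and stop. ProcessorSketch and its methods are hypothetical names, not the StreamProcessor API from this diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: the executor is a private detail, so callers can
 * neither supply one nor shut it down behind the processor's back.
 */
final class ProcessorSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Runnable work;

  ProcessorSketch(Runnable work) {
    this.work = work;
  }

  /** Submits the work to the internal executor and returns immediately. */
  void start() {
    executor.submit(work);
  }

  /**
   * Stops accepting new work and waits up to timeoutMs for submitted work
   * to finish. Returns true if everything finished within the timeout.
   */
  boolean stop(long timeoutMs) {
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller and report an unclean stop.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Keeping the executor private means the timeout in stop is the only knob a caller needs, which fits the idea of driving it from a single shutdown config value.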


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 12, 2016, 6:45 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Changes
-------

Addressed most of Fred's feedback/comments


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java, line 131
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441777#file1441777line131>
> >
> >     [INFO Question] Just curious, Is there a required code convention for samza code base here? Do we do the following pattern?
> >     
> >     if (....) {
> >       ....;
> >     }

I don't believe we have been strict about coding conventions. The convention you suggested is, I believe, what is used at LinkedIn. We are not very strict in open source. As long as the code is readable, it is OK.


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.

Is StringUtils part of java.lang? Or are you referring to some other library?


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 144
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line144>
> >
> >     Would it be more accurate to call it STREAM_KEY_SERDE_FORMATTING_STRING instead of STREAM_KEY_SERDE_REGEX? regex has a different rule.

Yeah makes sense!
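
To make the naming point concrete: a format string is a template that String.format fills in, while a regex is a pattern matched against existing text. A minimal sketch, assuming a hypothetical template value (the actual constant in ConfigBuilder.java is not shown in this thread, and the class name below is made up):

```java
import java.util.regex.Pattern;

final class SerdeKeyNaming {
    // Hypothetical template; the real value in ConfigBuilder.java may differ.
    static final String STREAM_KEY_SERDE_FORMATTING_STRING =
        "systems.%s.streams.%s.samza.key.serde";

    // An actual regex that recognizes keys produced by the template above.
    static final Pattern STREAM_KEY_SERDE_REGEX =
        Pattern.compile("systems\\.([^.]+)\\.streams\\.([^.]+)\\.samza\\.key\\.serde");

    // Fills the template with a system name and a stream name.
    static String keySerdeConfigKey(String systemName, String streamName) {
        return String.format(STREAM_KEY_SERDE_FORMATTING_STRING, systemName, streamName);
    }

    public static void main(String[] args) {
        String key = keySerdeConfigKey("kafka", "page-views");
        System.out.println(key);
        // The regex matches the generated key ...
        System.out.println(STREAM_KEY_SERDE_REGEX.matcher(key).matches());
        // ... but the template itself, used as a regex, does not.
        System.out.println(Pattern.matches(STREAM_KEY_SERDE_FORMATTING_STRING, key));
    }
}
```

Treating the template as a regex fails because "%s" is matched literally, which is why the FORMATTING_STRING name describes the constant more accurately.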


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 154
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line154>
> >
> >     Or just use StringUtils.isEmpty(streamName) or even StringUtils.isBlank(streamName)?

Is StringUtils part of java.lang? Or are you referring to some other library?


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java, line 28
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441781#file1441781line28>
> >
> >     [Info Question] what is the reason to have 3 and 26214400 as default values here?

Replication factor indicates the number of replicas for the data in the checkpoint topic. 3 is usually the recommended replication factor.
Not sure about the segment bytes value, 26214400. This would be a good question for the Kafka team.
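
A note on the second number: 26214400 bytes is exactly 25 * 1024 * 1024, i.e. 25 MiB. The sketch below only checks that arithmetic; the class and constant names are illustrative and are not taken from KafkaCheckpointConfig.java.

```java
final class CheckpointDefaults {
    // Default values quoted in the review comment above.
    static final int REPLICATION_FACTOR = 3;
    static final long SEGMENT_BYTES = 26214400L;

    // Converts a byte count to whole mebibytes (1 MiB = 1024 * 1024 bytes).
    static long toMebibytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println(toMebibytes(SEGMENT_BYTES) + " MiB");
    }
}
```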


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java, line 59
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441781#file1441781line59>
> >
> >     Why do we have systemName here since we have systemConfig in the result map which has systemName already?

systemName serves as an indirection for the set of properties associated with a system; it is essentially an identifier for that set of properties. Think of task.checkpoint.system as a "pointer".
Samza configurations follow some standard patterns. A complete list is available here: http://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html
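
The "pointer" idea can be sketched with a plain map: task.checkpoint.system holds a system name, and that name selects the systems.<name>.* entries. This is only an illustration, not how the config classes in this patch are written; the helper class, the sample values, and the second "other" system are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SystemIndirection {
    // Follows the pointer key to a system name, then returns the entries under
    // "systems.<name>." with that prefix stripped.
    static Map<String, String> systemProperties(Map<String, String> config, String pointerKey) {
        String systemName = config.get(pointerKey);
        if (systemName == null) {
            throw new IllegalArgumentException("Missing config: " + pointerKey);
        }
        String prefix = "systems." + systemName + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Sample configuration; the values are placeholders for illustration.
    static Map<String, String> sampleConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("task.checkpoint.system", "kafka");
        config.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
        config.put("systems.kafka.consumer.zookeeper.connect", "localhost:2181");
        config.put("systems.other.samza.factory", "com.example.OtherSystemFactory");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(systemProperties(sampleConfig(), "task.checkpoint.system"));
    }
}
```

Because the checkpoint config only names the system, the same systems.kafka.* block can be shared by checkpointing and by regular input/output streams.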


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java, line 26
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441782#file1441782line26>
> >
> >     Similar to a previous comment. Maybe more proper to call it FORMATTING_STRING instead of REGEX?

Makes sense!


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, line 43
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441783#file1441783line43>
> >
> >     Could serdeAlias be null accidentally? If so, it would be better to have a null check for serdeAlias.

SerdeAlias is an enum. I don't think you can specify a "null" value for enum.


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java, line 34
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441786#file1441786line34>
> >
> >     Could ssps be null?

ssps should never be null, but it doesn't hurt to check for it and avoid a random NPE. I will fix it. Thanks for noticing!


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, line 166
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441793#file1441793line166>
> >
> >     Would it be better to log separately for these two exceptions for trouble shooting purpose if issues happen?

Not much can be inferred from InterruptedException versus ExecutionException here, so the log message would be the same for both. I didn't think it was necessary to duplicate the log line in another catch block, as it wouldn't add much value.
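
For readers without the diff open, the pattern being discussed looks roughly like the following. This is a sketch, not the code in StreamProcessor.java: the class and method names are hypothetical, and restoring the interrupt flag is an addition of this example rather than something stated in the review.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class AwaitContainer {
    // Waits for the future; returns true on normal completion, false otherwise.
    static boolean awaitQuietly(Future<?> containerFuture) {
        try {
            containerFuture.get();
            return true;
        } catch (InterruptedException | ExecutionException e) {
            // One log line covers both cases; the exception type shows up in the message.
            System.err.println("Container did not stop cleanly: " + e);
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(awaitQuietly(ok));
        System.out.println(awaitQuietly(failed));
    }
}
```

With a multi-catch, the exception's own toString() already distinguishes the two cases in the log, which is the reason a second catch block adds little.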


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala, line 29
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441797#file1441797line29>
> >
> >     I know it is from previous code, but do you mind explaining or putting a comment regarding what -1L means here?

Argh. It is kind of obvious to me. I can add some documentation here! :)
A negative value basically means the window() method is never called. This configuration serves a dual purpose: it is a flag that indicates whether window() is enabled AND the window time interval in ms.
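
A small sketch of that dual-purpose reading, assuming the value comes from the task.window.ms key with a default of -1. The class below is hypothetical and is not the TaskConfig code under review; treating every non-negative value as "enabled" is also an assumption of this sketch.

```java
import java.util.Collections;
import java.util.Map;

final class WindowSetting {
    static final String WINDOW_MS_KEY = "task.window.ms";
    // A negative default means window() is never called.
    static final long DEFAULT_WINDOW_MS = -1L;

    private final long windowMs;

    WindowSetting(long windowMs) {
        this.windowMs = windowMs;
    }

    // Reads the interval from a config map, falling back to the default.
    static WindowSetting fromConfig(Map<String, String> config) {
        String raw = config.get(WINDOW_MS_KEY);
        return new WindowSetting(raw == null ? DEFAULT_WINDOW_MS : Long.parseLong(raw));
    }

    // The sign doubles as the on/off flag (assumption: only negatives disable).
    boolean isWindowEnabled() {
        return windowMs >= 0;
    }

    // The interval in milliseconds; only meaningful when windowing is enabled.
    long getWindowMs() {
        return windowMs;
    }

    public static void main(String[] args) {
        System.out.println(fromConfig(Collections.<String, String>emptyMap()).isWindowEnabled());
        System.out.println(fromConfig(Collections.singletonMap(WINDOW_MS_KEY, "60000")).isWindowEnabled());
    }
}
```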


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------




Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?
> 
> Fred Ji wrote:
>     It is org.apache.commons.lang.StringUtils
> 
> Navina Ramesh wrote:
>     Fred, We already have guava commons added. I don't want to overload the core package. For now, it is understood that a config value cannot be empty or whitespace. Let's leave the checks as it is for now.
> 
> Fred Ji wrote:
>     I see. If we use guava commons lib, there is a better similar static function:
>     Strings.isNullOrEmpty(@Nullable String string)

Awesome. I looked for a while and didn't find this. I will use the guava version of the function. Thanks for the tip! :)
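
One caveat when swapping the two checks: Guava's Strings.isNullOrEmpty treats a whitespace-only string as non-empty, whereas commons-lang's StringUtils.isBlank rejects it. The sketch below uses hand-written stand-ins with the same semantics so that it runs without either library; the method names mirror the library ones, but these are local helpers in a made-up class.

```java
final class NameChecks {
    // Same semantics as Guava's Strings.isNullOrEmpty.
    static boolean isNullOrEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Same semantics as commons-lang's StringUtils.isBlank:
    // null, empty, or whitespace-only strings count as blank.
    static boolean isBlank(String s) {
        if (s == null) {
            return true;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isWhitespace(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String whitespaceOnly = "   ";
        System.out.println(isNullOrEmpty(whitespaceOnly)); // false: passes the Guava-style check
        System.out.println(isBlank(whitespaceOnly));       // true: rejected by the isBlank-style check
    }
}
```

If whitespace-only names must be rejected, the Guava-style check would need an extra trim() or an explicit whitespace test.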


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------




Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, line 43
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441783#file1441783line43>
> >
> >     Could serdeAlias be null accidentally? If so, it would be better to have a null check for serdeAlias.
> 
> Navina Ramesh wrote:
>     SerdeAlias is an enum. I don't think you can specify a "null" value for enum.
> 
> Fred Ji wrote:
>     null can be assigned to an enum in Java.

Ah.. it doesn't recognize constructor of type (String, Object) for obvious reasons :P  So, new SerdeConfig("name", null) is not allowed. 
However, it doesn't stop someone from specifying:  SerdeAlias x = null; new SerdeConfig("name", x); I will add the check. Thanks!
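
The check could look something like this. It is a sketch under assumptions: the enum constants and the class name are made up, and the real SerdeConfig may validate differently (for example by throwing ConfigException rather than the NullPointerException that Objects.requireNonNull raises).

```java
import java.util.Objects;

final class SerdeConfigSketch {
    // Hypothetical constants; the real SerdeAlias values are not listed in this thread.
    enum SerdeAlias { STRING, JSON }

    private final String name;
    private final SerdeAlias alias;

    SerdeConfigSketch(String name, SerdeAlias alias) {
        // An enum-typed variable can still hold null, so validate explicitly.
        this.name = Objects.requireNonNull(name, "name must not be null");
        this.alias = Objects.requireNonNull(alias, "serdeAlias must not be null");
    }

    String getName() {
        return name;
    }

    SerdeAlias getAlias() {
        return alias;
    }

    // Reproduces the case from the reply: a null assigned to an enum variable.
    static boolean rejectsNullAlias() {
        SerdeAlias x = null;
        try {
            new SerdeConfigSketch("name", x);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNullAlias());
    }
}
```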


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------




Re: Review Request 48356: RFC: Samza as a library

Posted by Fred Ji <fj...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?
> 
> Fred Ji wrote:
>     It is org.apache.commons.lang.StringUtils
> 
> Navina Ramesh wrote:
>     Fred, We already have guava commons added. I don't want to overload the core package. For now, it is understood that a config value cannot be empty or whitespace. Let's leave the checks as it is for now.

I see. If we use guava commons lib, there is a better similar static function:
Strings.isNullOrEmpty(@Nullable String string)


- Fred


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------




Re: Review Request 48356: RFC: Samza as a library

Posted by Fred Ji <fj...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?
> 
> Fred Ji wrote:
>     It is org.apache.commons.lang.StringUtils
> 
> Navina Ramesh wrote:
>     Fred, we already have Guava added, and I don't want to overload the core package with another dependency. For now, it is understood that a config value cannot be empty or whitespace. Let's leave the checks as they are for now.
> 
> Fred Ji wrote:
>     I see. If we use the Guava library, there is a similar static function that fits better:
>     Strings.isNullOrEmpty(@Nullable String string)
> 
> Navina Ramesh wrote:
>     Awesome. I looked for a while and didn't find this. I will use the guava version of the function. Thanks for the tip! :)

The package is com.google.common.base.Strings which is core for guava.
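
For anyone comparing the two checks mentioned above, here is a minimal sketch of how their contracts differ. It uses plain-Java stand-ins so that it runs without either library on the classpath; the class name BlankCheckSketch and the sample values are made up for illustration. The point to notice is that a null-or-empty check accepts a whitespace-only name, while a blank check rejects it.

```java
// Plain-Java stand-ins for the two library checks discussed in this thread.
// The helper names mirror the library methods, but these are illustrative
// re-implementations, not the library code itself.
final class BlankCheckSketch {

  // Same contract as Guava's Strings.isNullOrEmpty: true for null or "".
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Same contract as commons-lang StringUtils.isBlank: true for null, "",
  // or a string made up only of whitespace characters.
  static boolean isBlank(String s) {
    if (s == null) {
      return true;
    }
    for (int i = 0; i < s.length(); i++) {
      if (!Character.isWhitespace(s.charAt(i))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Hypothetical config values, including the whitespace-only edge case.
    String[] candidates = {null, "", "   ", "my-job"};
    for (String name : candidates) {
      System.out.println("[" + name + "] nullOrEmpty=" + isNullOrEmpty(name)
          + " blank=" + isBlank(name));
    }
  }
}
```

If whitespace-only names must be rejected, the null-or-empty check alone is not enough; trimming the value first is one possible workaround.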


- Fred


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------


On July 12, 2016, 10:02 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, 10:02 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?
> 
> Fred Ji wrote:
>     It is org.apache.commons.lang.StringUtils

Fred, we already have Guava added, and I don't want to overload the core package with another dependency. For now, it is understood that a config value cannot be empty or whitespace. Let's leave the checks as they are for now.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------


On July 12, 2016, 10:02 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, 10:02 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Fred Ji <fj...@linkedin.com>.

> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 92
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line92>
> >
> >     Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?

It is org.apache.commons.lang.StringUtils


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, line 154
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441779#file1441779line154>
> >
> >     Or just use StringUtils.isEmpty(streamName) or even StringUtils.isBlank(streamName)?
> 
> Navina Ramesh wrote:
>     Is StringUtils part of java.lang? Or are you referring to some other library?

It is org.apache.commons.lang.StringUtils


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, line 43
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441783#file1441783line43>
> >
> >     Could serdeAlias be null accidentally? If so, it would be better to have a null check for serdeAlias.
> 
> Navina Ramesh wrote:
>     SerdeAlias is an enum. I don't think you can specify a "null" value for enum.

null can be assigned to an enum in Java.
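
A small sketch of the point, assuming a made-up SerdeAlias enum (the constants and the serdeName helper below are hypothetical, not the ones in the patch). Enum types are reference types, so a variable of that type can hold null, and a guard such as Objects.requireNonNull makes the failure explicit:

```java
import java.util.Locale;
import java.util.Objects;

final class EnumNullSketch {
  // Hypothetical stand-in for the SerdeAlias enum; the constants are made up.
  enum SerdeAlias { STRING, JSON }

  // Fails fast with a descriptive message if the alias is null.
  static String serdeName(SerdeAlias alias) {
    Objects.requireNonNull(alias, "serdeAlias must not be null");
    return alias.name().toLowerCase(Locale.ROOT);
  }

  public static void main(String[] args) {
    SerdeAlias alias = null; // compiles: enum types are reference types
    try {
      serdeName(alias);
    } catch (NullPointerException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println(serdeName(SerdeAlias.JSON));
  }
}
```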


> On July 12, 2016, 12:15 a.m., Fred Ji wrote:
> > samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java, line 131
> > <https://reviews.apache.org/r/48356/diff/7/?file=1441777#file1441777line131>
> >
> >     [INFO Question] Just curious, is there a required code convention for the Samza code base here? Do we follow this pattern?
> >     
> >     if (....) {
> >       ....;
> >     }
> 
> Navina Ramesh wrote:
>     I don't believe we have been strict on coding convention. The convention you suggested is what, I believe, is used at LinkedIn. We are not very strict in open source. As long as the code is readable, it is OK.

Sounds good to me. Thanks!


- Fred


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------


On July 12, 2016, 6:45 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, 6:45 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Fred Ji <fj...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141799
-----------------------------------------------------------



Thanks a lot for the RB! I only have a few INFO questions to learn more details and MINOR comments as below. Thanks!


samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java (line 131)
<https://reviews.apache.org/r/48356/#comment207185>

    [INFO Question] Just curious, is there a required code convention for the Samza code base here? Do we follow this pattern?
    
    if (....) {
      ....;
    }



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 92)
<https://reviews.apache.org/r/48356/#comment207186>

    Maybe if(!StringUtils.isBlank())? Since we would like to have a valid name.



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 144)
<https://reviews.apache.org/r/48356/#comment207188>

    Would it be more accurate to call it STREAM_KEY_SERDE_FORMATTING_STRING instead of STREAM_KEY_SERDE_REGEX? regex has a different rule.
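
    To illustrate the naming concern, here is a rough sketch. The template value below is an assumption (the actual constant's value is not shown in this thread), and the class and method names are hypothetical. A format string is a template that String.format fills in, while a regex is a pattern that existing text is matched against:

```java
import java.util.regex.Pattern;

final class FormatVsRegexSketch {
  // Assumed template value, for illustration only.
  static final String STREAM_KEY_SERDE_FORMAT = "systems.%s.streams.%s.samza.key.serde";

  // Builds a config key by substituting names into the template.
  static String keySerdeConfigKey(String systemName, String streamName) {
    return String.format(STREAM_KEY_SERDE_FORMAT, systemName, streamName);
  }

  public static void main(String[] args) {
    String key = keySerdeConfigKey("kafka", "page-views");
    System.out.println(key);

    // A regex does the opposite job: it recognizes keys that already exist.
    Pattern keyPattern =
        Pattern.compile("systems\\.([^.]+)\\.streams\\.([^.]+)\\.samza\\.key\\.serde");
    System.out.println(keyPattern.matcher(key).matches());
  }
}
```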



samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 154)
<https://reviews.apache.org/r/48356/#comment207187>

    Or just use StringUtils.isEmpty(streamName) or even StringUtils.isBlank(streamName)?



samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java (line 28)
<https://reviews.apache.org/r/48356/#comment207190>

    [Info Question] what is the reason to have 3 and 26214400 as default values here?



samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java (line 59)
<https://reviews.apache.org/r/48356/#comment207191>

    Why do we have systemName here since we have systemConfig in the result map which has systemName already?



samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java (line 26)
<https://reviews.apache.org/r/48356/#comment207192>

    Similar to a previous comment. Maybe more proper to call it FORMATTING_STRING instead of REGEX?



samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java (line 43)
<https://reviews.apache.org/r/48356/#comment207193>

    Could serdeAlias be null accidentally? If so, it would be better to have a null check for serdeAlias.



samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java (line 34)
<https://reviews.apache.org/r/48356/#comment207194>

    Could ssps be null?



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 166)
<https://reviews.apache.org/r/48356/#comment207195>

    Would it be better to log these two exceptions separately, to make troubleshooting easier if issues happen?
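
    A sketch of what separate handling could look like. The two exception types here (InterruptedException and ExecutionException) are an assumption, since the thread does not say which exceptions line 166 catches, and the class and method names are hypothetical. Each catch block reports its own message, so the failure mode is visible at a glance:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SeparateCatchSketch {
  // Waits on a task and reports each failure mode with its own message.
  static String await(Future<?> task) {
    try {
      task.get();
      return "completed";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      return "interrupted while waiting";
    } catch (ExecutionException e) {
      return "task failed: " + e.getCause().getMessage();
    }
  }

  public static void main(String[] args) {
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("container crashed"));
    System.out.println(await(failed));
    System.out.println(await(CompletableFuture.completedFuture(null)));
  }
}
```

    In real code the returned strings would go to the logger instead; they are returned here only to keep the sketch easy to check.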



samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala (line 29)
<https://reviews.apache.org/r/48356/#comment207196>

    I know it is from previous code, but do you mind explaining or putting a comment regarding what -1L means here?


- Fred Ji


On July 12, 2016, midnight, Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, midnight)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.

> On July 12, 2016, 6:43 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, lines 174-176
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line174>
> >
> >     Don't we need to synchronize these? Are we guaranteed that JobModelUpdateHandler cannot be invoked concurrently? If so, let's document this.

Yeah. I thought about this. I know that JobModelUpdateHandler is not going to be invoked concurrently. However, right now, it is not even called :) I am open to fully removing this interface until the ZK based design is more concrete.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141942
-----------------------------------------------------------


On July 12, 2016, 6:45 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, 6:45 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Chris Pettitt <cp...@linkedin.com>.

> On July 12, 2016, 6:43 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, lines 174-176
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line174>
> >
> >     Don't we need to synchronize these? Are we guaranteed that JobModelUpdateHandler cannot be invoked concurrently? If so, let's document this.
> 
> Navina Ramesh wrote:
>     Yeah, I thought about this. I know that JobModelUpdateHandler is not going to be invoked concurrently. However, right now it is not even called :) I am open to removing this interface entirely until the ZK-based design is more concrete.

Either approach - documenting or removing - works.
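
For reference, here is a minimal sketch of what the "synchronize" option could look like if the interface stays and concurrent callbacks ever become possible. The names below (ModelUpdateHandler, GuardedProcessor, onModelUpdate) are hypothetical stand-ins; the actual JobModelUpdateHandler signature and the fields at lines 174-176 of StreamProcessor.java are not shown in this thread, so treat this as an illustration under those assumptions rather than as the patch itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the callback interface under review.
interface ModelUpdateHandler {
  void onModelUpdate(String newModelVersion);
}

// Hypothetical processor with two fields that must always change together.
class GuardedProcessor implements ModelUpdateHandler {
  private final Object lock = new Object();
  private String modelVersion = "none"; // guarded by lock
  private int updateCount = 0;          // guarded by lock

  /** Safe to call from multiple threads: both fields are updated under one lock. */
  @Override
  public void onModelUpdate(String newModelVersion) {
    synchronized (lock) {
      modelVersion = newModelVersion;
      updateCount++;
    }
  }

  int getUpdateCount() {
    synchronized (lock) {
      return updateCount;
    }
  }

  String getModelVersion() {
    synchronized (lock) {
      return modelVersion;
    }
  }
}

class GuardedProcessorDemo {
  // Fires threads * callsPerThread concurrent updates and returns the final count.
  static int runConcurrentUpdates(GuardedProcessor p, int threads, int callsPerThread) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int t = 0; t < threads; t++) {
      final int id = t;
      pool.submit(() -> {
        try {
          start.await(); // release all workers at once to maximize contention
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        for (int i = 0; i < callsPerThread; i++) {
          p.onModelUpdate("v" + id + "-" + i);
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return p.getUpdateCount();
  }

  public static void main(String[] args) {
    GuardedProcessor p = new GuardedProcessor();
    System.out.println(runConcurrentUpdates(p, 4, 1000));
  }
}
```

If the callback is instead guaranteed to be invoked from a single thread, the lock is unnecessary, and a Javadoc note on the handler stating that guarantee would cover the "document" option.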


- Chris


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141942
-----------------------------------------------------------


On July 12, 2016, 6:45 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 12, 2016, 6:45 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>


Re: Review Request 48356: RFC: Samza as a library

Posted by Chris Pettitt <cp...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review141942
-----------------------------------------------------------


Fix it, then Ship it!




Noticed one issue while responding to your previous round of comments.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (lines 174 - 176)
<https://reviews.apache.org/r/48356/#comment207386>

    Don't we need to synchronize these? Are we guaranteed that JobModelUpdateHandler cannot be invoked concurrently? If so, let's document this.


- Chris Pettitt




Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 12, 2016, midnight)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh


Re: Review Request 48356: RFC: Samza as a library

Posted by Navina Ramesh <nr...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/
-----------------------------------------------------------

(Updated July 11, 2016, 10:47 p.m.)


Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).


Repository: samza


Description
-------

Added ConfigBuilder and support classes

Added JobCoordinator interfaces


Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface


Added TestStreamProcessor and some unit tests for ConfigBuilders


Changing who defined processorId


Fixed checkstyle errors


Replaced SamzaException with ConfigException


Removing localityManager instantiation from Samza Container


Diffs (updated)
-----

  build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
  checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
  samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e 
  samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION 
  samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION 
  samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
  samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
  samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
  samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48 
  samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29 
  samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e 
  samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION 
  samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION 
  samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
  samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff 

Diff: https://reviews.apache.org/r/48356/diff/


Testing
-------

./gradlew clean build

Local integration test:
./bin/grid start zookeeper
./bin/grid start kafka
Then, run TestStreamProcessor.java


Thanks,

Navina Ramesh