You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2016/12/09 04:05:43 UTC

Running cluster of stream processing application

Hi All,
We were able to run a stream processing application against a fairly decent
load of messages in production environment.

To make the system robust say the stream processing application crashes, is
there a way to make it auto start from the point when it crashed?

Also is there any concept like running the same application in a cluster,
where one fails, other takes over, until we bring back up the failed node
of streams application.

If yes, is there any guidelines or some knowledge base we can look at to
understand how this would work.

Is there way like in spark, where the driver program distributes the tasks
across various nodes in a cluster, is there something similar in kafka
streaming too.

Thanks
Sachin

Re: Running cluster of stream processing application

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> 1. Do we need to call system exit in UncaughtExceptionHandler.

There is no *need* for it. It really depends what you *want* to do.
System.exit is quite a hard termination of your application. Usually, if
your Streams part of you application dies, you still want to clean up
the other parts of you application before you shut down the whole JVM.

> 2. Is there anything more we can do in UncaughtExceptionHandler

You cannot recover the failed thread -- but if you (for example) did
configure 8 thread, and one fails, you might just want to ignored it and
keep you app running with 7 thread. Again, it depend on what you *want*
to do. If you want to shut down, you might want to do some non-Streams
cleanup of you app.

> 3. What would be right place to do streams cleanup, if state has become
> dirty due to task failure and needed to be recreated using changelog
topic.

Streams takes care of this automatically -- there is no need to do this
manually.

> 4. Is it at all necessary to do the cleanup for dirty state (as when
> restarting streams application will take care of it)
> I faced some lock issue on local state store when restarting in case I had
> terminated the streams application directly, and then had to delete
the dir
> manually.

We fixed a couple of bug with regard to lock issues. Wiping out the
local state manually would be a workaround to fix a lock issues
temporarily though.


-Matthias


On 12/17/16 11:07 PM, Sachin Mittal wrote:
> Hi,
> Thanks for the suggestions. Before running the streams application in a
> standby cluster I was trying to get the graceful shutdown right.
> I have code something like this
>         streams.setUncaughtExceptionHandler(new
> Thread.UncaughtExceptionHandler() {
>             public void uncaughtException(Thread t, Throwable e) {
>                 //log the error
>                 //System.exit(-1); call system exit (I saw this in one of
> the test cases)
>                 //Do we need to call system exit in order for shutdown hook
> to be executed
>                 //or after handling uncaught exception it will always call
> the shutdown hook
>             }
>         });
> 
>         // add shutdown hook
>         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
>             public void run() {
>                 //log the info the streams app is shutting down
>                 streams.close();//close the stream by calling
>                 //in case of some exception we would also want to do
> streams.cleanUp()
>                 //would like to know where can we do the cleanup is it
> before close or after close
>                 //if I call before close, then I get some
> IllagalStateException
>             }
>         }));
> 
> Please let me know if I am on right track implementing graceful shutdown.
> Main question are
> 1. Do we need to call system exit in UncaughtExceptionHandler.
> 2. Is there anything more we can do in UncaughtExceptionHandler
> 3. What would be right place to do streams cleanup, if state has become
> dirty due to task failure and needed to be recreated using changelog topic.
> 4. Is it at all necessary to do the cleanup for dirty state (as when
> restarting streams application will take care of it)
> I faced some lock issue on local state store when restarting in case I had
> terminated the streams application directly, and then had to delete the dir
> manually.
> 
> Thanks
> Sachin
> 
> 
> 
> On Mon, Dec 12, 2016 at 4:24 PM, Damian Guy <da...@gmail.com> wrote:
> 
>> In the scenario you mention above about max.poll.interval.ms, yes if the
>> timeout was reached then there would be a rebalance and one of the standby
>> tasks would take over. However the original task may still be processing
>> the data when the rebalance occurs and would throw an exception when it
>> tries to commit the offsets (as it would no longer be the owner of the
>> partitions). In this case the StreamThread would terminate, so you would
>> want to have set up an UncaughtExceptionHandler such that you can be
>> alerted and take any necessary actions, i.e., shutdown the app so it can be
>> auto-restarted.
>>
>> On Mon, 12 Dec 2016 at 10:27 Sachin Mittal <sj...@gmail.com> wrote:
>>
>>> Understood.
>>>
>>> Also the line
>>> Thanks for the application. It is not clear that clustering depends on
>> how
>>> source topics are partitioned.
>>>
>>> Should be read as
>>> Thanks for the explanation. It is now clear that clustering depends on
>> how
>>> source topics are partitioned.
>>>
>>> Apologies for auto-correct.
>>>
>>> One think I want to know is say streams applications consumer has
>> following
>>> defaults set
>>> max.poll.interval.ms 300000
>>> max.poll.records 500
>>>
>>> So it is picking 500 records for one batch and pushing it downstream and
>>> waiting for 300 sec before fetching next set of records.
>>> It may be possible that for that batch of 500 it may take more than 300
>>> sec.
>>>
>>> So broker will consider that consumer failed and will re-balance for that
>>> partition and move on to other consumer in that application.
>>>
>>> Would this mean that my current streams application would come into
>> standby
>>> mode and one of the standby would be woken up and become the main
>>> application?
>>>
>>> Also I suppose if that is true then it would still process the complete
>>> downstream before becoming standby.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Mon, Dec 12, 2016 at 3:19 PM, Damian Guy <da...@gmail.com>
>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> The KafkaStreams StreamsPartitionAssignor will take care of assigning
>> the
>>>> Standby Tasks to the other instances of your Kafka Streams application.
>>> The
>>>> state store updates are all handled by reading from the change-logs and
>>>> updating local copies, there is no communication required between the
>>>> individual application instances to do this as all data is flowing
>>> through
>>>> Kafka, i.e., they are just reading from the topic that backs the state
>>>> store.
>>>>
>>>> The State Dir is independent on each machine, but that path must exist.
>>>> Kafka Streams doesn't try and keep the directories themselves in sync,
>>>> rather it will update local copies of the State Stores that happen to
>> be
>>>> under that directory path. The idea being that if the active task fails
>>> it
>>>> can quickly fail over to one of the standby tasks and not have to spend
>>> too
>>>> much time catching up to the head of the change-log.
>>>>
>>>> Yes, you should keep the num.standby.replicas value the same on all
>>>> instances.
>>>>
>>>> Yes, if you deploy three instances and one crashes, then one of the
>>> others
>>>> will take over the tasks.
>>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>> On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sj...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Thanks for the application. It is not clear that clustering depends
>> on
>>>> how
>>>>> source topics are partitioned.
>>>>> In our case I guess num.standby.replicas settings is best suited.
>>>>>
>>>>> If say I set this to 2 and run two more same application in two
>>> different
>>>>> machines, how would my original instance know in which two machines
>> it
>>>>> needs to be keep the state store up to dated.
>>>>>
>>>>> My application has a setting
>>>>> props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";
>>>>>
>>>>> So when you say it keeps state store up to date, means it will keep
>> the
>>>>> dirs /data01/kafka-streams/test in sync in all the three machines?
>>>>>
>>>>> Also in the other two standby application do I need to keep the same
>>>> value
>>>>> (2) for num.standby.replicas.
>>>>>
>>>>> So basically need to deploy three instances on same application and
>> if
>>>> one
>>>>> fails and crashes one of the standby takes over.
>>>>>
>>>>> Please let me know if my understanding is correct so far.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com>
>>> wrote:
>>>>>
>>>>>> Hi Sachin,
>>>>>>
>>>>>> What you have suggested will never happen. If there is only 1
>>> partition
>>>>>> there will only ever be one consumer of that partition. So if you
>>> had 2
>>>>>> instances of your streams application, and only a single input
>>>> partition,
>>>>>> only 1 instance would be processing the data.
>>>>>> If you are running like this, then you might want to set
>>>>>> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean
>> that
>>>> the
>>>>>> State Store that is generated by the aggregation is kept up to date
>>> on
>>>>> the
>>>>>> instance that is not processing the data. So in the event that the
>>>> active
>>>>>> instance fails, the standby instance should be able to continue
>>> without
>>>>> too
>>>>>> much of a gap in processing time.
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>>>>
>>>>>> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com>
>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I followed the document and I have few questions.
>>>>>>> Say I have a single partition input key topic and say I run 2
>>> streams
>>>>>>> application from machine1 and machine2.
>>>>>>> Both the application have same application id are have identical
>>>> code.
>>>>>>> Say topic1 has messages like
>>>>>>> (k1, v11)
>>>>>>> (k1, v12)
>>>>>>> (k1, v13)
>>>>>>> (k2, v21)
>>>>>>> (k2, v22)
>>>>>>> (k2, v23)
>>>>>>> When I was running single application I was getting results like
>>>>>>> (k1, agg(v11, v12, v13))
>>>>>>> (k2, agg(v21, v22, v23))
>>>>>>>
>>>>>>> Now when 2 applications are run and say messages are read in
>> round
>>>>> robin
>>>>>>> fashion.
>>>>>>> v11 v13 v22 - machine 1
>>>>>>> v12 v21 v23 - machine 2
>>>>>>>
>>>>>>> The aggregation at machine 1 would be
>>>>>>> (k1, agg(v11, v13))
>>>>>>> (k2, agg(v22))
>>>>>>>
>>>>>>> The aggregation at machine 2 would be
>>>>>>> (k1, agg(v12))
>>>>>>> (k2, agg(v21, v23))
>>>>>>>
>>>>>>> So now where do I join the independent results of these 2
>>> aggregation
>>>>> to
>>>>>>> get the final result as expected when single instance was
>> running.
>>>>>>>
>>>>>>> Note my high level dsl is sometime like
>>>>>>> srcSTopic.aggragate(...).foreach(key, aggregation) {
>>>>>>>     //process aggragated value and push it to some external
>> storage
>>>>>>> }
>>>>>>>
>>>>>>> So I want this each to be running against the final set of
>>> aggregated
>>>>>>> value. Do I need to add another step before foreach to make sure
>>> the
>>>>>>> different results from 2 machines are joined to get the final one
>>> as
>>>>>>> expected. If yes what does that step 2.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
>>>>>>> mathieu.fenniak@replicon.com> wrote:
>>>>>>>
>>>>>>>> Hi Sachin,
>>>>>>>>
>>>>>>>> Some quick answers, and a link to some documentation to read
>>> more:
>>>>>>>>
>>>>>>>> - If you restart the application, it will start from the point
>> it
>>>>>> crashed
>>>>>>>> (possibly reprocessing a small window of records).
>>>>>>>>
>>>>>>>> - You can run more than one instance of the application.
>> They'll
>>>>>>>> coordinate by virtue of being part of a Kafka consumer group;
>> if
>>>> one
>>>>>>>> crashes, the partitions that it was reading from will be picked
>>> up
>>>> by
>>>>>>> other
>>>>>>>> instances.
>>>>>>>>
>>>>>>>> - When running more than one instance, the tasks will be
>>>> distributed
>>>>>>>> between the instances.
>>>>>>>>
>>>>>>>> Confluent's docs on the Kafka Streams architecture goes into a
>>> lot
>>>>> more
>>>>>>>> detail: http://docs.confluent.io/3.0.
>> 0/streams/architecture.html
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <
>>> sjmittal@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>> We were able to run a stream processing application against a
>>>>> fairly
>>>>>>>> decent
>>>>>>>>> load of messages in production environment.
>>>>>>>>>
>>>>>>>>> To make the system robust say the stream processing
>> application
>>>>>>> crashes,
>>>>>>>> is
>>>>>>>>> there a way to make it auto start from the point when it
>>> crashed?
>>>>>>>>>
>>>>>>>>> Also is there any concept like running the same application
>> in
>>> a
>>>>>>> cluster,
>>>>>>>>> where one fails, other takes over, until we bring back up the
>>>>> failed
>>>>>>> node
>>>>>>>>> of streams application.
>>>>>>>>>
>>>>>>>>> If yes, is there any guidelines or some knowledge base we can
>>>> look
>>>>> at
>>>>>>> to
>>>>>>>>> understand how this would work.
>>>>>>>>>
>>>>>>>>> Is there way like in spark, where the driver program
>>> distributes
>>>>> the
>>>>>>>> tasks
>>>>>>>>> across various nodes in a cluster, is there something similar
>>> in
>>>>>> kafka
>>>>>>>>> streaming too.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 


Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
Thanks for the suggestions. Before running the streams application in a
standby cluster I was trying to get the graceful shutdown right.
I have code something like this
        streams.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
            public void uncaughtException(Thread t, Throwable e) {
                //log the error
                //System.exit(-1); call system exit (I saw this in one of
the test cases)
                //Do we need to call system exit in order for shutdown hook
to be executed
                //or after handling uncaught exception it will always call
the shutdown hook
            }
        });

        // add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                //log the info the streams app is shutting down
                streams.close();//close the stream by calling
                //in case of some exception we would also want to do
streams.cleanUp()
                //would like to know where can we do the cleanup is it
before close or after close
                //if I call before close, then I get some
IllagalStateException
            }
        }));

Please let me know if I am on right track implementing graceful shutdown.
Main question are
1. Do we need to call system exit in UncaughtExceptionHandler.
2. Is there anything more we can do in UncaughtExceptionHandler
3. What would be right place to do streams cleanup, if state has become
dirty due to task failure and needed to be recreated using changelog topic.
4. Is it at all necessary to do the cleanup for dirty state (as when
restarting streams application will take care of it)
I faced some lock issue on local state store when restarting in case I had
terminated the streams application directly, and then had to delete the dir
manually.

Thanks
Sachin



On Mon, Dec 12, 2016 at 4:24 PM, Damian Guy <da...@gmail.com> wrote:

> In the scenario you mention above about max.poll.interval.ms, yes if the
> timeout was reached then there would be a rebalance and one of the standby
> tasks would take over. However the original task may still be processing
> the data when the rebalance occurs and would throw an exception when it
> tries to commit the offsets (as it would no longer be the owner of the
> partitions). In this case the StreamThread would terminate, so you would
> want to have set up an UncaughtExceptionHandler such that you can be
> alerted and take any necessary actions, i.e., shutdown the app so it can be
> auto-restarted.
>
> On Mon, 12 Dec 2016 at 10:27 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Understood.
> >
> > Also the line
> > Thanks for the application. It is not clear that clustering depends on
> how
> > source topics are partitioned.
> >
> > Should be read as
> > Thanks for the explanation. It is now clear that clustering depends on
> how
> > source topics are partitioned.
> >
> > Apologies for auto-correct.
> >
> > One think I want to know is say streams applications consumer has
> following
> > defaults set
> > max.poll.interval.ms 300000
> > max.poll.records 500
> >
> > So it is picking 500 records for one batch and pushing it downstream and
> > waiting for 300 sec before fetching next set of records.
> > It may be possible that for that batch of 500 it may take more than 300
> > sec.
> >
> > So broker will consider that consumer failed and will re-balance for that
> > partition and move on to other consumer in that application.
> >
> > Would this mean that my current streams application would come into
> standby
> > mode and one of the standby would be woken up and become the main
> > application?
> >
> > Also I suppose if that is true then it would still process the complete
> > downstream before becoming standby.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Mon, Dec 12, 2016 at 3:19 PM, Damian Guy <da...@gmail.com>
> wrote:
> >
> > > Hi Sachin,
> > >
> > > The KafkaStreams StreamsPartitionAssignor will take care of assigning
> the
> > > Standby Tasks to the other instances of your Kafka Streams application.
> > The
> > > state store updates are all handled by reading from the change-logs and
> > > updating local copies, there is no communication required between the
> > > individual application instances to do this as all data is flowing
> > through
> > > Kafka, i.e., they are just reading from the topic that backs the state
> > > store.
> > >
> > > The State Dir is independent on each machine, but that path must exist.
> > > Kafka Streams doesn't try and keep the directories themselves in sync,
> > > rather it will update local copies of the State Stores that happen to
> be
> > > under that directory path. The idea being that if the active task fails
> > it
> > > can quickly fail over to one of the standby tasks and not have to spend
> > too
> > > much time catching up to the head of the change-log.
> > >
> > > Yes, you should keep the num.standby.replicas value the same on all
> > > instances.
> > >
> > > Yes, if you deploy three instances and one crashes, then one of the
> > others
> > > will take over the tasks.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > Thanks for the application. It is not clear that clustering depends
> on
> > > how
> > > > source topics are partitioned.
> > > > In our case I guess num.standby.replicas settings is best suited.
> > > >
> > > > If say I set this to 2 and run two more same application in two
> > different
> > > > machines, how would my original instance know in which two machines
> it
> > > > needs to be keep the state store up to dated.
> > > >
> > > > My application has a setting
> > > > props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
> > > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";
> > > >
> > > > So when you say it keeps state store up to date, means it will keep
> the
> > > > dirs /data01/kafka-streams/test in sync in all the three machines?
> > > >
> > > > Also in the other two standby application do I need to keep the same
> > > value
> > > > (2) for num.standby.replicas.
> > > >
> > > > So basically need to deploy three instances on same application and
> if
> > > one
> > > > fails and crashes one of the standby takes over.
> > > >
> > > > Please let me know if my understanding is correct so far.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Sachin,
> > > > >
> > > > > What you have suggested will never happen. If there is only 1
> > partition
> > > > > there will only ever be one consumer of that partition. So if you
> > had 2
> > > > > instances of your streams application, and only a single input
> > > partition,
> > > > > only 1 instance would be processing the data.
> > > > > If you are running like this, then you might want to set
> > > > > StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean
> that
> > > the
> > > > > State Store that is generated by the aggregation is kept up to date
> > on
> > > > the
> > > > > instance that is not processing the data. So in the event that the
> > > active
> > > > > instance fails, the standby instance should be able to continue
> > without
> > > > too
> > > > > much of a gap in processing time.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > > I followed the document and I have few questions.
> > > > > > Say I have a single partition input key topic and say I run 2
> > streams
> > > > > > application from machine1 and machine2.
> > > > > > Both the application have same application id are have identical
> > > code.
> > > > > > Say topic1 has messages like
> > > > > > (k1, v11)
> > > > > > (k1, v12)
> > > > > > (k1, v13)
> > > > > > (k2, v21)
> > > > > > (k2, v22)
> > > > > > (k2, v23)
> > > > > > When I was running single application I was getting results like
> > > > > > (k1, agg(v11, v12, v13))
> > > > > > (k2, agg(v21, v22, v23))
> > > > > >
> > > > > > Now when 2 applications are run and say messages are read in
> round
> > > > robin
> > > > > > fashion.
> > > > > > v11 v13 v22 - machine 1
> > > > > > v12 v21 v23 - machine 2
> > > > > >
> > > > > > The aggregation at machine 1 would be
> > > > > > (k1, agg(v11, v13))
> > > > > > (k2, agg(v22))
> > > > > >
> > > > > > The aggregation at machine 2 would be
> > > > > > (k1, agg(v12))
> > > > > > (k2, agg(v21, v23))
> > > > > >
> > > > > > So now where do I join the independent results of these 2
> > aggregation
> > > > to
> > > > > > get the final result as expected when single instance was
> running.
> > > > > >
> > > > > > Note my high level dsl is sometime like
> > > > > > srcSTopic.aggragate(...).foreach(key, aggregation) {
> > > > > >     //process aggragated value and push it to some external
> storage
> > > > > > }
> > > > > >
> > > > > > So I want this each to be running against the final set of
> > aggregated
> > > > > > value. Do I need to add another step before foreach to make sure
> > the
> > > > > > different results from 2 machines are joined to get the final one
> > as
> > > > > > expected. If yes what does that step 2.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > > > > > mathieu.fenniak@replicon.com> wrote:
> > > > > >
> > > > > > > Hi Sachin,
> > > > > > >
> > > > > > > Some quick answers, and a link to some documentation to read
> > more:
> > > > > > >
> > > > > > > - If you restart the application, it will start from the point
> it
> > > > > crashed
> > > > > > > (possibly reprocessing a small window of records).
> > > > > > >
> > > > > > > - You can run more than one instance of the application.
> They'll
> > > > > > > coordinate by virtue of being part of a Kafka consumer group;
> if
> > > one
> > > > > > > crashes, the partitions that it was reading from will be picked
> > up
> > > by
> > > > > > other
> > > > > > > instances.
> > > > > > >
> > > > > > > - When running more than one instance, the tasks will be
> > > distributed
> > > > > > > between the instances.
> > > > > > >
> > > > > > > Confluent's docs on the Kafka Streams architecture goes into a
> > lot
> > > > more
> > > > > > > detail: http://docs.confluent.io/3.0.
> 0/streams/architecture.html
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <
> > sjmittal@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > > We were able to run a stream processing application against a
> > > > fairly
> > > > > > > decent
> > > > > > > > load of messages in production environment.
> > > > > > > >
> > > > > > > > To make the system robust say the stream processing
> application
> > > > > > crashes,
> > > > > > > is
> > > > > > > > there a way to make it auto start from the point when it
> > crashed?
> > > > > > > >
> > > > > > > > Also is there any concept like running the same application
> in
> > a
> > > > > > cluster,
> > > > > > > > where one fails, other takes over, until we bring back up the
> > > > failed
> > > > > > node
> > > > > > > > of streams application.
> > > > > > > >
> > > > > > > > If yes, is there any guidelines or some knowledge base we can
> > > look
> > > > at
> > > > > > to
> > > > > > > > understand how this would work.
> > > > > > > >
> > > > > > > > Is there way like in spark, where the driver program
> > distributes
> > > > the
> > > > > > > tasks
> > > > > > > > across various nodes in a cluster, is there something similar
> > in
> > > > > kafka
> > > > > > > > streaming too.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sachin
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Damian Guy <da...@gmail.com>.
In the scenario you mention above about max.poll.interval.ms, yes if the
timeout was reached then there would be a rebalance and one of the standby
tasks would take over. However the original task may still be processing
the data when the rebalance occurs and would throw an exception when it
tries to commit the offsets (as it would no longer be the owner of the
partitions). In this case the StreamThread would terminate, so you would
want to have set up an UncaughtExceptionHandler such that you can be
alerted and take any necessary actions, i.e., shutdown the app so it can be
auto-restarted.

On Mon, 12 Dec 2016 at 10:27 Sachin Mittal <sj...@gmail.com> wrote:

> Understood.
>
> Also the line
> Thanks for the application. It is not clear that clustering depends on how
> source topics are partitioned.
>
> Should be read as
> Thanks for the explanation. It is now clear that clustering depends on how
> source topics are partitioned.
>
> Apologies for auto-correct.
>
> One think I want to know is say streams applications consumer has following
> defaults set
> max.poll.interval.ms 300000
> max.poll.records 500
>
> So it is picking 500 records for one batch and pushing it downstream and
> waiting for 300 sec before fetching next set of records.
> It may be possible that for that batch of 500 it may take more than 300
> sec.
>
> So broker will consider that consumer failed and will re-balance for that
> partition and move on to other consumer in that application.
>
> Would this mean that my current streams application would come into standby
> mode and one of the standby would be woken up and become the main
> application?
>
> Also I suppose if that is true then it would still process the complete
> downstream before becoming standby.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 12, 2016 at 3:19 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > The KafkaStreams StreamsPartitionAssignor will take care of assigning the
> > Standby Tasks to the other instances of your Kafka Streams application.
> The
> > state store updates are all handled by reading from the change-logs and
> > updating local copies, there is no communication required between the
> > individual application instances to do this as all data is flowing
> through
> > Kafka, i.e., they are just reading from the topic that backs the state
> > store.
> >
> > The State Dir is independent on each machine, but that path must exist.
> > Kafka Streams doesn't try and keep the directories themselves in sync,
> > rather it will update local copies of the State Stores that happen to be
> > under that directory path. The idea being that if the active task fails
> it
> > can quickly fail over to one of the standby tasks and not have to spend
> too
> > much time catching up to the head of the change-log.
> >
> > Yes, you should keep the num.standby.replicas value the same on all
> > instances.
> >
> > Yes, if you deploy three instances and one crashes, then one of the
> others
> > will take over the tasks.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sj...@gmail.com> wrote:
> >
> > > Hi,
> > > Thanks for the application. It is not clear that clustering depends on
> > how
> > > source topics are partitioned.
> > > In our case I guess num.standby.replicas settings is best suited.
> > >
> > > If say I set this to 2 and run two more same application in two
> different
> > > machines, how would my original instance know in which two machines it
> > > needs to be keep the state store up to dated.
> > >
> > > My application has a setting
> > > props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";
> > >
> > > So when you say it keeps state store up to date, means it will keep the
> > > dirs /data01/kafka-streams/test in sync in all the three machines?
> > >
> > > Also in the other two standby application do I need to keep the same
> > value
> > > (2) for num.standby.replicas.
> > >
> > > So basically need to deploy three instances on same application and if
> > one
> > > fails and crashes one of the standby takes over.
> > >
> > > Please let me know if my understanding is correct so far.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > What you have suggested will never happen. If there is only 1
> partition
> > > > there will only ever be one consumer of that partition. So if you
> had 2
> > > > instances of your streams application, and only a single input
> > partition,
> > > > only 1 instance would be processing the data.
> > > > If you are running like this, then you might want to set
> > > > StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that
> > the
> > > > State Store that is generated by the aggregation is kept up to date
> on
> > > the
> > > > instance that is not processing the data. So in the event that the
> > active
> > > > instance fails, the standby instance should be able to continue
> without
> > > too
> > > > much of a gap in processing time.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com>
> wrote:
> > > >
> > > > > Hi,
> > > > > I followed the document and I have few questions.
> > > > > Say I have a single partition input key topic and say I run 2
> streams
> > > > > application from machine1 and machine2.
> > > > > Both the application have same application id are have identical
> > code.
> > > > > Say topic1 has messages like
> > > > > (k1, v11)
> > > > > (k1, v12)
> > > > > (k1, v13)
> > > > > (k2, v21)
> > > > > (k2, v22)
> > > > > (k2, v23)
> > > > > When I was running single application I was getting results like
> > > > > (k1, agg(v11, v12, v13))
> > > > > (k2, agg(v21, v22, v23))
> > > > >
> > > > > Now when 2 applications are run and say messages are read in round
> > > robin
> > > > > fashion.
> > > > > v11 v13 v22 - machine 1
> > > > > v12 v21 v23 - machine 2
> > > > >
> > > > > The aggregation at machine 1 would be
> > > > > (k1, agg(v11, v13))
> > > > > (k2, agg(v22))
> > > > >
> > > > > The aggregation at machine 2 would be
> > > > > (k1, agg(v12))
> > > > > (k2, agg(v21, v23))
> > > > >
> > > > > So now where do I join the independent results of these 2
> aggregation
> > > to
> > > > > get the final result as expected when single instance was running.
> > > > >
> > > > > Note my high level dsl is sometime like
> > > > > srcSTopic.aggragate(...).foreach(key, aggregation) {
> > > > >     //process aggragated value and push it to some external storage
> > > > > }
> > > > >
> > > > > So I want this each to be running against the final set of
> aggregated
> > > > > value. Do I need to add another step before foreach to make sure
> the
> > > > > different results from 2 machines are joined to get the final one
> as
> > > > > expected. If yes what does that step 2.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > > > > mathieu.fenniak@replicon.com> wrote:
> > > > >
> > > > > > Hi Sachin,
> > > > > >
> > > > > > Some quick answers, and a link to some documentation to read
> more:
> > > > > >
> > > > > > - If you restart the application, it will start from the point it
> > > > crashed
> > > > > > (possibly reprocessing a small window of records).
> > > > > >
> > > > > > - You can run more than one instance of the application.  They'll
> > > > > > coordinate by virtue of being part of a Kafka consumer group; if
> > one
> > > > > > crashes, the partitions that it was reading from will be picked
> up
> > by
> > > > > other
> > > > > > instances.
> > > > > >
> > > > > > - When running more than one instance, the tasks will be
> > distributed
> > > > > > between the instances.
> > > > > >
> > > > > > Confluent's docs on the Kafka Streams architecture goes into a
> lot
> > > more
> > > > > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <
> sjmittal@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > > We were able to run a stream processing application against a
> > > fairly
> > > > > > decent
> > > > > > > load of messages in production environment.
> > > > > > >
> > > > > > > To make the system robust say the stream processing application
> > > > > crashes,
> > > > > > is
> > > > > > > there a way to make it auto start from the point when it
> crashed?
> > > > > > >
> > > > > > > Also is there any concept like running the same application in
> a
> > > > > cluster,
> > > > > > > where one fails, other takes over, until we bring back up the
> > > failed
> > > > > node
> > > > > > > of streams application.
> > > > > > >
> > > > > > > If yes, is there any guidelines or some knowledge base we can
> > look
> > > at
> > > > > to
> > > > > > > understand how this would work.
> > > > > > >
> > > > > > > Is there way like in spark, where the driver program
> distributes
> > > the
> > > > > > tasks
> > > > > > > across various nodes in a cluster, is there something similar
> in
> > > > kafka
> > > > > > > streaming too.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
Understood.

Also the line
Thanks for the application. It is not clear that clustering depends on how
source topics are partitioned.

Should be read as
Thanks for the explanation. It is now clear that clustering depends on how
source topics are partitioned.

Apologies for auto-correct.

One think I want to know is say streams applications consumer has following
defaults set
max.poll.interval.ms 300000
max.poll.records 500

So it is picking 500 records for one batch and pushing it downstream and
waiting for 300 sec before fetching next set of records.
It may be possible that for that batch of 500 it may take more than 300 sec.

So broker will consider that consumer failed and will re-balance for that
partition and move on to other consumer in that application.

Would this mean that my current streams application would come into standby
mode and one of the standby would be woken up and become the main
application?

Also I suppose if that is true then it would still process the complete
downstream before becoming standby.

Thanks
Sachin



On Mon, Dec 12, 2016 at 3:19 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> The KafkaStreams StreamsPartitionAssignor will take care of assigning the
> Standby Tasks to the other instances of your Kafka Streams application. The
> state store updates are all handled by reading from the change-logs and
> updating local copies, there is no communication required between the
> individual application instances to do this as all data is flowing through
> Kafka, i.e., they are just reading from the topic that backs the state
> store.
>
> The State Dir is independent on each machine, but that path must exist.
> Kafka Streams doesn't try and keep the directories themselves in sync,
> rather it will update local copies of the State Stores that happen to be
> under that directory path. The idea being that if the active task fails it
> can quickly fail over to one of the standby tasks and not have to spend too
> much time catching up to the head of the change-log.
>
> Yes, you should keep the num.standby.replicas value the same on all
> instances.
>
> Yes, if you deploy three instances and one crashes, then one of the others
> will take over the tasks.
>
> Thanks,
> Damian
>
> On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Hi,
> > Thanks for the application. It is not clear that clustering depends on
> how
> > source topics are partitioned.
> > In our case I guess num.standby.replicas settings is best suited.
> >
> > If say I set this to 2 and run two more same application in two different
> > machines, how would my original instance know in which two machines it
> > needs to be keep the state store up to dated.
> >
> > My application has a setting
> > props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";
> >
> > So when you say it keeps state store up to date, means it will keep the
> > dirs /data01/kafka-streams/test in sync in all the three machines?
> >
> > Also in the other two standby application do I need to keep the same
> value
> > (2) for num.standby.replicas.
> >
> > So basically need to deploy three instances on same application and if
> one
> > fails and crashes one of the standby takes over.
> >
> > Please let me know if my understanding is correct so far.
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > What you have suggested will never happen. If there is only 1 partition
> > > there will only ever be one consumer of that partition. So if you had 2
> > > instances of your streams application, and only a single input
> partition,
> > > only 1 instance would be processing the data.
> > > If you are running like this, then you might want to set
> > > StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that
> the
> > > State Store that is generated by the aggregation is kept up to date on
> > the
> > > instance that is not processing the data. So in the event that the
> active
> > > instance fails, the standby instance should be able to continue without
> > too
> > > much of a gap in processing time.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > I followed the document and I have few questions.
> > > > Say I have a single partition input key topic and say I run 2 streams
> > > > application from machine1 and machine2.
> > > > Both the application have same application id are have identical
> code.
> > > > Say topic1 has messages like
> > > > (k1, v11)
> > > > (k1, v12)
> > > > (k1, v13)
> > > > (k2, v21)
> > > > (k2, v22)
> > > > (k2, v23)
> > > > When I was running single application I was getting results like
> > > > (k1, agg(v11, v12, v13))
> > > > (k2, agg(v21, v22, v23))
> > > >
> > > > Now when 2 applications are run and say messages are read in round
> > robin
> > > > fashion.
> > > > v11 v13 v22 - machine 1
> > > > v12 v21 v23 - machine 2
> > > >
> > > > The aggregation at machine 1 would be
> > > > (k1, agg(v11, v13))
> > > > (k2, agg(v22))
> > > >
> > > > The aggregation at machine 2 would be
> > > > (k1, agg(v12))
> > > > (k2, agg(v21, v23))
> > > >
> > > > So now where do I join the independent results of these 2 aggregation
> > to
> > > > get the final result as expected when single instance was running.
> > > >
> > > > Note my high level dsl is sometime like
> > > > srcSTopic.aggragate(...).foreach(key, aggregation) {
> > > >     //process aggragated value and push it to some external storage
> > > > }
> > > >
> > > > So I want this each to be running against the final set of aggregated
> > > > value. Do I need to add another step before foreach to make sure the
> > > > different results from 2 machines are joined to get the final one as
> > > > expected. If yes what does that step 2.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > > > mathieu.fenniak@replicon.com> wrote:
> > > >
> > > > > Hi Sachin,
> > > > >
> > > > > Some quick answers, and a link to some documentation to read more:
> > > > >
> > > > > - If you restart the application, it will start from the point it
> > > crashed
> > > > > (possibly reprocessing a small window of records).
> > > > >
> > > > > - You can run more than one instance of the application.  They'll
> > > > > coordinate by virtue of being part of a Kafka consumer group; if
> one
> > > > > crashes, the partitions that it was reading from will be picked up
> by
> > > > other
> > > > > instances.
> > > > >
> > > > > - When running more than one instance, the tasks will be
> distributed
> > > > > between the instances.
> > > > >
> > > > > Confluent's docs on the Kafka Streams architecture goes into a lot
> > more
> > > > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > > We were able to run a stream processing application against a
> > fairly
> > > > > decent
> > > > > > load of messages in production environment.
> > > > > >
> > > > > > To make the system robust say the stream processing application
> > > > crashes,
> > > > > is
> > > > > > there a way to make it auto start from the point when it crashed?
> > > > > >
> > > > > > Also is there any concept like running the same application in a
> > > > cluster,
> > > > > > where one fails, other takes over, until we bring back up the
> > failed
> > > > node
> > > > > > of streams application.
> > > > > >
> > > > > > If yes, is there any guidelines or some knowledge base we can
> look
> > at
> > > > to
> > > > > > understand how this would work.
> > > > > >
> > > > > > Is there way like in spark, where the driver program distributes
> > the
> > > > > tasks
> > > > > > across various nodes in a cluster, is there something similar in
> > > kafka
> > > > > > streaming too.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

The KafkaStreams StreamsPartitionAssignor will take care of assigning the
Standby Tasks to the other instances of your Kafka Streams application. The
state store updates are all handled by reading from the change-logs and
updating local copies, there is no communication required between the
individual application instances to do this as all data is flowing through
Kafka, i.e., they are just reading from the topic that backs the state
store.

The State Dir is independent on each machine, but that path must exist.
Kafka Streams doesn't try and keep the directories themselves in sync,
rather it will update local copies of the State Stores that happen to be
under that directory path. The idea being that if the active task fails it
can quickly fail over to one of the standby tasks and not have to spend too
much time catching up to the head of the change-log.

Yes, you should keep the num.standby.replicas value the same on all
instances.

Yes, if you deploy three instances and one crashes, then one of the others
will take over the tasks.

Thanks,
Damian

On Mon, 12 Dec 2016 at 09:15 Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> Thanks for the application. It is not clear that clustering depends on how
> source topics are partitioned.
> In our case I guess num.standby.replicas settings is best suited.
>
> If say I set this to 2 and run two more same application in two different
> machines, how would my original instance know in which two machines it
> needs to be keep the state store up to dated.
>
> My application has a setting
> props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";
>
> So when you say it keeps state store up to date, means it will keep the
> dirs /data01/kafka-streams/test in sync in all the three machines?
>
> Also in the other two standby application do I need to keep the same value
> (2) for num.standby.replicas.
>
> So basically need to deploy three instances on same application and if one
> fails and crashes one of the standby takes over.
>
> Please let me know if my understanding is correct so far.
>
> Thanks
> Sachin
>
>
> On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > What you have suggested will never happen. If there is only 1 partition
> > there will only ever be one consumer of that partition. So if you had 2
> > instances of your streams application, and only a single input partition,
> > only 1 instance would be processing the data.
> > If you are running like this, then you might want to set
> > StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> > State Store that is generated by the aggregation is kept up to date on
> the
> > instance that is not processing the data. So in the event that the active
> > instance fails, the standby instance should be able to continue without
> too
> > much of a gap in processing time.
> >
> > Thanks,
> > Damian
> >
> > On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:
> >
> > > Hi,
> > > I followed the document and I have few questions.
> > > Say I have a single partition input key topic and say I run 2 streams
> > > application from machine1 and machine2.
> > > Both the application have same application id are have identical code.
> > > Say topic1 has messages like
> > > (k1, v11)
> > > (k1, v12)
> > > (k1, v13)
> > > (k2, v21)
> > > (k2, v22)
> > > (k2, v23)
> > > When I was running single application I was getting results like
> > > (k1, agg(v11, v12, v13))
> > > (k2, agg(v21, v22, v23))
> > >
> > > Now when 2 applications are run and say messages are read in round
> robin
> > > fashion.
> > > v11 v13 v22 - machine 1
> > > v12 v21 v23 - machine 2
> > >
> > > The aggregation at machine 1 would be
> > > (k1, agg(v11, v13))
> > > (k2, agg(v22))
> > >
> > > The aggregation at machine 2 would be
> > > (k1, agg(v12))
> > > (k2, agg(v21, v23))
> > >
> > > So now where do I join the independent results of these 2 aggregation
> to
> > > get the final result as expected when single instance was running.
> > >
> > > Note my high level dsl is sometime like
> > > srcSTopic.aggragate(...).foreach(key, aggregation) {
> > >     //process aggragated value and push it to some external storage
> > > }
> > >
> > > So I want this each to be running against the final set of aggregated
> > > value. Do I need to add another step before foreach to make sure the
> > > different results from 2 machines are joined to get the final one as
> > > expected. If yes what does that step 2.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > > mathieu.fenniak@replicon.com> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Some quick answers, and a link to some documentation to read more:
> > > >
> > > > - If you restart the application, it will start from the point it
> > crashed
> > > > (possibly reprocessing a small window of records).
> > > >
> > > > - You can run more than one instance of the application.  They'll
> > > > coordinate by virtue of being part of a Kafka consumer group; if one
> > > > crashes, the partitions that it was reading from will be picked up by
> > > other
> > > > instances.
> > > >
> > > > - When running more than one instance, the tasks will be distributed
> > > > between the instances.
> > > >
> > > > Confluent's docs on the Kafka Streams architecture goes into a lot
> more
> > > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi All,
> > > > > We were able to run a stream processing application against a
> fairly
> > > > decent
> > > > > load of messages in production environment.
> > > > >
> > > > > To make the system robust say the stream processing application
> > > crashes,
> > > > is
> > > > > there a way to make it auto start from the point when it crashed?
> > > > >
> > > > > Also is there any concept like running the same application in a
> > > cluster,
> > > > > where one fails, other takes over, until we bring back up the
> failed
> > > node
> > > > > of streams application.
> > > > >
> > > > > If yes, is there any guidelines or some knowledge base we can look
> at
> > > to
> > > > > understand how this would work.
> > > > >
> > > > > Is there way like in spark, where the driver program distributes
> the
> > > > tasks
> > > > > across various nodes in a cluster, is there something similar in
> > kafka
> > > > > streaming too.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
Thanks for the application. It is not clear that clustering depends on how
source topics are partitioned.
In our case I guess num.standby.replicas settings is best suited.

If say I set this to 2 and run two more same application in two different
machines, how would my original instance know in which two machines it
needs to be keep the state store up to dated.

My application has a setting
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data01/kafka-streams");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test";

So when you say it keeps state store up to date, means it will keep the
dirs /data01/kafka-streams/test in sync in all the three machines?

Also in the other two standby application do I need to keep the same value
(2) for num.standby.replicas.

So basically need to deploy three instances on same application and if one
fails and crashes one of the standby takes over.

Please let me know if my understanding is correct so far.

Thanks
Sachin


On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> What you have suggested will never happen. If there is only 1 partition
> there will only ever be one consumer of that partition. So if you had 2
> instances of your streams application, and only a single input partition,
> only 1 instance would be processing the data.
> If you are running like this, then you might want to set
> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> State Store that is generated by the aggregation is kept up to date on the
> instance that is not processing the data. So in the event that the active
> instance fails, the standby instance should be able to continue without too
> much of a gap in processing time.
>
> Thanks,
> Damian
>
> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Hi,
> > I followed the document and I have few questions.
> > Say I have a single partition input key topic and say I run 2 streams
> > application from machine1 and machine2.
> > Both the application have same application id are have identical code.
> > Say topic1 has messages like
> > (k1, v11)
> > (k1, v12)
> > (k1, v13)
> > (k2, v21)
> > (k2, v22)
> > (k2, v23)
> > When I was running single application I was getting results like
> > (k1, agg(v11, v12, v13))
> > (k2, agg(v21, v22, v23))
> >
> > Now when 2 applications are run and say messages are read in round robin
> > fashion.
> > v11 v13 v22 - machine 1
> > v12 v21 v23 - machine 2
> >
> > The aggregation at machine 1 would be
> > (k1, agg(v11, v13))
> > (k2, agg(v22))
> >
> > The aggregation at machine 2 would be
> > (k1, agg(v12))
> > (k2, agg(v21, v23))
> >
> > So now where do I join the independent results of these 2 aggregation to
> > get the final result as expected when single instance was running.
> >
> > Note my high level dsl is sometime like
> > srcSTopic.aggragate(...).foreach(key, aggregation) {
> >     //process aggragated value and push it to some external storage
> > }
> >
> > So I want this each to be running against the final set of aggregated
> > value. Do I need to add another step before foreach to make sure the
> > different results from 2 machines are joined to get the final one as
> > expected. If yes what does that step 2.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> >
> > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > mathieu.fenniak@replicon.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > Some quick answers, and a link to some documentation to read more:
> > >
> > > - If you restart the application, it will start from the point it
> crashed
> > > (possibly reprocessing a small window of records).
> > >
> > > - You can run more than one instance of the application.  They'll
> > > coordinate by virtue of being part of a Kafka consumer group; if one
> > > crashes, the partitions that it was reading from will be picked up by
> > other
> > > instances.
> > >
> > > - When running more than one instance, the tasks will be distributed
> > > between the instances.
> > >
> > > Confluent's docs on the Kafka Streams architecture goes into a lot more
> > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > >
> > >
> > >
> > >
> > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
> > wrote:
> > >
> > > > Hi All,
> > > > We were able to run a stream processing application against a fairly
> > > decent
> > > > load of messages in production environment.
> > > >
> > > > To make the system robust say the stream processing application
> > crashes,
> > > is
> > > > there a way to make it auto start from the point when it crashed?
> > > >
> > > > Also is there any concept like running the same application in a
> > cluster,
> > > > where one fails, other takes over, until we bring back up the failed
> > node
> > > > of streams application.
> > > >
> > > > If yes, is there any guidelines or some knowledge base we can look at
> > to
> > > > understand how this would work.
> > > >
> > > > Is there way like in spark, where the driver program distributes the
> > > tasks
> > > > across various nodes in a cluster, is there something similar in
> kafka
> > > > streaming too.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Guozhang Wang <wa...@gmail.com>.
The re-discover new consumer member within the group is part of the
Consumer Rebalance protocol that Streams simply relies on. More details can
be found here:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

A one sentence summary is that the new consumer will notify the coordinator
about its existence which will then notify other consumers within the group
to rebalance their tasks so that some can be migrated to the new comer.

Guozhang

On Fri, Feb 3, 2017 at 2:51 AM, Sachin Mittal <sj...@gmail.com> wrote:

> > Any reason why you don't just let streams create the changelog topic? Yes
> you should partition it the same as the source topic.
>
> Only reason is that I need to use my max.message.bytes and in version
> 0.10.0.1 configuring the same to state store supplier is not supported.
> But I understood that number of partitions should be same as source one. I
> will take care of that.
>
> > When an instance fails or is shutdown it will be removed from the
> consumer
> group. When it is removed a rebalance will be triggered. The partitions
> will be re-assigned to the remaining threads.
>
> I understood this part. However I did not understand that we we restart the
> same (failed) instance again, at that time all the existing threads are
> already rebalanced and processing different partitions. Now when this new
> instance is up, how and when will some of the existing threads give up
> (some of) their existing partitions and shift them to the threads of this
> new instance.
>
> I looked at new consumer configs, is this metadata.max.age.ms somehow part
> of this rediscover new consumer functionality..
>
> Thanks
> Sachin
>
>
> On Fri, Feb 3, 2017 at 3:26 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > On 3 February 2017 at 09:07, Sachin Mittal <sj...@gmail.com> wrote:
> >
> > >
> > > 1. Now what I wanted to know is that for separate machines running same
> > > instance of the streams application, my application.id would be same
> > > right.
> > >
> >
> > That is correct.
> >
> >
> > > If yes then how does kafka cluser know which partition to assign to
> which
> > > machine and which thread.
> > > Because what I see that on same machine each thread has its unique
> name,
> > so
> > > it will get message from a given partition(s) only, but how does kafka
> > > cluster know that each machines thread are different from some other
> > > machines.
> > > Like how does it distinguish thread-1 from machine A vs machine B. Do I
> > > need to configure something here.
> > >
> > >
> > This is all taken care of by the Kafka Consumer. All application
> instances
> > and threads with the same application.id are part of the same consumer
> > group. So the kafka consumer will ensure the partitions are balanced
> across
> > the available threads.
> >
> >
> > > 2. Also my stream processing creates an internal changelog topic which
> is
> > > backed by rocksDB state store.
> > > - So should I have to partition that topic too in same number of
> > partitions
> > > as my source topic. (Here I am creating that change log topic manually)
> > >
> > >
> > Any reason why you don't just let streams create the changelog topic? Yes
> > you should partition it the same as the source topic.
> >
> >
> >
> > > 3. If I don't create that change log topic manually and let kafka
> stream
> > > create that automatically, then does what number of partitions it uses.
> > >
> > >
> > The same number as the source topic.
> >
> >
> > > 4. Suppose my change log topic has single partition (if that is
> allowed)
> > > and now we will have multiple threads accessing that. Is there any
> > deadlock
> > > situation I need to worry about.
> > >
> >
> > Multiple threads will never access a single partition in a kafka streams
> > app. A partition is only ever consumed by a single thread.
> >
> >
> > >
> > > 5. Also now multiple threads will access the same state store and
> attempt
> > > to recreate from change log topic if there is a need. How does this
> work.
> > >
> > >
> > It is done on a per partition basis. You have a state store for each
> > partition.
> >
> >
> > > 6. Lastly say one instance fails then other instances will try to
> balance
> > > off the load, now when i bring that instance up, how does partition get
> > re
> > > assigned to it? Like at what point does some old thread stops
> processing
> > > that partition and new thread of new instance takes over. Is there any
> > > configuration needed gere?
> > >
> >
> > When an instance fails or is shutdown it will be removed from the
> consumer
> > group. When it is removed a rebalance will be triggered. The partitions
> > will be re-assigned to the remaining threads.
> >
> > There are some settings you can adjust that will effect the length of
> time
> > it takes for a dead consumer to be detected.
> >
> > i.e.,
> > max.poll.interval.ms
> > heartbeat.interval.ms
> > session.timeout.ms
> >
> > I suggest you take a look at the consumer config docs here:
> > https://kafka.apache.org/documentation/#newconsumerconfigs
> >
> > Thanks,
> > Damian
> >
>



-- 
-- Guozhang

Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
> Any reason why you don't just let streams create the changelog topic? Yes
you should partition it the same as the source topic.

Only reason is that I need to use my max.message.bytes and in version
0.10.0.1 configuring the same to state store supplier is not supported.
But I understood that number of partitions should be same as source one. I
will take care of that.

> When an instance fails or is shutdown it will be removed from the consumer
group. When it is removed a rebalance will be triggered. The partitions
will be re-assigned to the remaining threads.

I understood this part. However I did not understand that we we restart the
same (failed) instance again, at that time all the existing threads are
already rebalanced and processing different partitions. Now when this new
instance is up, how and when will some of the existing threads give up
(some of) their existing partitions and shift them to the threads of this
new instance.

I looked at new consumer configs, is this metadata.max.age.ms somehow part
of this rediscover new consumer functionality..

Thanks
Sachin


On Fri, Feb 3, 2017 at 3:26 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> On 3 February 2017 at 09:07, Sachin Mittal <sj...@gmail.com> wrote:
>
> >
> > 1. Now what I wanted to know is that for separate machines running same
> > instance of the streams application, my application.id would be same
> > right.
> >
>
> That is correct.
>
>
> > If yes then how does kafka cluser know which partition to assign to which
> > machine and which thread.
> > Because what I see that on same machine each thread has its unique name,
> so
> > it will get message from a given partition(s) only, but how does kafka
> > cluster know that each machines thread are different from some other
> > machines.
> > Like how does it distinguish thread-1 from machine A vs machine B. Do I
> > need to configure something here.
> >
> >
> This is all taken care of by the Kafka Consumer. All application instances
> and threads with the same application.id are part of the same consumer
> group. So the kafka consumer will ensure the partitions are balanced across
> the available threads.
>
>
> > 2. Also my stream processing creates an internal changelog topic which is
> > backed by rocksDB state store.
> > - So should I have to partition that topic too in same number of
> partitions
> > as my source topic. (Here I am creating that change log topic manually)
> >
> >
> Any reason why you don't just let streams create the changelog topic? Yes
> you should partition it the same as the source topic.
>
>
>
> > 3. If I don't create that change log topic manually and let kafka stream
> > create that automatically, then does what number of partitions it uses.
> >
> >
> The same number as the source topic.
>
>
> > 4. Suppose my change log topic has single partition (if that is allowed)
> > and now we will have multiple threads accessing that. Is there any
> deadlock
> > situation I need to worry about.
> >
>
> Multiple threads will never access a single partition in a kafka streams
> app. A partition is only ever consumed by a single thread.
>
>
> >
> > 5. Also now multiple threads will access the same state store and attempt
> > to recreate from change log topic if there is a need. How does this work.
> >
> >
> It is done on a per partition basis. You have a state store for each
> partition.
>
>
> > 6. Lastly say one instance fails then other instances will try to balance
> > off the load, now when i bring that instance up, how does partition get
> re
> > assigned to it? Like at what point does some old thread stops processing
> > that partition and new thread of new instance takes over. Is there any
> > configuration needed gere?
> >
>
> When an instance fails or is shutdown it will be removed from the consumer
> group. When it is removed a rebalance will be triggered. The partitions
> will be re-assigned to the remaining threads.
>
> There are some settings you can adjust that will effect the length of time
> it takes for a dead consumer to be detected.
>
> i.e.,
> max.poll.interval.ms
> heartbeat.interval.ms
> session.timeout.ms
>
> I suggest you take a look at the consumer config docs here:
> https://kafka.apache.org/documentation/#newconsumerconfigs
>
> Thanks,
> Damian
>

Re: Running cluster of stream processing application

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

On 3 February 2017 at 09:07, Sachin Mittal <sj...@gmail.com> wrote:

>
> 1. Now what I wanted to know is that for separate machines running same
> instance of the streams application, my application.id would be same
> right.
>

That is correct.


> If yes then how does kafka cluser know which partition to assign to which
> machine and which thread.
> Because what I see that on same machine each thread has its unique name, so
> it will get message from a given partition(s) only, but how does kafka
> cluster know that each machines thread are different from some other
> machines.
> Like how does it distinguish thread-1 from machine A vs machine B. Do I
> need to configure something here.
>
>
This is all taken care of by the Kafka Consumer. All application instances
and threads with the same application.id are part of the same consumer
group. So the kafka consumer will ensure the partitions are balanced across
the available threads.


> 2. Also my stream processing creates an internal changelog topic which is
> backed by rocksDB state store.
> - So should I have to partition that topic too in same number of partitions
> as my source topic. (Here I am creating that change log topic manually)
>
>
Any reason why you don't just let streams create the changelog topic? Yes
you should partition it the same as the source topic.



> 3. If I don't create that change log topic manually and let kafka stream
> create that automatically, then does what number of partitions it uses.
>
>
The same number as the source topic.


> 4. Suppose my change log topic has single partition (if that is allowed)
> and now we will have multiple threads accessing that. Is there any deadlock
> situation I need to worry about.
>

Multiple threads will never access a single partition in a kafka streams
app. A partition is only ever consumed by a single thread.


>
> 5. Also now multiple threads will access the same state store and attempt
> to recreate from change log topic if there is a need. How does this work.
>
>
It is done on a per partition basis. You have a state store for each
partition.


> 6. Lastly say one instance fails then other instances will try to balance
> off the load, now when i bring that instance up, how does partition get re
> assigned to it? Like at what point does some old thread stops processing
> that partition and new thread of new instance takes over. Is there any
> configuration needed gere?
>

When an instance fails or is shutdown it will be removed from the consumer
group. When it is removed a rebalance will be triggered. The partitions
will be re-assigned to the remaining threads.

There are some settings you can adjust that will effect the length of time
it takes for a dead consumer to be detected.

i.e.,
max.poll.interval.ms
heartbeat.interval.ms
session.timeout.ms

I suggest you take a look at the consumer config docs here:
https://kafka.apache.org/documentation/#newconsumerconfigs

Thanks,
Damian

Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
Hello All,
I am revisiting this topic as now I am actually configuring a partitioned
topic and would like multiple threads of my streams application running on
different instances to process this partitioned topic in parallel.

So I have once source topic partitioned into 40 partitions.
The messages it receives are all keyed message and at my producer side I
take care that a particular key's messages are sent to a particular
partition only.
So since my processing is based on that key, I can process individual
partitions independently.

 Now say to process these in parallel I am planning to run a cluster on
three machines and each having say 8 threads.
So I have configured
streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);

1. Now what I wanted to know is that for separate machines running same
instance of the streams application, my application.id would be same right.
If yes then how does kafka cluser know which partition to assign to which
machine and which thread.
Because what I see that on same machine each thread has its unique name, so
it will get message from a given partition(s) only, but how does kafka
cluster know that each machines thread are different from some other
machines.
Like how does it distinguish thread-1 from machine A vs machine B. Do I
need to configure something here.

2. Also my stream processing creates an internal changelog topic which is
backed by rocksDB state store.
- So should I have to partition that topic too in same number of partitions
as my source topic. (Here I am creating that change log topic manually)

3. If I don't create that change log topic manually and let kafka stream
create that automatically, then does what number of partitions it uses.

4. Suppose my change log topic has single partition (if that is allowed)
and now we will have multiple threads accessing that. Is there any deadlock
situation I need to worry about.

5. Also now multiple threads will access the same state store and attempt
to recreate from change log topic if there is a need. How does this work.

6. Lastly say one instance fails then other instances will try to balance
off the load, now when i bring that instance up, how does partition get re
assigned to it? Like at what point does some old thread stops processing
that partition and new thread of new instance takes over. Is there any
configuration needed gere?


Thanks
Sachin




On Fri, Dec 9, 2016 at 1:34 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> What you have suggested will never happen. If there is only 1 partition
> there will only ever be one consumer of that partition. So if you had 2
> instances of your streams application, and only a single input partition,
> only 1 instance would be processing the data.
> If you are running like this, then you might want to set
> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> State Store that is generated by the aggregation is kept up to date on the
> instance that is not processing the data. So in the event that the active
> instance fails, the standby instance should be able to continue without too
> much of a gap in processing time.
>
> Thanks,
> Damian
>
> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Hi,
> > I followed the document and I have few questions.
> > Say I have a single partition input key topic and say I run 2 streams
> > application from machine1 and machine2.
> > Both the application have same application id are have identical code.
> > Say topic1 has messages like
> > (k1, v11)
> > (k1, v12)
> > (k1, v13)
> > (k2, v21)
> > (k2, v22)
> > (k2, v23)
> > When I was running single application I was getting results like
> > (k1, agg(v11, v12, v13))
> > (k2, agg(v21, v22, v23))
> >
> > Now when 2 applications are run and say messages are read in round robin
> > fashion.
> > v11 v13 v22 - machine 1
> > v12 v21 v23 - machine 2
> >
> > The aggregation at machine 1 would be
> > (k1, agg(v11, v13))
> > (k2, agg(v22))
> >
> > The aggregation at machine 2 would be
> > (k1, agg(v12))
> > (k2, agg(v21, v23))
> >
> > So now where do I join the independent results of these 2 aggregation to
> > get the final result as expected when single instance was running.
> >
> > Note my high level dsl is sometime like
> > srcSTopic.aggragate(...).foreach(key, aggregation) {
> >     //process aggragated value and push it to some external storage
> > }
> >
> > So I want this each to be running against the final set of aggregated
> > value. Do I need to add another step before foreach to make sure the
> > different results from 2 machines are joined to get the final one as
> > expected. If yes what does that step 2.
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> >
> > On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> > mathieu.fenniak@replicon.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > Some quick answers, and a link to some documentation to read more:
> > >
> > > - If you restart the application, it will start from the point it
> crashed
> > > (possibly reprocessing a small window of records).
> > >
> > > - You can run more than one instance of the application.  They'll
> > > coordinate by virtue of being part of a Kafka consumer group; if one
> > > crashes, the partitions that it was reading from will be picked up by
> > other
> > > instances.
> > >
> > > - When running more than one instance, the tasks will be distributed
> > > between the instances.
> > >
> > > Confluent's docs on the Kafka Streams architecture goes into a lot more
> > > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> > >
> > >
> > >
> > >
> > > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
> > wrote:
> > >
> > > > Hi All,
> > > > We were able to run a stream processing application against a fairly
> > > decent
> > > > load of messages in production environment.
> > > >
> > > > To make the system robust say the stream processing application
> > crashes,
> > > is
> > > > there a way to make it auto start from the point when it crashed?
> > > >
> > > > Also is there any concept like running the same application in a
> > cluster,
> > > > where one fails, other takes over, until we bring back up the failed
> > node
> > > > of streams application.
> > > >
> > > > If yes, is there any guidelines or some knowledge base we can look at
> > to
> > > > understand how this would work.
> > > >
> > > > Is there way like in spark, where the driver program distributes the
> > > tasks
> > > > across various nodes in a cluster, is there something similar in
> kafka
> > > > streaming too.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> >
>

Re: Running cluster of stream processing application

Posted by "Matthias J. Sax" <ma...@confluent.io>.
About failure and restart. Kafka Streams does not provide any tooling
for this. It's a library.

However, because it is a library it is also agnostic to whatever tool
you want to use. You can for example you a resource manager like Mesos
or YARN, or you containerize your application, or you use tools like
Chef. And there is a bunch more -- pick whatever fits your needs best.

-Matthias


On 12/9/16 12:04 AM, Damian Guy wrote:
> Hi Sachin,
> 
> What you have suggested will never happen. If there is only 1 partition
> there will only ever be one consumer of that partition. So if you had 2
> instances of your streams application, and only a single input partition,
> only 1 instance would be processing the data.
> If you are running like this, then you might want to set
> StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
> State Store that is generated by the aggregation is kept up to date on the
> instance that is not processing the data. So in the event that the active
> instance fails, the standby instance should be able to continue without too
> much of a gap in processing time.
> 
> Thanks,
> Damian
> 
> On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:
> 
>> Hi,
>> I followed the document and I have few questions.
>> Say I have a single partition input key topic and say I run 2 streams
>> application from machine1 and machine2.
>> Both the application have same application id are have identical code.
>> Say topic1 has messages like
>> (k1, v11)
>> (k1, v12)
>> (k1, v13)
>> (k2, v21)
>> (k2, v22)
>> (k2, v23)
>> When I was running single application I was getting results like
>> (k1, agg(v11, v12, v13))
>> (k2, agg(v21, v22, v23))
>>
>> Now when 2 applications are run and say messages are read in round robin
>> fashion.
>> v11 v13 v22 - machine 1
>> v12 v21 v23 - machine 2
>>
>> The aggregation at machine 1 would be
>> (k1, agg(v11, v13))
>> (k2, agg(v22))
>>
>> The aggregation at machine 2 would be
>> (k1, agg(v12))
>> (k2, agg(v21, v23))
>>
>> So now where do I join the independent results of these 2 aggregation to
>> get the final result as expected when single instance was running.
>>
>> Note my high level dsl is sometime like
>> srcSTopic.aggragate(...).foreach(key, aggregation) {
>>     //process aggragated value and push it to some external storage
>> }
>>
>> So I want this each to be running against the final set of aggregated
>> value. Do I need to add another step before foreach to make sure the
>> different results from 2 machines are joined to get the final one as
>> expected. If yes what does that step 2.
>>
>> Thanks
>> Sachin
>>
>>
>>
>>
>>
>>
>> On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
>> mathieu.fenniak@replicon.com> wrote:
>>
>>> Hi Sachin,
>>>
>>> Some quick answers, and a link to some documentation to read more:
>>>
>>> - If you restart the application, it will start from the point it crashed
>>> (possibly reprocessing a small window of records).
>>>
>>> - You can run more than one instance of the application.  They'll
>>> coordinate by virtue of being part of a Kafka consumer group; if one
>>> crashes, the partitions that it was reading from will be picked up by
>> other
>>> instances.
>>>
>>> - When running more than one instance, the tasks will be distributed
>>> between the instances.
>>>
>>> Confluent's docs on the Kafka Streams architecture goes into a lot more
>>> detail: http://docs.confluent.io/3.0.0/streams/architecture.html
>>>
>>>
>>>
>>>
>>> On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
>> wrote:
>>>
>>>> Hi All,
>>>> We were able to run a stream processing application against a fairly
>>> decent
>>>> load of messages in production environment.
>>>>
>>>> To make the system robust say the stream processing application
>> crashes,
>>> is
>>>> there a way to make it auto start from the point when it crashed?
>>>>
>>>> Also is there any concept like running the same application in a
>> cluster,
>>>> where one fails, other takes over, until we bring back up the failed
>> node
>>>> of streams application.
>>>>
>>>> If yes, is there any guidelines or some knowledge base we can look at
>> to
>>>> understand how this would work.
>>>>
>>>> Is there way like in spark, where the driver program distributes the
>>> tasks
>>>> across various nodes in a cluster, is there something similar in kafka
>>>> streaming too.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>
>>
> 


Re: Running cluster of stream processing application

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

What you have suggested will never happen. If there is only 1 partition
there will only ever be one consumer of that partition. So if you had 2
instances of your streams application, and only a single input partition,
only 1 instance would be processing the data.
If you are running like this, then you might want to set
StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to 1 - this will mean that the
State Store that is generated by the aggregation is kept up to date on the
instance that is not processing the data. So in the event that the active
instance fails, the standby instance should be able to continue without too
much of a gap in processing time.

Thanks,
Damian

On Fri, 9 Dec 2016 at 04:55 Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> I followed the document and I have few questions.
> Say I have a single partition input key topic and say I run 2 streams
> application from machine1 and machine2.
> Both the application have same application id are have identical code.
> Say topic1 has messages like
> (k1, v11)
> (k1, v12)
> (k1, v13)
> (k2, v21)
> (k2, v22)
> (k2, v23)
> When I was running single application I was getting results like
> (k1, agg(v11, v12, v13))
> (k2, agg(v21, v22, v23))
>
> Now when 2 applications are run and say messages are read in round robin
> fashion.
> v11 v13 v22 - machine 1
> v12 v21 v23 - machine 2
>
> The aggregation at machine 1 would be
> (k1, agg(v11, v13))
> (k2, agg(v22))
>
> The aggregation at machine 2 would be
> (k1, agg(v12))
> (k2, agg(v21, v23))
>
> So now where do I join the independent results of these 2 aggregation to
> get the final result as expected when single instance was running.
>
> Note my high level dsl is sometime like
> srcSTopic.aggragate(...).foreach(key, aggregation) {
>     //process aggragated value and push it to some external storage
> }
>
> So I want this each to be running against the final set of aggregated
> value. Do I need to add another step before foreach to make sure the
> different results from 2 machines are joined to get the final one as
> expected. If yes what does that step 2.
>
> Thanks
> Sachin
>
>
>
>
>
>
> On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
> mathieu.fenniak@replicon.com> wrote:
>
> > Hi Sachin,
> >
> > Some quick answers, and a link to some documentation to read more:
> >
> > - If you restart the application, it will start from the point it crashed
> > (possibly reprocessing a small window of records).
> >
> > - You can run more than one instance of the application.  They'll
> > coordinate by virtue of being part of a Kafka consumer group; if one
> > crashes, the partitions that it was reading from will be picked up by
> other
> > instances.
> >
> > - When running more than one instance, the tasks will be distributed
> > between the instances.
> >
> > Confluent's docs on the Kafka Streams architecture goes into a lot more
> > detail: http://docs.confluent.io/3.0.0/streams/architecture.html
> >
> >
> >
> >
> > On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com>
> wrote:
> >
> > > Hi All,
> > > We were able to run a stream processing application against a fairly
> > decent
> > > load of messages in production environment.
> > >
> > > To make the system robust say the stream processing application
> crashes,
> > is
> > > there a way to make it auto start from the point when it crashed?
> > >
> > > Also is there any concept like running the same application in a
> cluster,
> > > where one fails, other takes over, until we bring back up the failed
> node
> > > of streams application.
> > >
> > > If yes, is there any guidelines or some knowledge base we can look at
> to
> > > understand how this would work.
> > >
> > > Is there way like in spark, where the driver program distributes the
> > tasks
> > > across various nodes in a cluster, is there something similar in kafka
> > > streaming too.
> > >
> > > Thanks
> > > Sachin
> > >
> >
>

Re: Running cluster of stream processing application

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I followed the document and I have few questions.
Say I have a single partition input key topic and say I run 2 streams
application from machine1 and machine2.
Both the application have same application id are have identical code.
Say topic1 has messages like
(k1, v11)
(k1, v12)
(k1, v13)
(k2, v21)
(k2, v22)
(k2, v23)
When I was running single application I was getting results like
(k1, agg(v11, v12, v13))
(k2, agg(v21, v22, v23))

Now when 2 applications are run and say messages are read in round robin
fashion.
v11 v13 v22 - machine 1
v12 v21 v23 - machine 2

The aggregation at machine 1 would be
(k1, agg(v11, v13))
(k2, agg(v22))

The aggregation at machine 2 would be
(k1, agg(v12))
(k2, agg(v21, v23))

So now where do I join the independent results of these 2 aggregation to
get the final result as expected when single instance was running.

Note my high level dsl is sometime like
srcSTopic.aggragate(...).foreach(key, aggregation) {
    //process aggragated value and push it to some external storage
}

So I want this each to be running against the final set of aggregated
value. Do I need to add another step before foreach to make sure the
different results from 2 machines are joined to get the final one as
expected. If yes what does that step 2.

Thanks
Sachin






On Fri, Dec 9, 2016 at 9:42 AM, Mathieu Fenniak <
mathieu.fenniak@replicon.com> wrote:

> Hi Sachin,
>
> Some quick answers, and a link to some documentation to read more:
>
> - If you restart the application, it will start from the point it crashed
> (possibly reprocessing a small window of records).
>
> - You can run more than one instance of the application.  They'll
> coordinate by virtue of being part of a Kafka consumer group; if one
> crashes, the partitions that it was reading from will be picked up by other
> instances.
>
> - When running more than one instance, the tasks will be distributed
> between the instances.
>
> Confluent's docs on the Kafka Streams architecture goes into a lot more
> detail: http://docs.confluent.io/3.0.0/streams/architecture.html
>
>
>
>
> On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com> wrote:
>
> > Hi All,
> > We were able to run a stream processing application against a fairly
> decent
> > load of messages in production environment.
> >
> > To make the system robust say the stream processing application crashes,
> is
> > there a way to make it auto start from the point when it crashed?
> >
> > Also is there any concept like running the same application in a cluster,
> > where one fails, other takes over, until we bring back up the failed node
> > of streams application.
> >
> > If yes, is there any guidelines or some knowledge base we can look at to
> > understand how this would work.
> >
> > Is there way like in spark, where the driver program distributes the
> tasks
> > across various nodes in a cluster, is there something similar in kafka
> > streaming too.
> >
> > Thanks
> > Sachin
> >
>

Re: Running cluster of stream processing application

Posted by Mathieu Fenniak <ma...@replicon.com>.
Hi Sachin,

Some quick answers, and a link to some documentation to read more:

- If you restart the application, it will start from the point it crashed
(possibly reprocessing a small window of records).

- You can run more than one instance of the application.  They'll
coordinate by virtue of being part of a Kafka consumer group; if one
crashes, the partitions that it was reading from will be picked up by other
instances.

- When running more than one instance, the tasks will be distributed
between the instances.

Confluent's docs on the Kafka Streams architecture goes into a lot more
detail: http://docs.confluent.io/3.0.0/streams/architecture.html




On Thu, Dec 8, 2016 at 9:05 PM, Sachin Mittal <sj...@gmail.com> wrote:

> Hi All,
> We were able to run a stream processing application against a fairly decent
> load of messages in production environment.
>
> To make the system robust say the stream processing application crashes, is
> there a way to make it auto start from the point when it crashed?
>
> Also is there any concept like running the same application in a cluster,
> where one fails, other takes over, until we bring back up the failed node
> of streams application.
>
> If yes, is there any guidelines or some knowledge base we can look at to
> understand how this would work.
>
> Is there way like in spark, where the driver program distributes the tasks
> across various nodes in a cluster, is there something similar in kafka
> streaming too.
>
> Thanks
> Sachin
>