Posted to users@kafka.apache.org by Joe Stein <cr...@gmail.com> on 2011/12/02 17:06:55 UTC

New Code Not Receiving Messages After Release Where Old Code Left Off

Thanks Jun, yup, that is how I understand it.  I just figured out that the error
is a red herring to my problem, since it is caught and logged properly in
ZookeeperConsumerConnector.commitOffsets.

It seems the problem I am running into is this (any help is appreciated):

One code base registers as consumer
apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from the
logs).

When I kill that consumer (on purpose) and start another consumer in its
place (new code meant to pick up where the old code left off), it shows up
as a different consumer (though I want it to start back where the other one
stopped).

The new code registers
as apophis_starscream.site1.medialets.com-1322840984781-5d58d658 and its
logs say "stopping fetcher FetchRunnable-0 to host
starscream Received 0 messages".

But ... if I then start back up the code that registers
as apophis_starscream.site1.medialets.com-1322840963912-9c58256d, it picks
back up where it left off.

I am using the exact same consumer.properties for both apps (the point here
is a code rewrite: my new code is not getting messages but my old code is,
and I want the new code to pick up where the old code was consuming messages).
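
For reference, the shared consumer.properties looks roughly like this (hosts,
group name, and values are illustrative, and I am going from memory on the
0.7 property names, so treat them as approximate):

  # illustrative values only -- both apps load this exact same file
  zk.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
  groupid=apophis
  zk.connectiontimeout.ms=6000
  autocommit.interval.ms=10000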

So I guess the question is: how do I make my new consumer code "trick"
kafka/zk into thinking it is the old one, so it can pick back up where the
old one left off? Or what else have folks done in this regard, and how
should/could I handle this?

On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <ju...@gmail.com> wrote:

> Joe,
>
> It seems that the offset commit thread somehow gets interrupted while
> trying to write the offset to ZK. The consumer should still be able to
> consume in this case though. The offset commit thread runs in the
> background and is decoupled from the consumption logic.
>
> Thanks,
>
> Jun
>
> On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <cr...@gmail.com> wrote:
>
> > Has anyone else gotten this error? I get it 100% of the time, depending on
> > how I run my consumer.
> >
> > What is weird is that when I use kafka/bin/kafka-consumer-shell.sh from
> > trunk the error shows up BUT the consumer keeps consuming messages.
> >
> > If I take the ConsumerShell code and put it into my own program without
> > any modification, the error comes up and everything halts.
> >
> > Not sure if this is a known issue, but let me know. Thanks!
> >
> > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0 kafka.consumer.ZookeeperConsumerConnector - exception during commitOffsets
> > org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
> >   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> >   at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> >   at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> >   at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> >   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> >   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> >   at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> >   at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> >   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> >   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> >   at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >   at scala.collection.immutable.List.foreach(List.scala:45)
> >   at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> >   at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> >   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> >   at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >   at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >   at java.lang.Thread.run(Thread.java:619)
> > Caused by: java.lang.InterruptedException
> >   at java.lang.Object.wait(Native Method)
> >   at java.lang.Object.wait(Object.java:485)
> >   at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> >   at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> >   at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> >   at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> >   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >
> >
> > /*
> > Joe Stein
> > http://www.linkedin.com/in/charmalloc
> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > */
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: New Code Not Receiving Messages After Release Where Old Code Left Off

Posted by Jun Rao <ju...@gmail.com>.
Joe,

If you force-kill a consumer, its registration in ZK may not go away until
the ZK session timeout has passed. This means that a new consumer may not
be able to own a partition and therefore may not get events immediately.
Not sure if this is exactly your problem though.
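
One way to check is to look at the consumer registration and ownership
nodes in ZK right after you kill the old process (this assumes the standard
layout the consumer uses in ZK; substitute your own group and topic):

  ls /consumers/<your-group>/ids
  ls /consumers/<your-group>/owners/<your-topic>

If the old consumer id is still listed there, the new consumer cannot claim
those partitions until the old session expires (controlled on the consumer
side by the session timeout property, zk.sessiontimeout.ms I believe).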

Jun


Re: New Code Not Receiving Messages After Release Where Old Code Left Off

Posted by Joe Stein <cr...@gmail.com>.
Thanks all for the help.

I found a hack: if I put a Thread.sleep(1000) after I call my consumer, then
all goes well.
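
The shape of the workaround is just this (Launcher, MyConsumerApp, and
consume are made-up names; consume is what I turned ConsumerShell's main
into):

  object Launcher {
    def main(args: Array[String]) {
      MyConsumerApp.consume(args) // made-up wrapper: the old ConsumerShell.main turned into a function
      Thread.sleep(1000)          // the hack: without this the process exits before consumption gets going
    }
  }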

I was able to reproduce this using the "ConsumerShell" code exactly as it
is: I take the code, call it from another class that has its own main
function, and change ConsumerShell's main into an ordinary function.

So if I run ConsumerShell directly it is fine; if I bundle it into another
app and change main into a function, then (without sleeping) it errors this
way, because the ZKConsumerThread has some race condition where the latch is
not set before my program exits...

As I keep digging around I keep seeing a lot of overlap between what I am
doing and what I could/should be doing, and some inconsistencies (like
between "ConsumerShell" and "ConsoleConsumer").

I think a Scala client should handle these situations in a consistent way (I
see no such client yet), and then ConsumerShell and ConsoleConsumer could be
refactored to work using the new "ClientConsumer".

I am not sure how much can be done for a ClientConsumer, or where the
implementation of the API bleeds into the implementation of an app, but I
think it is something that can be done. It could be as simple as moving 90%
of the ConsumerShell code into a ClientConsumer class and moving the latch
around, and then voila, a Scala client (and throw it into examples, etc.). I
need to think about it some more.
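
Something like this is what I am picturing (totally untested; the
ClientConsumer name and its API are just my sketch, and the Kafka calls are
the 0.7 high-level consumer API as I remember it from the quickstart):

  import java.util.Properties
  import java.util.concurrent.CountDownLatch
  import kafka.consumer.{Consumer, ConsumerConfig}
  import kafka.message.Message

  // Sketch only: owns the connector and a shutdown latch so a caller's main()
  // cannot fall off the end before consumption has started.
  class ClientConsumer(props: Properties, topic: String, numStreams: Int) {
    private val connector = Consumer.create(new ConsumerConfig(props))
    private val shutdownLatch = new CountDownLatch(1)

    // Start one thread per stream and hand each message to the caller's handler.
    def start(handler: Message => Unit) {
      val streams = connector.createMessageStreams(Map(topic -> numStreams))(topic)
      for (stream <- streams) {
        new Thread(new Runnable {
          def run() {
            for (message <- stream)
              handler(message)
          }
        }).start()
      }
    }

    // Block the caller until shutdown() is called, instead of sleeping.
    def awaitShutdown() = shutdownLatch.await()

    def shutdown() {
      connector.shutdown()
      shutdownLatch.countDown()
    }
  }

Then an app's main would just create one, call start, and call awaitShutdown,
which is the part my Thread.sleep(1000) is papering over today. ConsumerShell
and ConsoleConsumer could share the same class.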




-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: New Code Not Receiving Messages After Release Where Old Code Left Off

Posted by Neha Narkhede <ne...@gmail.com>.
>> "ConsumerConfig" and Predef.Map(topic -> partitions) are
exactly the same for both consumers

Digging deeper, what are the contents of this map?
How many brokers do you have?
How many partitions do you have for each of the topics that you've listed
above?

If I understand correctly, you start only one consumer process at a time,
right?

Thanks,
Neha


Re: New Code Not Receiving Messages After Release Where Old Code Left Off

Posted by Joe Stein <cr...@gmail.com>.
>> 1. What is the difference between "Old code" and "New code"? Have you
>> changed any code in the latter case?

The code is exactly the same except that I changed the name of the package,
class, and function that is called, and put that class into a new jar file
that calls the newly named function in the new package; everything else is
the same exact code.

>> 2. What is the {topic, consumer threads} map given to both your
>> consumers?

I am not sure what you mean by "consumer threads" as far as the map goes,
but I am passing the exact same topic and consumer.properties to both
consumers, so my "ConsumerConfig" and Predef.Map(topic -> partitions) are
exactly the same for both consumers.
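
Concretely, both apps build the consumer the same way, roughly like this
(the topic name and stream count are stand-ins; the calls are the 0.7
high-level consumer API as I understand it from the quickstart):

  import java.io.FileInputStream
  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}

  // sketch of the shared setup; the topic and count are placeholders
  val props = new Properties()
  props.load(new FileInputStream("consumer.properties")) // identical file in both apps

  val connector = Consumer.create(new ConsumerConfig(props))
  // Predef.Map(topic -> partitions): the same single-entry map in both apps
  val streams = connector.createMessageStreams(Map("mytopic" -> 1))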




-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/

Re: New Code Not Receiving Messages After Release Where Old Code Left Off

Posted by Neha Narkhede <ne...@gmail.com>.
Joe,

A couple of questions -

1. What is the difference between "Old code" and "New code"? Have you
changed any code in the latter case?
2. What is the {topic, consumer threads} map given to both your consumers?

>> But ... if I then start back up the code that registers
as apophis_starscream.site1.medialets.com-1322840963912-9c58256d, it picks
back up where it left off

When you start a new consumer, it will automatically get a unique id
assigned. However, that doesn't mean it will not pick up from where the
previous consumer thread left off: if both consumer threads "own" the same
partitions, the second one should always start from the offset committed by
the previous consumer thread.
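
You can also verify this directly in ZooKeeper, since committed offsets are
stored per group rather than per consumer id (the paths below assume the
standard layout; substitute your group and topic):

  ls /consumers/<your-group>/offsets/<your-topic>
  get /consumers/<your-group>/offsets/<your-topic>/0-0   (nodes are named brokerid-partition)

If those offset nodes exist and keep advancing, any consumer that joins with
the same group id should resume from them.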

Thanks,
Neha
