Posted to users@kafka.apache.org by Jason Rosenberg <jb...@squareup.com> on 2013/10/04 20:20:57 UTC

testing issue with reliable sending

All,

I'm having an issue with an integration test I've set up.  This is using
0.8-beta1.

The test is to verify that no messages are dropped (or that the sender gets
an exception thrown back on failure) during a rolling restart of a
cluster of 2 brokers.

The producer is configured to use 'request.required.acks' = '1'.

The servers are set up to run locally on localhost, on different ports, and
different data dirs.  The producer connects with a metadata brokerlist
like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
up with a default replication factor of 2.  The servers have controlled
shutdown enabled, as well.
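
For illustration, the setup described above might map onto properties like the ones below. This is only a sketch: the property names are the 0.8-era names as I understand them and should be checked against the configuration docs for the version in use, `producer.type=sync` is an assumption (the message does not state it), and the class and method names are hypothetical.

```java
import java.util.Properties;

public class ReliableSendConfigSketch {

    /** Producer-side settings, using the values quoted in the message. */
    static Properties producerProps() {
        Properties props = new Properties();
        // Brokers contacted to bootstrap topic metadata (no VIP in front of them).
        props.put("metadata.broker.list", "localhost:2024,localhost:1025");
        // 1 = ack as soon as the leader has written the message to its local log.
        props.put("request.required.acks", "1");
        // Assumption: synchronous sends, so send() blocks until acked or failed.
        props.put("producer.type", "sync");
        return props;
    }

    /** Broker-side settings mentioned in the message. */
    static Properties brokerProps() {
        Properties props = new Properties();
        props.put("default.replication.factor", "2");
        props.put("controlled.shutdown.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer: " + producerProps());
        System.out.println("broker:   " + brokerProps());
    }
}
```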

The producer code looks like this:
    ...
    Producer<Integer, T> producer = getProducer();
    try {
      KeyedMessage<Integer, T> msg =
          new KeyedMessage<Integer, T>(topic, message);
      producer.send(msg);
      return true;
    } catch (RuntimeException e) {
      logger.warn("Error sending data to kafka", e);
      return false;
    }
    ...

The test sends groups of messages at each stage of the test (e.g. servers
up, first server going down, first server coming up, second server going
down, etc.).  Then a consumer connects and consumes all the messages, to
make sure they all arrived ok.
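
The final check can be sketched as a comparison between what was sent and what the consumer read back. The helper below is a hypothetical illustration, not the actual test code; the payload format simply mirrors the "test-stage: S, message: N" strings used by the test, and the sample data models a run in which one message goes missing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeliveryCheckSketch {

    /** Returns every sent message that the consumer never saw, in send order. */
    static List<String> missing(List<String> sent, List<String> received) {
        Set<String> seen = new HashSet<>(received);
        List<String> lost = new ArrayList<>();
        for (String msg : sent) {
            if (!seen.contains(msg)) {
                lost.add(msg);
            }
        }
        return lost;
    }

    /** Builds a payload in the "test-stage: S, message: N" shape. */
    static String payload(int stage, int n) {
        return "test-stage: " + stage + ", message: " + n;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        for (int n = 0; n <= 5; n++) {
            sent.add(payload(4, n));
        }
        // Model a run where message 2 of stage 4 never reaches the consumer.
        List<String> received = new ArrayList<>(sent);
        received.remove(payload(4, 2));
        System.out.println("missing: " + missing(sent, received));
    }
}
```

A test would then assert that the returned list is empty, or that every missing message corresponds to a send that reported failure.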

Intermittently, a single message gets dropped right after one of the
servers starts going down.  It doesn't always happen; it seems to occur in
roughly 1 out of every 20 test runs.  Here's some sample output.  I see the
exception logged inside the producer code, but the producer.send method
never throws an exception back out to the caller (the log line
"Error sending data to kafka" is never triggered).

What's interesting is that the exceptions appear to happen while sending
message 3, but when the consumer subsequently consumes all the messages
back from the broker cluster, it is message 2 (and not message 3) that is
missing:

...
...
7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Shutting down server2
7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting down KafkaServer
7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger  - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:724)
8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread  - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
java.nio.channels.AsynchronousCloseException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
at kafka.utils.Utils$.read(Utils.scala:394)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut down complete for KafkaServer
17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:394)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.javaapi.producer.Producer.send(Producer.scala:32)
at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
at java.lang.Thread.run(Thread.java:724)
17319 [Thread-1] ERROR kafka.producer.SyncProducer  - Producer connection to localhost:1026 unsuccessful
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
at kafka.utils.Utils$.swallow(Utils.scala:186)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.javaapi.producer.Producer.send(Producer.scala:32)
at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
at java.lang.Thread.run(Thread.java:724)
17322 [Thread-1] WARN kafka.client.ClientUtils$  - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
at kafka.utils.Utils$.swallow(Utils.scala:186)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.javaapi.producer.Producer.send(Producer.scala:32)
at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
at java.lang.Thread.run(Thread.java:724)
17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 4
17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 5
17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 6

...
...
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 3, message: 98'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 3, message: 99'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 0'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 1'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 3'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 4'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Received msg 'test-stage: 4, message: 5'

Re: testing issue with reliable sending

Posted by Jun Rao <ju...@gmail.com>.
Jason,

As Neha said, what you describe is possible, but it may require a more
careful design. For example, what if the followers don't catch up with the leader
quickly? Do we want to wait forever or up to some configurable amount of
time? If we do the latter, we may still lose data during controlled
shutdown.
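
The trade-off can be illustrated with a bounded wait. This is a hypothetical sketch, not broker code: `awaitCatchUp` and its parameters are invented names, and the follower is modeled as a plain offset supplier. When the timeout expires before the follower reaches the leader's log end offset, the method returns false, which is exactly the case where data could still be lost.

```java
import java.util.function.LongSupplier;

public class CatchUpWaitSketch {

    /**
     * Polls the follower's offset until it reaches the leader's log end offset
     * or the timeout elapses. Returns true only if the follower caught up.
     */
    static boolean awaitCatchUp(long leaderLogEndOffset, LongSupplier followerOffset,
                                long timeoutMs, long pollMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (followerOffset.getAsLong() < leaderLogEndOffset) {
            if (System.nanoTime() >= deadline) {
                return false; // gave up: the follower is still behind
            }
            Thread.sleep(pollMs);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] offset = {97};
        // A follower that advances by one message per poll reaches offset 100.
        boolean caughtUp = awaitCatchUp(100, () -> ++offset[0], 1000, 1);
        // A stalled follower never catches up, so the wait gives up after 50 ms.
        boolean stalled = awaitCatchUp(100, () -> 97L, 50, 5);
        System.out.println(caughtUp + " " + stalled);
    }
}
```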

Thanks,

Jun



On Sun, Oct 6, 2013 at 9:14 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Thanks Neha for continued insight....
>
> What you describe as a possible solution is what I was thinking (although I
> wasn't as concerned as maybe I should be with the added delay of the new
> leader delaying processing new requests while it finishes consuming from
> the old leader, and communicates back and forth to complete the leader
> hand-off).
>
> E.g., isn't the definition of a broker being in the ISR that it is keeping
> itself up to date with the leader of the ISR (within an allowed replication
> lag)?  So, it should be possible to elect a new leader, have it buffer
> incoming requests while it finishes replicating everything from the old
> leader (which it should complete within an allowed replication lag
> timeout), and then start acking any buffered requests.
>
> I guess this buffering period would be akin to the leader 'unavailability'
> window, but in reality, it is just a delay (and shouldn't be much more than
> the replication lag timeout).  The producing client can decide to time out
> the request if it's taking too long, and retry it (that's normal anyway if
> a producer fails to get an ack, etc.).
>
> So, as long as the old leader atomically starts rejecting incoming requests
> at the time it relinquishes leadership, then producer requests will fail
> fast, initiate a new metadata request to find the new leader, and continue
> on with the new leader (possibly after a bit of a replication catch up
> delay).
>
> The old leader can then proceed with shutdown after the new leader has
> caught up (which it will signal with an RPC).
>
> I realize there are all sorts of edge cases here, but it seems there should
> be a way to make it work.
>
> I guess I'm willing to allow a bit of an 'unavailability' delay, rather
> than have messages silently acked then lost during a controlled
> shutdown/new leader election.
>
> My hunch is, in the steady state, when leadership is stable and brokers
> aren't being shut down, the performance benefit of being able to use
> request.required.acks=1 (instead of -1) far outweighs any momentary
> performance blip during a leader unavailability delay during a leadership
> change (which should be fully recoverable and retryable by a concerned
> producer client).
>
> Now, of course, if I want to guard against a hard-shutdown, then that's a
> whole different ball of wax!
>
> Jason

Re: testing issue with reliable sending

Posted by Jason Rosenberg <jb...@squareup.com>.
Thanks Neha for continued insight....

What you describe as a possible solution is what I was thinking (although I
wasn't as concerned as maybe I should be with the added delay of the new
leader delaying processing new requests while it finishes consuming from
the old leader, and communicates back and forth to complete the leader
hand-off).

E.g., isn't the definition of a broker being in the ISR that it is keeping
itself up to date with the leader of the ISR (within an allowed replication
lag)?  So, it should be possible to elect a new leader, have it buffer
incoming requests while it finishes replicating everything from the old
leader (which it should complete within an allowed replication lag
timeout), and then start acking any buffered requests.

I guess this buffering period would be akin to the leader 'unavailability'
window, but in reality, it is just a delay (and shouldn't be much more than
the replication lag timeout).  The producing client can decide to time out
the request if it's taking too long, and retry it (that's normal anyway if
a producer fails to get an ack, etc.).

So, as long as the old leader atomically starts rejecting incoming requests
at the time it relinquishes leadership, then producer requests will fail
fast, initiate a new metadata request to find the new leader, and continue
on with the new leader (possibly after a bit of a replication catch up
delay).

The old leader can then proceed with shutdown after the new leader has
caught up (which it will signal with an RPC).
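
As a toy model of the hand-off proposed above (not Kafka code; every name here is hypothetical), the new leader below buffers produce requests until it has copied everything up to the old leader's log end offset, and only then appends and acks them:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class LeaderHandoffSketch {

    /** Toy stand-in for the newly elected leader. */
    static class NewLeader {
        final List<String> log;                             // messages held locally
        final Queue<String> buffered = new ArrayDeque<>();  // requests awaiting catch-up
        final List<String> acked = new ArrayList<>();       // requests acked to producers
        final int oldLeaderLogEndOffset;                    // catch-up target

        NewLeader(List<String> replicatedSoFar, int oldLeaderLogEndOffset) {
            this.log = new ArrayList<>(replicatedSoFar);
            this.oldLeaderLogEndOffset = oldLeaderLogEndOffset;
        }

        boolean caughtUp() {
            return log.size() >= oldLeaderLogEndOffset;
        }

        /** A producer request: appended and acked only once the old log is fully copied. */
        void produce(String msg) {
            if (caughtUp()) {
                log.add(msg);
                acked.add(msg);
            } else {
                buffered.add(msg);
            }
        }

        /** One message fetched from the old leader; drains the buffer once caught up. */
        void replicateFromOldLeader(String msg) {
            log.add(msg);
            while (caughtUp() && !buffered.isEmpty()) {
                String next = buffered.poll();
                log.add(next);
                acked.add(next);
            }
        }
    }

    public static void main(String[] args) {
        // The old leader holds m0..m2; the follower had copied only m0 and m1 at hand-off.
        NewLeader leader = new NewLeader(Arrays.asList("m0", "m1"), 3);
        leader.produce("m3");                 // arrives early, so it is buffered
        leader.replicateFromOldLeader("m2");  // catch-up completes and the buffer drains
        System.out.println(leader.log + " acked=" + leader.acked);
    }
}
```

The model leaves out the hard parts: followers that never catch up, ISR changes during the wait, and the extra controller RPC.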

I realize there are all sorts of edge cases here, but it seems there should
be a way to make it work.

I guess I'm willing to allow a bit of an 'unavailability' delay, rather
than have messages silently acked then lost during a controlled
shutdown/new leader election.

My hunch is, in the steady state, when leadership is stable and brokers
aren't being shut down, the performance benefit of being able to use
request.required.acks=1 (instead of -1) far outweighs any momentary
performance blip during a leader unavailability delay during a leadership
change (which should be fully recoverable and retryable by a concerned
producer client).

Now, of course, if I want to guard against a hard-shutdown, then that's a
whole different ball of wax!

Jason


On Sun, Oct 6, 2013 at 4:30 PM, Neha Narkhede <ne...@gmail.com> wrote:

> Ok, so if I initiate a controlled shutdown, in which all partitions that a
> shutting down broker is leader of get transferred to another broker, why
> can't part of that controlled transfer of leadership include ISR
> synchronization, such that no data is lost?  Is there a fundamental reason
> why that is not possible?  Is it worth filing a Jira for a feature
> request?  Or is it technically not possible?
>
> It is not as straightforward as it seems and it will slow down the shutdown
> operation further (currently several ZooKeeper writes already slow
> it down) and also increase the leader unavailability window. But keeping
> the performance degradation aside, it is tricky since in order to stop
> "new" data from coming in, we need to move the leaders off of the current
> leader (broker being shut down) onto some other follower. Now moving the
> leader means some other follower will become the leader and as part of that
> will stop copying existing data from the old leader and will start
> receiving new data. What you are asking for is to insert some kind of
> "wait" before the new follower becomes the leader so that the consumption
> of messages is "done". What is the definition of "done"? This is only
> dictated by the log end offset of the old leader and will have to be
> included in the new leader transition state change request by the
> controller. So this means an extra RPC between the controller and the other
> brokers as part of the leader transition. Also there is no guarantee that
> the other followers are alive and consuming, so how long does the broker
> being shut down wait? Since it is no longer the leader, it technically
> cannot kick followers out of the ISR, so ISR handling is another thing that
> becomes tricky here.
>
> Also, this sort of catching up on the last few messages is not limited to
> just controlled shutdown, but you can extend it to any other leader
> transition. So special casing it does not make much sense. This "wait"
> combined with the extra RPC will mean extending the leader unavailability
> window even further than what we have today.
>
> So this is a fair amount of work to make sure the last few messages are
> replicated to all followers. Instead what we do is to simplify the leader
> transition and let the clients handle the retries for requests that have
> not made it to the desired number of replicas, which is configurable.
>
> We can discuss that in a JIRA if that helps. Maybe other committers have
> more ideas.
>
> Thanks,
> Neha

Re: testing issue with reliable sending

Posted by Neha Narkhede <ne...@gmail.com>.
> Ok, so if I initiate a controlled shutdown, in which all partitions that a
> shutting down broker is leader of get transferred to another broker, why
> can't part of that controlled transfer of leadership include ISR
> synchronization, such that no data is lost?  Is there a fundamental reason
> why that is not possible?  Is it worth filing a Jira for a feature
> request?  Or is it technically not possible?

It is not as straightforward as it seems. It will slow down the shutdown
operation further (several ZooKeeper writes already slow it down) and also
increase the leader unavailability window. But keeping the performance
degradation aside, it is tricky: in order to stop "new" data from coming
in, we need to move the leaders off of the current leader (the broker being
shut down) onto some other follower. Moving the leader means some other
follower will become the leader and, as part of that, will stop copying
existing data from the old leader and start receiving new data. What you
are asking for is to insert some kind of "wait" before the follower becomes
the new leader, so that its consumption of messages is "done". What is the
definition of "done"? It is dictated only by the log end offset of the old
leader, which would have to be included in the leader transition state
change request sent by the controller. So this means an extra RPC between
the controller and the other brokers as part of the leader transition. Also,
there is no guarantee that the other followers are alive and consuming, so
how long does the broker being shut down wait? Since it is no longer the
leader, it technically cannot kick followers out of the ISR, so ISR handling
is another thing that becomes tricky here.

Also, this sort of catching up on the last few messages is not limited to
just controlled shutdown, but you can extend it to any other leader
transition. So special casing it does not make much sense. This "wait"
combined with the extra RPC will mean extending the leader unavailability
window even further than what we have today.

So this is a fair amount of work to make sure the last few messages are
replicated to all followers. Instead, what we do is simplify the leader
transition and let the clients handle the retries for requests that have
not made it to the desired number of replicas, which is configurable.
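
To make the role of the configurable ack level concrete, here is a toy model (not Kafka code; the class and method names are hypothetical) of a partition with one leader and one follower. With `requiredAcks = 1` the ack goes out after the leader's local write, so a message acked just before the leader goes away can be lost; with `requiredAcks = -1` the ack is held until the follower has the message.

```java
import java.util.ArrayList;
import java.util.List;

public class AckLevelSketch {

    /** Toy partition with one leader and one in-sync follower. */
    static class Partition {
        final List<String> leaderLog = new ArrayList<>();
        final List<String> followerLog = new ArrayList<>();
        final List<String> acked = new ArrayList<>();

        /** The follower copies whatever it is missing from the leader. */
        void followerFetch() {
            followerLog.addAll(leaderLog.subList(followerLog.size(), leaderLog.size()));
        }

        /** Leader write; with -1 the ack waits until the follower has the message. */
        void produce(String msg, int requiredAcks) {
            leaderLog.add(msg);
            if (requiredAcks == -1) {
                followerFetch();
            }
            acked.add(msg);
        }

        /** The leader disappears: acked messages absent from the follower are lost. */
        List<String> lostOnFailover() {
            List<String> lost = new ArrayList<>(acked);
            lost.removeAll(followerLog);
            return lost;
        }
    }

    static List<String> run(int requiredAcks) {
        Partition p = new Partition();
        p.produce("m0", requiredAcks);
        p.followerFetch();              // follower catches up in the background
        p.produce("m1", requiredAcks);  // leader goes away before the next fetch
        return p.lostOnFailover();
    }

    public static void main(String[] args) {
        System.out.println("acks=1  lost: " + run(1));
        System.out.println("acks=-1 lost: " + run(-1));
    }
}
```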

We can discuss that in a JIRA if that helps. Maybe other committers have
more ideas.

Thanks,
Neha


On Sun, Oct 6, 2013 at 10:08 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> On Sun, Oct 6, 2013 at 4:08 AM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
> > Does the
> > leader just wait for the followers in the ISR to consume?
> >
> > That's right. Until that is done, the producer does not get an ack back. It
> > has an option of retrying if the previous request times out or fails.
> >
> >
> Ok, so if I initiate a controlled shutdown, in which all partitions that a
> shutting down broker is leader of get transferred to another broker, why
> can't part of that controlled transfer of leadership include ISR
> synchronization, such that no data is lost?  Is there a fundamental reason
> why that is not possible?  Is it worth filing a Jira for a feature
> request?  Or is it technically not possible?
>
> I'm ok with losing data in this case during a hard-shutdown, but for a
> controlled shutdown, I'm wondering if there's at least a best effort
> attempt to be made to sync all writes to the ISR.
>
> Jason
>

Re: testing issue with reliable sending

Posted by Jason Rosenberg <jb...@squareup.com>.
On Sun, Oct 6, 2013 at 4:08 AM, Neha Narkhede <ne...@gmail.com> wrote:

> Does the
> leader just wait for the followers in the ISR to consume?
>
> That's right. Until that is done, the producer does not get an ack back. It
> has an option of retrying if the previous request times out or fails.
>
>
Ok, so if I initiate a controlled shutdown, in which all partitions that a
shutting down broker is leader of get transferred to another broker, why
can't part of that controlled transfer of leadership include ISR
synchronization, such that no data is lost?  Is there a fundamental reason
why that is not possible?  Is it worth filing a Jira for a feature
request?  Or is it technically not possible?

I'm ok with losing data in this case during a hard-shutdown, but for a
controlled shutdown, I'm wondering if there's at least a best effort
attempt to be made to sync all writes to the ISR.

Jason

Re: testing issue with reliable sending

Posted by Neha Narkhede <ne...@gmail.com>.
> Does the leader just wait for the followers in the ISR to consume?

That's right. Until that is done, the producer does not get an ack back. It
has an option of retrying if the previous request times out or fails.
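
A client-side retry of this kind could look roughly like the sketch below. It is a generic helper, not part of the Kafka producer API; `withRetries` and its parameters are hypothetical names, and the failing send is simulated. Note that retrying a request that timed out may duplicate a message the broker did in fact receive.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

public class RetrySketch {

    /** Runs the attempt up to maxAttempts times, sleeping backoffMs between failures. */
    static <T> T withRetries(Callable<T> attempt, int maxAttempts, long backoffMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e;
                if (i < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last; // every attempt failed: surface the last error to the caller
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send: the first two attempts time out, the third is acked.
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new SocketTimeoutException("no ack");
            }
            return "acked";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```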

> A separate question, can the request.required.acks be set to a higher
> positive integer, say "2", to indicate that 2 of say 3 replicas have acked?

Yes that's possible.

Thanks,
Neha
On Oct 5, 2013 10:18 AM, "Jason Rosenberg" <jb...@squareup.com> wrote:

> Thanks for the explanation Neha.....still holding out hope.....
>
> So, if request.required.acks=-1, how does the leader confirm that the other
> brokers have consumed the message, before acking to the producer?  Does the
> leader just wait for the followers in the ISR to consume?  Or does the
> leader have a way to push, or ping the followers to consume?
>
> Couldn't that mechanism be used, during a clean shutdown, even if the
> messages were initially produced with acks=1? That is, when shutting down,
> get acks from all ISR members for each partition, before shutting down.
>
> I'm just a bit leery about using -1 across the board, because of the
> performance hit (but for now it seems the best option to use for reliable
> sending).
>
> A separate question, can the request.required.acks be set to a higher
> positive integer, say "2", to indicate that 2 of say 3 replicas have acked?
>  ("request.required.acks" in the name would seem to indicate this).  I'm
> not saying I'd want to use this (we are hoping to use only a replication
> factor of 2).
>
> Jason
>
>
> On Sat, Oct 5, 2013 at 1:00 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
> > > Shouldn't this be part of the contract?  It should be able to make sure
> > > this happens before shutting down, no?
> >
> > The leader writes messages to its local log and then the replicas consume
> > messages from the leader and write those to their local logs. If you set
> > request.required.acks=1, the ack is sent to the producer only after the
> > leader has written messages to its local log. What you are asking for, is
> > part of the contract if request.required.acks=-1.
> >
> > > In this case, if we need to use
> > > request.required.acks=-1, that will pretty much prevent any successful
> > > message producing while any of the brokers for a partition is unavailable.
> > > So, I don't think that's an option.  (Not to mention the performance
> > > degradation).
> >
> > You can implement reliable delivery semantics while allowing rolling
> > restart of brokers by setting request.required.acks=-1. When one of the
> > replicas is shut down, the ISR reduces to remove the replica being shut
> > down and the messages will be committed using the new ISR.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <jb...@squareup.com> wrote:
> >
> > > Neha,
> > >
> > > I'm not sure I understand.  I would have thought that if the leader
> > > acknowledges receipt of a message, and is then shut down cleanly (with
> > > controlled shutdown enabled), that it would be able to reliably persist any
> > > in memory buffered messages (and replicate them), before shutting down.
> > > Shouldn't this be part of the contract?  It should be able to make sure
> > > this happens before shutting down, no?
> > >
> > > I would understand a message dropped if it were a hard shutdown.
> > >
> > > I'm not sure then how to implement reliable delivery semantics, while
> > > allowing a rolling restart of the broker cluster (or even to tolerate a
> > > single node failure, where one node might be down for awhile and need to be
> > > replaced or have a disk repaired).  In this case, if we need to use
> > > request.required.acks=-1, that will pretty much prevent any successful
> > > message producing while any of the brokers for a partition is unavailable.
> > > So, I don't think that's an option.  (Not to mention the performance
> > > degradation).
> > >
> > > Is there not a way to make this work more reliably with leader only
> > > acknowledgment, and clean/controlled shutdown?
> > >
> > > My test does succeed, as expected, with acks = -1, at least for the 100 or
> > > so iterations I've let it run so far.  It does on occasion send duplicates
> > > (but that's ok with me).
> > >
> > > Jason
> > >
> > >
> > > On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > >
> > > > The occasional single message loss could happen since
> > > > request.required.acks=1 and the leader is shut down before the follower
> > > > gets a chance to copy the message. Can you try your test with num acks set
> > > > to -1 ?
> > > >
> > > > Thanks,
> > > > Neha

Re: testing issue with reliable sending

Posted by Jason Rosenberg <jb...@squareup.com>.
Thanks for the explanation Neha.....still holding out hope.....

So, if request.required.acks=-1, how does the leader confirm that the other
brokers have consumed the message, before acking to the producer?  Does the
leader just wait for the followers in the ISR to consume?  Or does the
leader have a way to push, or ping the followers to consume?

Couldn't that mechanism be used, during a clean shutdown, even if the
messages were initially produced with acks=1? That is, when shutting down,
get acks from all ISR members for each partition, before shutting down.

I'm just a bit leery about using -1 across the board, because of the
performance hit (but for now it seems the best option to use for reliable
sending).

A separate question, can the request.required.acks be set to a higher
positive integer, say "2", to indicate that 2 of say 3 replicas have acked?
 ("request.required.acks" in the name would seem to indicate this).  I'm
not saying I'd want to use this (we are hoping to use only a replication
factor of 2).

Jason


On Sat, Oct 5, 2013 at 1:00 PM, Neha Narkhede <ne...@gmail.com> wrote:

> > Shouldn't this be part of the contract?  It should be able to make sure
> > this happens before shutting down, no?
>
> The leader writes messages to its local log and then the replicas consume
> messages from the leader and write those to their local logs. If you set
> request.required.acks=1, the ack is sent to the producer only after the
> leader has written messages to its local log. What you are asking for, is
> part of the contract if request.required.acks=-1.
>
> > In this case, if we need to use
> > request.required.acks=-1, that will pretty much prevent any successful
> > message producing while any of the brokers for a partition is unavailable.
> > So, I don't think that's an option.  (Not to mention the performance
> > degradation).
>
> You can implement reliable delivery semantics while allowing rolling
> restart of brokers by setting request.required.acks=-1. When one of the
> replicas is shut down, the ISR reduces to remove the replica being shut
> down and the messages will be committed using the new ISR.
>
> Thanks,
> Neha
>
>
> On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <jb...@squareup.com> wrote:
>
> > Neha,
> >
> > I'm not sure I understand.  I would have thought that if the leader
> > acknowledges receipt of a message, and is then shut down cleanly (with
> > controlled shutdown enabled), that it would be able to reliably persist any
> > in memory buffered messages (and replicate them), before shutting down.
> > Shouldn't this be part of the contract?  It should be able to make sure
> > this happens before shutting down, no?
> >
> > I would understand a message dropped if it were a hard shutdown.
> >
> > I'm not sure then how to implement reliable delivery semantics, while
> > allowing a rolling restart of the broker cluster (or even to tolerate a
> > single node failure, where one node might be down for awhile and need to be
> > replaced or have a disk repaired).  In this case, if we need to use
> > request.required.acks=-1, that will pretty much prevent any successful
> > message producing while any of the brokers for a partition is unavailable.
> > So, I don't think that's an option.  (Not to mention the performance
> > degradation).
> >
> > Is there not a way to make this work more reliably with leader only
> > acknowledgment, and clean/controlled shutdown?
> >
> > My test does succeed, as expected, with acks = -1, at least for the 100 or
> > so iterations I've let it run so far.  It does on occasion send duplicates
> > (but that's ok with me).
> >
> > Jason
> >
> >
> > On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> >
> > > The occasional single message loss could happen since
> > > request.required.acks=1 and the leader is shut down before the follower
> > > gets a chance to copy the message. Can you try your test with num acks set
> > > to -1 ?
> > >
> > > Thanks,
> > > Neha
> > > On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <jb...@squareup.com> wrote:
> > >
> > > > All,
> > > >
> > > > I'm having an issue with an integration test I've setup.  This is using
> > > > 0.8-beta1.
> > > >
> > > > The test is to verify that no messages are dropped (or the sender gets an
> > > > exception thrown back if failure), while doing a rolling restart of a
> > > > cluster of 2 brokers.
> > > >
> > > > The producer is configured to use 'request.required.acks' = '1'.
> > > >
> > > > The servers are set up to run locally on localhost, on different ports, and
> > > > different data dirs.  The producer connects with a metadata brokerlist
> > > > like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
> > > > up with a default replication factor of 2.  The servers have controlled
> > > > shutdown enabled, as well.
> > > >
> > > > The producer code looks like this:
> > > >     ...
> > > >     Producer<Integer, T> producer = getProducer();
> > > >     try {
> > > >       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
> > > >       producer.send(msg);
> > > >       return true;
> > > >     } catch (RuntimeException e) {
> > > >       logger.warn("Error sending data to kafka", e);
> > > >       return false;
> > > >     }
> > > >     ...
> > > >
> > > > The test sends groups of messages at each stage of the test (e.g. servers
> > > > up, first server going down, first server coming up, second server going
> > > > down, etc.).  Then a consumer connects and consumes all the messages, to
> > > > make sure they all arrived ok.
> > > >
> > > > It seems intermittently, a single message gets dropped, right after one of
> > > > the servers starts going down.  It doesn't happen always, seems to happen 1
> > > > out of every 20 test runs or so.  Here's some sample output.  I see the
> > > > exception inside the producer code, but I don't see the producer.send
> > > > method ever having an exception thrown back out to the caller (the log line
> > > > "Error sending data to kafka" is never triggered).
> > > >
> > > > What's interesting, is that it looks like the exceptions are happening on
> > > > message 3, but when the consumer subsequently consumes back all the
> > > > messages in the broker cluster, it seems message 2 (and not message 3) is
> > > > missing:
> > > >
> > > > ...
> > > > ...
> > > > 7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 98
> > > > 7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 3, message: 99
> > > > 7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Shutting down server2
> > > > 7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 0
> > > > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  - Shutting down KafkaServer
> > > > 7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 1
> > > > 7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 2
> > > > 7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  - Sending message: test-stage: 4, message: 3
> > > > 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > > > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  - [KafkaApi-1946108683] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> > > > kafka.common.ControllerMovedException: Broker 1946108683 received update metadata request with correlation id 7 from an old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > > > at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> > > > at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> > > > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > > > at java.lang.Thread.run(Thread.java:724)
> > > > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN kafka.controller.RequestSendThread  - [Controller-178709090-to-broker-178709090-send-thread], Controller 178709090 fails to send a request to broker 178709090
> > > > java.nio.channels.AsynchronousCloseException
> > > > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> > > > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut down complete for KafkaServer
> > > > 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed to send producer request with correlation id 810 to broker 178709090 with data for partitions [test-topic,0]
> > > > java.net.SocketTimeoutException
> > > > at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> > > > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > > > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> > > > at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > > at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > > > at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> > > > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > > > at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > > at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > > > at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > > > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > > > at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > at java.lang.Thread.run(Thread.java:724)
> > > > 17319 [Thread-1] ERROR kafka.producer.SyncProducer  - Producer
> > connection
> > > > to localhost:1026 unsuccessful
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect0(Native Method)
> > > > at sun.nio.ch.Net.connect(Net.java:465)
> > > > at sun.nio.ch.Net.connect(Net.java:457)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > > at
> > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > > at
> > > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > at
> > > >
> > > >
> > >
> >
> com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > at
> > > >
> > > >
> > >
> >
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > at java.lang.Thread.run(Thread.java:724)
> > > > 17322 [Thread-1] WARN kafka.client.ClientUtils$  - Fetching topic
> > > metadata
> > > > with correlation id 811 for topics [Set(test-topic)] from broker
> > > > [id:1,host:localhost,port:1026] failed
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect0(Native Method)
> > > > at sun.nio.ch.Net.connect(Net.java:465)
> > > > at sun.nio.ch.Net.connect(Net.java:457)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > > at
> > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > > at
> > > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > at
> > > >
> > > >
> > >
> >
> com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > > > at
> > > >
> > > >
> > >
> >
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > > > at java.lang.Thread.run(Thread.java:724)
> > > > 17340 [Thread-1] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Sending message: test-stage: 4, message: 4
> > > > 17353 [Thread-1] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Sending message: test-stage: 4, message: 5
> > > > 17365 [Thread-1] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Sending message: test-stage: 4, message: 6
> > > >
> > > > ...
> > > > ...
> > > > 23410 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 3, message: 98'
> > > > 23410 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 3, message: 99'
> > > > 23410 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 4, message: 0'
> > > > 23410 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 4, message: 1'
> > > > 23410 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 4, message: 3'
> > > > 23411 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 4, message: 4'
> > > > 23411 [Thread-28] INFO
> > > > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > > > Received msg 'test-stage: 4, message: 5'
> > > >
> > >
> >
>

Re: testing issue with reliable sending

Posted by Neha Narkhede <ne...@gmail.com>.
> Shouldn't this be part of the contract?  It should be able to make sure
> this happens before shutting down, no?

The leader writes messages to its local log, and then the replicas consume
messages from the leader and write those to their local logs. If you set
request.required.acks=1, the ack is sent to the producer as soon as the
leader has written the messages to its local log, without waiting for the
replicas. What you are asking for is part of the contract if
request.required.acks=-1.
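
To make the window concrete, here is a toy model of the two ack modes. It is
only a sketch: the Replica class and the method names are made up for
illustration, nothing here is Kafka code, and it assumes the follower simply
has not fetched the last message yet when the leader goes away.

```java
import java.util.ArrayList;
import java.util.List;

public class AckOneLossSketch {
    // Toy replica: nothing but an in-memory log (hypothetical, not a Kafka class).
    static final class Replica {
        final List<String> log = new ArrayList<String>();
    }

    // Follower copies whatever it has not yet seen from the leader.
    static void fetch(Replica leader, Replica follower) {
        for (int i = follower.log.size(); i < leader.log.size(); i++) {
            follower.log.add(leader.log.get(i));
        }
    }

    // acks=1 in this toy: the leader appends and the send is acked right away.
    static void sendAckedByLeader(Replica leader, String msg) {
        leader.log.add(msg);
    }

    // acks=-1 in this toy: the send is acked only after the follower has copied it.
    static void sendAckedByAll(Replica leader, Replica follower, String msg) {
        leader.log.add(msg);
        fetch(leader, follower);
    }

    // The leader goes away right after acking m2; returns what the follower holds.
    public static List<String> followerLogAfterLeaderLoss(boolean waitForFollower) {
        Replica leader = new Replica();
        Replica follower = new Replica();
        String[] msgs = {"m0", "m1", "m2"};
        for (String m : msgs) {
            if (waitForFollower) {
                sendAckedByAll(leader, follower, m);
            } else {
                sendAckedByLeader(leader, m);
                // The follower catches up on everything except the last message.
                if (!m.equals("m2")) {
                    fetch(leader, follower);
                }
            }
        }
        return follower.log;
    }

    public static void main(String[] args) {
        System.out.println("acks=1:  " + followerLogAfterLeaderLoss(false));
        System.out.println("acks=-1: " + followerLogAfterLeaderLoss(true));
    }
}
```

In the acks=1 run the producer saw an ack for m2, but the surviving replica
never received it, which matches the single missing message in the test.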

> In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is unavailable.
> So, I don't think that's an option.  (Not to mention the performance
> degradation).

You can implement reliable delivery semantics while allowing rolling
restart of brokers by setting request.required.acks=-1. When one of the
replicas is shut down, the ISR (the set of in-sync replicas) shrinks to
exclude the replica being shut down, and messages are then committed using
the new ISR.
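
As a rough sketch of that setting on the 0.8 producer: the class and method
names below are made up for illustration, and the broker list in main is just
a placeholder, so substitute whatever your test already passes in.

```java
import java.util.Properties;

public class AckAllProducerProps {
    // Builds producer properties that ask for acknowledgment from the full ISR.
    // brokerList is an assumption: use the list your test already configures.
    public static Properties build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // -1: the leader acks only after all in-sync replicas have the message.
        props.put("request.required.acks", "-1");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker list for illustration only.
        Properties props = build("localhost:1025,localhost:1026");
        System.out.println(props.getProperty("request.required.acks"));
        // In the real test these properties would be passed to
        // new kafka.producer.ProducerConfig(props) when creating the producer.
    }
}
```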

Thanks,
Neha


On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg <jb...@squareup.com> wrote:

> Neha,
>
> I'm not sure I understand.  I would have thought that if the leader
> acknowledges receipt of a message, and is then shut down cleanly (with
> controlled shutdown enabled), that it would be able to reliably persist any
> in memory buffered messages (and replicate them), before shutting down.
>  Shouldn't this be part of the contract?  It should be able to make sure
> this happens before shutting down, no?
>
> I would understand a message dropped if it were a hard shutdown.
>
> I'm not sure then how to implement reliable delivery semantics, while
> allowing a rolling restart of the broker cluster (or even to tolerate a
> single node failure, where one node might be down for awhile and need to be
> replaced or have a disk repaired).  In this case, if we need to use
> request.required.acks=-1, that will pretty much prevent any successful
> message producing while any of the brokers for a partition is unavailable.
>  So, I don't think that's an option.  (Not to mention the performance
> degradation).
>
> Is there not a way to make this work more reliably with leader only
> acknowledgment, and clean/controlled shutdown?
>
> My test does succeed, as expected, with acks = -1, at least for the 100 or
> so iterations I've let it run so far.  It does on occasion send duplicates
> (but that's ok with me).
>
> Jason

Re: testing issue with reliable sending

Posted by Jason Rosenberg <jb...@squareup.com>.
Neha,

I'm not sure I understand.  I would have thought that if the leader
acknowledges receipt of a message, and is then shut down cleanly (with
controlled shutdown enabled), it would be able to reliably persist any
in-memory buffered messages (and replicate them) before shutting down.
Shouldn't this be part of the contract?  It should be able to make sure
this happens before shutting down, no?

I would understand a message dropped if it were a hard shutdown.

I'm not sure then how to implement reliable delivery semantics while
allowing a rolling restart of the broker cluster (or even to tolerate a
single node failure, where one node might be down for a while and need to be
replaced or have a disk repaired).  In this case, if we need to use
request.required.acks=-1, that will pretty much prevent any successful
message producing while any of the brokers for a partition is unavailable.
So, I don't think that's an option.  (Not to mention the performance
degradation).

Is there not a way to make this work more reliably with leader-only
acknowledgment and clean/controlled shutdown?

My test does succeed, as expected, with acks = -1, at least for the 100 or
so iterations I've let it run so far.  It does on occasion send duplicates
(but that's ok with me).
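
For reference, the check at the end of the test boils down to something like
the sketch below. The class and method names here are made up for
illustration (the real test code is not shown in this thread); the point is
that duplicates are tolerated and only messages that never arrive count as a
failure.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeliveryCheckSketch {
    // Returns the sent messages that never showed up on the consumer side.
    // Duplicates in 'received' are ignored, so at-least-once delivery passes.
    public static List<String> missing(List<String> sent, List<String> received) {
        Set<String> seen = new HashSet<String>(received);
        List<String> missing = new ArrayList<String>();
        for (String s : sent) {
            if (!seen.contains(s)) {
                missing.add(s);
            }
        }
        return missing;
    }
}
```

With the sample output from my first mail, the sent list would contain
'test-stage: 4, message: 2' and the received list would not, so that is the
one entry this check would report.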

Jason


On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede <ne...@gmail.com> wrote:

> The occasional single message loss could happen since
> request.required.acks=1 and the leader is shut down before the follower
> gets a chance to copy the message. Can you try your test with num acks set
> to -1 ?
>
> Thanks,
> Neha
> On Oct 4, 2013 1:21 PM, "Jason Rosenberg" <jb...@squareup.com> wrote:
>
> > All,
> >
> > I'm having an issue with an integration test I've set up.  This is using
> > 0.8-beta1.
> >
> > The test is to verify that no messages are dropped (or the sender gets an
> > exception thrown back if failure), while doing a rolling restart of a
> > cluster of 2 brokers.
> >
> > The producer is configured to use 'request.required.acks' = '1'.
> >
> > The servers are set up to run locally on localhost, on different ports, and
> > different data dirs.  The producer connects with a metadata brokerlist
> > like:  "localhost:2024,localhost:1025" (so no vip).   The servers are set
> > up with a default replication factor of 2.  The servers have controlled
> > shutdown enabled, as well.
> >
> > The producer code looks like this:
> >     ...
> >     Producer<Integer, T> producer = getProducer();
> >     try {
> >       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic,
> > message);
> >       producer.send(msg);
> >       return true;
> >     } catch (RuntimeException e) {
> >       logger.warn("Error sending data to kafka", e);
> >       return false;
> >     }
> >     ...
> >
> > The test sends groups of messages at each stage of the test (e.g. servers
> > up, first server going down, first server coming up, second server going
> > down, etc.).  Then a consumer connects and consumes all the messages, to
> > make sure they all arrived ok.
> >
> > It seems intermittently, a single message gets dropped, right after one of
> > the servers starts going down.  It doesn't happen always, seems to happen 1
> > out of every 20 test runs or so.  Here's some sample output.  I see the
> > exception inside the producer code, but I don't see the producer.send
> > method ever having an exception thrown back out to the caller (the log line
> > "Error sending data to kafka" is never triggered).
> >
> > What's interesting, is that it looks like the exceptions are happening on
> > message 3, but when the consumer subsequently consumes back all the
> > messages in the broker cluster, it seems message 2 (and not message 3) is
> > missing:
> >
> > ...
> > ...
> > 7136 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 3, message: 98
> > 7150 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 3, message: 99
> > 7163 [Thread-2] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Shutting down server2
> > 7163 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 0
> > 7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer  -
> Shutting
> > down KafkaServer
> > 7176 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 1
> > 7189 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 2
> > 7203 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 3
> > 7394 [kafka-request-handler-5] WARN state.change.logger  - Broker
> > 1946108683 received update metadata request with correlation id 7 from an
> > old controller 178709090 with epoch 2. Latest known controller epoch is 3
> > 7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis  -
> > [KafkaApi-1946108683] error when handling request
> >
> >
> Name:UpdateMetadataRequest;Version:0;Controller:178709090;ControllerEpoch:2;CorrelationId:7;ClientId:id_178709090-host_null-port_1026;PartitionState:[test-topic,0]
> > ->
> >
> >
> (LeaderAndIsrInfo:(Leader:1946108683,ISR:1946108683,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:178709090,1946108683);AliveBrokers:id:1946108683,host:192.168.1.105,port:1025,id:178709090,host:192.168.1.105,port:1026
> > kafka.common.ControllerMovedException: Broker 1946108683 received update
> > metadata request with correlation id 7 from an old controller 178709090
> > with epoch 2. Latest known controller epoch is 3
> > at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
> > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > at java.lang.Thread.run(Thread.java:724)
> > 8039 [Controller-178709090-to-broker-178709090-send-thread] WARN
> > kafka.controller.RequestSendThread  -
> > [Controller-178709090-to-broker-178709090-send-thread], Controller
> > 178709090 fails to send a request to broker 178709090
> > java.nio.channels.AsynchronousCloseException
> > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
> > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
> > at kafka.utils.Utils$.read(Utils.scala:394)
> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer  - Shut
> down
> > complete for KafkaServer
> > 17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler  - Failed
> to
> > send producer request with correlation id 810 to broker 178709090 with
> data
> > for partitions [test-topic,0]
> > java.net.SocketTimeoutException
> > at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
> > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > at
> >
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > at kafka.utils.Utils$.read(Utils.scala:394)
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> > at
> >
> >
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> > at
> >
> >
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > at
> >
> >
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at
> >
> >
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> > at
> > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > at
> > kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > at
> >
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> > at
> >
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at
> >
> >
> com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at
> >
> >
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17319 [Thread-1] ERROR kafka.producer.SyncProducer  - Producer connection
> > to localhost:1026 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at
> >
> >
> com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at
> >
> >
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17322 [Thread-1] WARN kafka.client.ClientUtils$  - Fetching topic
> metadata
> > with correlation id 811 for topics [Set(test-topic)] from broker
> > [id:1,host:localhost,port:1026] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > at
> >
> >
> com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
> > at
> >
> >
> com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
> > at java.lang.Thread.run(Thread.java:724)
> > 17340 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 4
> > 17353 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 5
> > 17365 [Thread-1] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Sending message: test-stage: 4, message: 6
> >
> > ...
> > ...
> > 23410 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 3, message: 98'
> > 23410 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 3, message: 99'
> > 23410 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 4, message: 0'
> > 23410 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 4, message: 1'
> > 23410 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 4, message: 3'
> > 23411 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 4, message: 4'
> > 23411 [Thread-28] INFO
> > com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest  -
> > Received msg 'test-stage: 4, message: 5'
> >
>

Re: testing issue with reliable sending

Posted by Neha Narkhede <ne...@gmail.com>.
The occasional loss of a single message can happen because
request.required.acks=1: the leader acknowledges the write and is then shut
down before the follower gets a chance to copy the message. Can you try
your test with request.required.acks set to -1?

Thanks,
Neha
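
For reference, a minimal sketch of the producer settings this suggestion
implies. It only builds the java.util.Properties object; the class and
method names are hypothetical, the broker ports are taken from the
AliveBrokers line in the log output, and setting producer.type to "sync"
is an assumption based on the stack traces showing the send running on the
caller's thread.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original test code.
final class AcksConfigSketch {

    // Builds producer properties that ask the leader to wait for all
    // in-sync replicas before acknowledging a produce request.
    static Properties reliableProducerProps(String brokerList) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata.
        props.setProperty("metadata.broker.list", brokerList);
        // Assumption: synchronous sends, so failures can reach the caller.
        props.setProperty("producer.type", "sync");
        // -1 = wait for all in-sync replicas (the thread used 1 = leader only).
        props.setProperty("request.required.acks", "-1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps("localhost:1025,localhost:1026");
        // In the real test these properties would be wrapped in
        // kafka.producer.ProducerConfig and passed to the Producer constructor.
        System.out.println("request.required.acks = "
                + props.getProperty("request.required.acks"));
    }
}
```

With acks at -1, a produce request that the follower has not yet copied
should not be acknowledged, so the send is expected to either succeed on
both replicas or fail back to the caller after retries.
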