You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/01/24 23:50:20 UTC

Slack digest for #general - 2018-01-24

2018-01-24 00:05:01 UTC - Jaebin Yoon: I've been some stress tests on pulsar with the kafka wrapper and it ended up putting all producers into error states with the following exceptions :
```java.lang.Exception: java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: org.apache.pulsar.shade.io.netty.util.IllegalReferenceCountException: refCnt: 0
	at org.apache.kafka.clients.producer.KafkaProducer.lambda$send$4(KafkaProducer.java:181)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:179)```
----
2018-01-24 00:06:04 UTC - Matteo Merli: What kind of test were you doing ?
----
2018-01-24 00:07:19 UTC - Jaebin Yoon: Having 10 producers sent traffic to brokers (4 bundles) over one topic with 4800 partitions and 6 consumers consumed from them.
----
2018-01-24 00:07:47 UTC - Jaebin Yoon: I increased the traffic rate from producers.. to see when bundles split over to spread traffic
----
2018-01-24 00:08:30 UTC - Jaebin Yoon: but they didn't split the bundle but brokers got high CPU utilization and gc time increased and produces and consumers got into bad states.
----
2018-01-24 00:09:14 UTC - Jaebin Yoon: I don't think it saturated the network.. I'm trying to recover from this bad state to do again step-wise.
----
2018-01-24 00:12:43 UTC - Jaebin Yoon: both producers and consumers were in bad states so I had to stop everything to recover. The main feature I'm trying to test is the load balancing and find the maximum throughput with large partitions.
----
2018-01-24 00:13:54 UTC - Matteo Merli: Is it running with default conf?
----
2018-01-24 00:14:24 UTC - Jaebin Yoon: For broker, mostly all default, except cache size.
----
2018-01-24 00:16:05 UTC - Jaebin Yoon: Using m4.2xlarge, 14G heap, 12G direct. 4096 for managedLedgerCacheSizeMB.
----
2018-01-24 00:17:09 UTC - Matteo Merli: Ok, let me try it as well
----
2018-01-24 00:17:27 UTC - Jaebin Yoon: Can I manually change the number of bundles after creating namespace?
----
2018-01-24 00:17:48 UTC - Matteo Merli: Yes, you can split a given bundle manually
----
2018-01-24 00:18:25 UTC - Jaebin Yoon: ok. let me try that. It seems the trigger didn't get triggered in my case.
----
2018-01-24 00:19:58 UTC - Jaebin Yoon: I'm getting this again with just one producer and consumer (same topic, partitions)
```org.apache.pulsar.shade.io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
	at org.apache.pulsar.shade.io.netty.buffer.AbstractReferenceCountedByteBuf.retain0(AbstractReferenceCountedByteBuf.java:68)
	at org.apache.pulsar.shade.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:53)
	at org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecNone.encode(CompressionCodecNone.java:30)
	at org.apache.pulsar.client.impl.BatchMessageContainer.getCompressedBatchMetadataAndPayload(BatchMessageContainer.java:110)
	at org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1149)
	at org.apache.pulsar.client.impl.ProducerImpl.access$500(ProducerImpl.java:72)
	at org.apache.pulsar.client.impl.ProducerImpl$2.run(ProducerImpl.java:1131)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:663)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:738)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:466)
	at org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)```
----
2018-01-24 00:20:47 UTC - Jaebin Yoon: this is the producer exception
----
2018-01-24 00:20:55 UTC - Matteo Merli: Yep
----
2018-01-24 00:58:26 UTC - Jaebin Yoon: is the producer kafka wrapper thread-safe?  Kafka-produer is thread-safe but consumer is not. I'm wondering the same thing applies to the pulsar kafka wrapper.
----
2018-01-24 02:41:43 UTC - Matteo Merli: the producer wrapper should be thread safe, though I think I have identified the problem in the wrapper
----
2018-01-24 02:41:55 UTC - Matteo Merli: that leads to the exception
----
2018-01-24 03:09:45 UTC - Jaebin Yoon: One thing I observed is that when the consumer got into bad state somehow and gets killed, the broker also gets into bad state with the following exception (it seems it goes into infinite loop)
```2018-01-24 03:04:43,394 - WARN  - [pulsar-io-55-10:ServerCnx@163] - [/100.85.24.127:11890] Got exception: writevAddresses(..) failed: Connection reset by peer
io.netty.channel.unix.Errors$NativeIoException: writevAddresses(..) failed: Connection reset by peer
	at io.netty.channel.unix.Errors.newIOException(Errors.java:117)
	at io.netty.channel.unix.Errors.ioResult(Errors.java:138)
	at io.netty.channel.unix.FileDescriptor.writevAddresses(FileDescriptor.java:156)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.writeBytesMultiple(AbstractEpollStreamChannel.java:284)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWriteMultiple(AbstractEpollStreamChannel.java:511)
	at io.netty.channel.epoll.AbstractEpollStreamChannel.doWrite(AbstractEpollStreamChannel.java:448)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:856)
	at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:480)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollOutReady(AbstractEpollStreamChannel.java:909)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:391)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:306)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)```

And this makes producers into bad state as well with the exception (ref count exception)
----
2018-01-24 03:10:59 UTC - Matteo Merli: The exception itself is not necessary a bad symptom, it just means the broker got error when writing on the socket
----
2018-01-24 03:11:09 UTC - Jaebin Yoon: The chain reaction from the consumer to the broker and the producer. All brokers who had connection with that consumer go into this state...
----
2018-01-24 03:11:19 UTC - Jaebin Yoon: yeah but it goes on and on
----
2018-01-24 03:11:22 UTC - Matteo Merli: ok
----
2018-01-24 03:12:28 UTC - Matteo Merli: that’s very strange :confused:
----
2018-01-24 03:13:37 UTC - Jaebin Yoon: yeah something doesn't get cleared up. It fills up the disk with logs quickly
----
2018-01-24 03:15:46 UTC - Matteo Merli: I’ve been killing the consumers multiple times to trigger failover reassignments with the many partition case but so far didn’t happen
----
2018-01-24 03:16:49 UTC - Jaebin Yoon: One thing I observed was that the consumer was in the bad state.. meaning there were stuffs in tcp socket buffers.. when I saw netstat, there were many unread messages in the tcp buffer.
----
2018-01-24 03:17:59 UTC - Matteo Merli: in the client host?
----
2018-01-24 03:18:06 UTC - Jaebin Yoon: yes.
----
2018-01-24 03:18:40 UTC - Jaebin Yoon: let me try again..
----
2018-01-24 03:19:14 UTC - Matteo Merli: ok, also pass me the exact steps and will do the same
----
2018-01-24 03:32:06 UTC - Jaebin Yoon: BTW, On some of brokers, I got the following WARN messages :
```2018-01-24 03:29:46,861 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.85.11.16:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,861 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.156.43:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,861 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.71.254:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,864 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.140.30:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,865 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.85.46.178:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,867 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.113.61:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,868 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.131.137:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,868 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.85.23.239:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,868 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.90.37:3181 has been quarantined because of read/write errors.
2018-01-24 03:29:46,870 - WARN  - [BookKeeperClientScheduler-17-1:BookieWatcher@330] - Bookie 100.82.150.113:3181 has been quarantined because of read/write errors.```
----
2018-01-24 03:33:16 UTC - Jaebin Yoon: I don't see any errors on bookies though. It seems they're from bookie health check... If I restart brokers, it gets back to ok.
----
2018-01-24 03:38:53 UTC - Jaebin Yoon: I'm not sure why the consumer gets stuck in the first place. but it seems that's the first step to reproduce the issue. The setup I have is one topic with 4800 partitions. When the consumer seems stuck on something,  the tcp connections with brokers have things sent by brokers but not read by consumers and at that time, if consumer got killed or restarted, the brokers go crazy. At this time, I got the following errors in 4 brokers when a consumer got killed. Again, this is not one exception but repeating almost infinitely (had to stop brokers)
```2018-01-24 03:34:05,089 - WARN  - [pulsar-io-55-3:ServerCnx@163] - [/100.82.72.100:22730] Got exception: null
java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)```
----
2018-01-24 03:42:59 UTC - Jaebin Yoon: Something like this. This is on the consumer. "kafkaauditor-puls" is the consumer. 
```(kafkaauditor) ~ $ sudo netstat -tpa | grep 6650
tcp   13644964      0 kafkaauditor-puls:22692 ip-100-85-130-172.:6650 ESTABLISHED 11367/java
tcp   13604175      0 kafkaauditor-puls:36600 ip-100-85-144-19.e:6650 ESTABLISHED 11367/java
tcp   12964832      0 kafkaauditor-puls:22862 ip-100-82-102-123.:6650 ESTABLISHED 11367/java
tcp   12971608      0 kafkaauditor-puls:60076 <http://ip-100-85-0-117.ec:6650|ip-100-85-0-117.ec:6650> ESTABLISHED 11367/java```
----
2018-01-24 04:04:01 UTC - Jaebin Yoon: I guess the broker side connection error handling needs to be improved. I think some exceptions from netty are not handled properly so the bad state doesn't get cleared up.
----
2018-01-24 04:05:07 UTC - Matteo Merli: In that “got exception”  handler, it should be cleaning up all the state associated with that connection
----
2018-01-24 04:10:42 UTC - Matteo Merli: anyway, i’m getting ready a kafka consumer tool, that uses the wrapper, to test the same scenario
----
2018-01-24 04:48:51 UTC - Matteo Merli: @Jaebin Yoon regarding the “got exception” message, did you see in the full blown logs, any subsequent lines related to the same connection? eg: grepping for same client_ip:port: 100.85.24.127:11890
----
2018-01-24 04:50:06 UTC - Jaebin Yoon: "Got exception: null" ? no .. that was all.  those three lines repeated in the log
----
2018-01-24 04:51:49 UTC - Matteo Merli: It seems to me there’s something fishy in the netty side (not trying to offload bugs :slightly_smiling_face: )
----
2018-01-24 04:52:46 UTC - Matteo Merli: because that handling was working for a long time without problems. though in 1.21 we switched from 4.0 to 4.1.12
----
2018-01-24 04:53:47 UTC - Matteo Merli: the handler itself is invoked for any exception in the IO thread and it’s just trying to call `ctx.close()` which should not trigger another exception
----
2018-01-24 04:54:38 UTC - Matteo Merli: that’s my suspect based on your logs, that call to close might actually be failing and triggering exceptions that are caught and trigger the handler
----
2018-01-24 04:55:41 UTC - Jaebin Yoon: I see.. yeah it seems something is going on there.
----
2018-01-24 04:55:52 UTC - Jaebin Yoon: I'm still not sure why the consumer keeps going into bad state. The consumer (shared) is far behind from the producers. What happened was that I pushed the producer side publish many messages and at that time, the overall system fell a part. So to bring back the system up, I reduced the producers and consumers to one. But the consumer was far behind.
----
2018-01-24 04:56:40 UTC - Jaebin Yoon: When a consumer tries to get messages, how many messages are pulled from the broker?
----
2018-01-24 04:57:10 UTC - Matteo Merli: the consumer give some “permits” to the broker
----
2018-01-24 04:57:11 UTC - Jaebin Yoon: This is the case where there are 4800 partitions and one consumer tries to get all.
----
2018-01-24 04:57:41 UTC - Jaebin Yoon: and there are lots of backlogs. I think somehow it overwhelmed the consumer.
----
2018-01-24 04:58:16 UTC - Matteo Merli: that also depends how frequently the consumers calls receive
----
2018-01-24 04:58:59 UTC - Matteo Merli: so, the flow control is setup so that the consumers gives permits to broker, depending on the receiving queue size
----
2018-01-24 04:59:36 UTC - Matteo Merli: when that is being emptied, because the applications pulls messages out of the local queue, the consumer will ask for more messages to broker
----
2018-01-24 05:00:26 UTC - Matteo Merli: if consumer slows down, it will stop getting messages
----
2018-01-24 05:00:50 UTC - Matteo Merli: now, default receiver queue size is 1000
----
2018-01-24 05:01:21 UTC - Matteo Merli: so with 1 consumer across 4800 partition that can be a bit of messages accumulated
----
2018-01-24 05:01:49 UTC - Jaebin Yoon: ah so it'll be 4800 * 1000 ?
----
2018-01-24 05:02:04 UTC - Matteo Merli: yes, it would be good to reduce that
----
2018-01-24 05:02:47 UTC - Jaebin Yoon: i see.. I might just need to reset the subscription to get back to normal state.
----
2018-01-24 05:03:27 UTC - Jaebin Yoon: I need to test from the beginning to gradually increase traffic until it hits the wall.
----
2018-01-24 05:03:45 UTC - Jaebin Yoon: I can just remove the subscription and create again right?
----
2018-01-24 05:04:43 UTC - Matteo Merli: yes, you can use cli tool : `bin/pulsar-admin persistent unsubscribe $TOPIC -s $SUBSCRIPTION`
----
2018-01-24 05:04:53 UTC - Jaebin Yoon: ok. thanks!!!
----
2018-01-24 05:05:12 UTC - Matteo Merli: or do `skip-all`
----
2018-01-24 05:05:22 UTC - Matteo Merli: to clear the backlog
----
2018-01-24 05:05:31 UTC - Jaebin Yoon: ah that would be nice.
----
2018-01-24 05:05:37 UTC - Jaebin Yoon: ok. i'll try that
----
2018-01-24 05:06:36 UTC - Matteo Merli: So, a bunch of issues. The first is that with Kafka wrapper the consumption is slow and that’s what’s causing the consumer to fall behind
----
2018-01-24 05:07:42 UTC - Jaebin Yoon: the kafka wrapper didn't work so we changed to the pulsar consumer.
----
2018-01-24 05:08:21 UTC - Jaebin Yoon: so it is the pulsar consumer couldn't keep up when there were lots of backlogs with many partitions
----
2018-01-24 05:08:31 UTC - Matteo Merli: are you using `pulsar-perf` tool or a custom process?
----
2018-01-24 05:08:41 UTC - Jaebin Yoon: we're using our own tool.
----
2018-01-24 05:09:16 UTC - Jaebin Yoon: simple java process to send or receive with rate control.
----
2018-01-24 05:10:05 UTC - Jaebin Yoon: that process has integration with our own metrics, monitoring tool
----
2018-01-24 05:10:14 UTC - Matteo Merli: ok, you can also take a look at pulsar-perf which should be flexible enough for these things. eg: 
`bin/pulsar-perf consume $TOPIC -s $MY_SUBSCRIPTION`
----
2018-01-24 05:10:18 UTC - Matteo Merli: ok, I see
----
2018-01-24 05:10:40 UTC - Jaebin Yoon: yeah i saw that. I'll try that as well.
----
2018-01-24 05:10:45 UTC - Matteo Merli: just for trying, you could be setting the `--receiver-queue-size` there as well
----
2018-01-24 05:10:51 UTC - Jaebin Yoon: cool
----
2018-01-24 06:06:45 UTC - Matteo Merli: Ok, so far I’ve tried with 10K partitions on multiple connections or 4800 partitions on a single TCP connection, building backlog (&gt;1K per partition) and draining. I haven’t seen the issue yet. I’ll try to reduce memory setting on consumer process to get that hosed and hopefully trigger the other issue
----
2018-01-24 06:16:42 UTC - Jaebin Yoon: the consumer was running on m3.xlarge (4 cores, 16G) and the memory setup for JVM was -Xms2g -Xmx4g -XX:MaxDirectMemorySize=1g
----
2018-01-24 06:17:08 UTC - Jaebin Yoon: the message size was 1k byte
----
2018-01-24 06:19:52 UTC - Matteo Merli: Yes, I’ve tried ” -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g” but still was able to catch up without issues
----
2018-01-24 06:32:08 UTC - Matteo Merli: Ok, finally got the same exception with the same memory setting and setting rate limiting on the perf consumer
----
2018-01-24 06:36:16 UTC - Matteo Merli: but that didn’t hose anything else that the consumer that was getting OutOfMem
----
2018-01-24 07:18:21 UTC - Matteo Merli: So far, the ` Got exception: writevAddresses(..) failed:` line is very noisy but it’s not causing issues for me. The exception is being thrown for each pending write operation on the socket and since the connection has many consumers for the partitions, it gets printed a lot, though at most once per each partition
----
2018-01-24 08:45:17 UTC - Jaebin Yoon: Hmm.. maybe it was just the issue with noisy logs then? In my case, after broker started to log this, the producers started to have lots of error logs (not sure if that hosed the producer side). Probably it was more of the OOM issue in the consumer side causing lots of logs on the broker side. I'm not sure having that many error logs slowed down the brokers (logs should be async, right?). 
Thanks a lot @Matteo Merli for tracking this down. I'll try to reproduce the initial problems with high volumes. I'll make sure the consumers have enough memory to handle the queue size. I now know how much memory the consumer should have (direct memory should be greater than (receiver-queue-size * # of partitions * max message size) for the worst case.
----
2018-01-24 13:48:01 UTC - Julien Nioche: @Julien Nioche has joined the channel
----
2018-01-24 15:55:41 UTC - Matteo Merli: @Jaebin Yoon At least I figured out why the bundles were not getting split. There is a discrepancy for the configuration values defined in `conf/broker.conf` and the default that is taken when a setting is not present (defined in the `ServiceConfiguration` class). 

In `broker.conf` we have `loadBalancerEnabled=true`, which is the intended default. If you start the broker passing a `ServiceConfiguration` instance (or a `broker.conf` files with just few options overrides) it will take `loadBalancerEnabled=false` as default…
----
2018-01-24 17:08:58 UTC - Jaebin Yoon: @Matteo Merli oh yeah actually it was set to false. I'll explicitly set that configuration. And regarding producer side problem with io.netty.util.IllegalReferenceCountException , you said that found a problem earlier?  Is that kafka wrapper specific or same with pulsar client?
----
2018-01-24 17:09:59 UTC - Matteo Merli: I’m still looking into that
+1 : Jaebin Yoon
----
2018-01-24 17:22:01 UTC - Jaebin Yoon: @Matteo Merli I'm curious why it throws exception for each partition. The connection was just one from a consumer. Maybe there is a abstraction that works independently on each partition without knowing underlying connection. Maybe I need to dig in the code to see how the partition subscriptions are handled in the relation to the real consumer connection.
----
2018-01-24 17:23:15 UTC - Matteo Merli: Yes, in general the write operation should fail but not throw exception (the Netty `ChannelFuture` that we use to track the socket write operation)
----
2018-01-24 17:23:59 UTC - Matteo Merli: I really think this is some change in the epoll based Netty eventLoop for 4.1.x.
----
2018-01-24 17:24:39 UTC - Matteo Merli: So the reason for having multiple ongoing socket writes is that since we’re multiplexing multiple consumers on a single TCP connection
----
2018-01-24 17:25:30 UTC - Matteo Merli: when we have messages available, we try to write them for each consumer on the connection, (abiding flow control)
----
2018-01-24 17:26:30 UTC - Matteo Merli: In general, I’m trying to just have the 1st exception on the connection to be printed, and lower the log level for subsequent exceptions
----
2018-01-24 17:35:53 UTC - Matteo Merli: &gt; Maybe there is a abstraction that works independently on each partition without knowing underlying connection.
----
2018-01-24 17:37:04 UTC - Matteo Merli: Yes, each partition is implemented as a topic, the TCP connections/threads are all shared, but the reconnection logic is per partition since the partitions can be moved around independently to different brokers
----
2018-01-24 22:32:16 UTC - Matteo Merli: Ok, I think I got a handle on that. <https://github.com/apache/incubator-pulsar/pull/1108> Should have the fix for the problem (all the refcounting exceptions were all reconducible at the root cause that the PR should fix).
----