You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Apache Pulsar Slack <> on 2020/03/04 09:11:04 UTC

Slack digest for #general - 2020-03-04

2020-03-03 10:01:00 UTC - Kannan: This is because of broker advertise k8s service name (which is pulsaristio-broker) and the Broker-znode session gets constructed using this service name. So, with 1 broker it works but on adding another broker, it uses same service name and hence, fails with error - "broker-znode owned by different session."
2020-03-03 10:21:20 UTC - eilonk: anyone here deployed pulsar with helm using tls authentication?
While the chart allows you to create a tls secret for the admin certs, what happens when i want to use different ones for the consumers? brokers?
(in a different type of deployment, i would give the filepath to these certificates but that's not how helm works)
2020-03-03 13:13:03 UTC - Santiago Del Campo: It was a problem with K8s and linux host networking, for some reason the bookkeeper pod it's accessible from outside but the pod can't access itself..... as fas as i know, seems to be a bug with k8s and iptables.
2020-03-03 13:20:37 UTC - Greg: Hello there, we are currently facing an issue with pulsar 2.5.0 (was working fine in 2.4.2). at some point we are not able to subscribe to a given topic with this error sent by the server : [<non-persistent://public/default/cluster][Proxy-vm-k8s-worker-infinity-2]> Failed to create consumer: Topic does not have schema to check
2020-03-03 13:21:56 UTC - Greg: We are suspecting the topic to be cleaned at some point because of no activity, and when the client automatically reconnect, there are no more schema linked to this topic recreated
2020-03-03 13:23:32 UTC - Greg: Any hint on how to workaround this or some informations needed to pinpoint what is the issue ?
2020-03-03 13:26:15 UTC - Greg: I just realized we are using 2.4.2 java client on top of 2.5.0 server version of pulsar, i will try with 2.5.0 java client
2020-03-03 13:32:11 UTC - Ryan Slominski: Thanks for pointing out the getCurrentRecord method.  I missed that as I thought the input of the function was always passed in as a parameter in the method signature (in all the examples I've seen it's a String named "input").   I guess the input parameter appears in two places then?
2020-03-03 13:34:15 UTC - Penghui Li: Have you deleted the schema of this topic before?
2020-03-03 13:34:56 UTC - Antti Kaikkonen: <>

You have to use the pulsar function SDK instead of language-native function interface to get access to the context object.
2020-03-03 13:38:32 UTC - Greg: no, this topic is created by the first publisher with a schema (STRING)
2020-03-03 13:39:15 UTC - Greg: and at some point this topic is removed automatically
2020-03-03 13:39:22 UTC - Ryan Slominski: Yeah, it looks like you can get the input from the method signature AND from the context object.   In other words, the SDK version should not include the input as a method parameter - all you need is the context object.
+1 : Antti Kaikkonen
2020-03-03 13:39:41 UTC - Greg: and we cannot reconnect manually on it because of the schema not available
2020-03-03 13:40:24 UTC - Greg: i think the topic is recreated automatically by the java client, right ?
2020-03-03 13:40:26 UTC - Penghui Li: ok, is it happens when the topic is removed then a consumer try to connect to it.
2020-03-03 13:40:34 UTC - Greg: yes
2020-03-03 13:41:40 UTC - Greg: because it looks like it has been recreated automatically (client reconnection mechanism ?) without schema
2020-03-03 13:43:47 UTC - Penghui Li: No, `Topic does not have schema to check`  means broker can get schema from consumer but can't find any schema on this topic. So, it's better to check is the topic don't have any schema
2020-03-03 13:44:16 UTC - Greg: Sorry that was what i meant, the topic has lost the schema
2020-03-03 13:44:24 UTC - Penghui Li: yes
2020-03-03 13:44:53 UTC - Greg: we verified using pulsar-admin and can confirmed the topic has lost the schema, but i cannot explain why
2020-03-03 13:44:55 UTC - Penghui Li: Here is the source code:
```return getSchema(schemaId).thenCompose(existingSchema -&gt; {
            if (existingSchema != null &amp;&amp; !existingSchema.schema.isDeleted()) {
                    if (strategy == SchemaCompatibilityStrategy.BACKWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD ||
                            strategy == SchemaCompatibilityStrategy.FORWARD_TRANSITIVE ||
                            strategy == SchemaCompatibilityStrategy.FULL) {
                        return checkCompatibilityWithLatest(schemaId, schemaData, SchemaCompatibilityStrategy.BACKWARD);
                    } else {
                        return checkCompatibilityWithAll(schemaId, schemaData, strategy);
            } else {
                return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
2020-03-03 13:45:58 UTC - Penghui Li: &gt; we verified using pulsar-admin and can confirmed the topic has lost the schema, but i cannot explain why
Ok, thanks.
2020-03-03 13:47:00 UTC - Greg: and i also see that the schema has been cleaned : [<non-persistent://public/default/cluster>] Topic deleted successfully due to inact
2020-03-03 13:47:34 UTC - Greg: so i don't know how the topic has been recreated (something automatic somewhere :wink: )
2020-03-03 13:49:07 UTC - Greg: sorry i mean the topic has been cleaned, not the schema...
2020-03-03 13:49:44 UTC - Penghui Li: Pulsar enable topic auto creation by default. So, if a producer or a consumer try to connect to a not exists topic, broker will create the topic.
2020-03-03 13:50:13 UTC - Greg: but in this case the topic is well created with the schema
2020-03-03 13:51:03 UTC - Ryan Slominski: FYI - looks like syntax highlighting of method getCurrentRecord is broken in the 2.5.0 docs.  Not sure why it isn't blue like the rest of the methods.  Probably why I didn't see it.
2020-03-03 13:51:16 UTC - Greg: so i suppose a client already connected that tries to use a topic that has been cleaned
2020-03-03 13:51:20 UTC - Greg: is it possible ?
2020-03-03 13:52:50 UTC - Greg: Can a topic be cleaned when there are publisher/consumer connnected to it ?
2020-03-03 13:53:05 UTC - Penghui Li: topic only delete when no publishers, no consumers, no subscriptions.
2020-03-03 13:53:18 UTC - Penghui Li: &gt; Can a topic be cleaned when there are publisher/consumer connnected to it ? (edited)
2020-03-03 13:53:43 UTC - Greg: We are using a pulsar cluster with 3 instances
2020-03-03 13:54:23 UTC - Greg: i think we don't have the issue with only one pulsar instance, but i need to verify to confirm
2020-03-03 14:00:03 UTC - Penghui Li: Ok, I will try to reproduce it. looks this is a bug related to schema deletion.
2020-03-03 14:00:18 UTC - Alex Yaroslavsky: Hi,

Can't figure out how to deal with this one

I get the following error in the function log:
`[ERROR] Exception while executing user method`
`Traceback (most recent call last):`
  `File "/opt/pulsar/instances/python-instance/", line 242, in actual_execution`
    `output_object = self.function_class.process(input_object, self.contextimpl)`
  `File "/tmp/pulsar_functions/c1-d1/in/in_router/0/", line 8, in process`
    `context.publish(self.out_topic, item, properties={ 'tenant' : context.get_function_tenant() })`
  `File "/opt/pulsar/instances/python-instance/", line 190, in publish`
    `output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)`
  `File "/opt/pulsar/instances/python-instance/pulsar/", line 894, in send_async`
    `replication_clusters, disable_replication, event_timestamp)`
  `File "/opt/pulsar/instances/python-instance/pulsar/", line 928, in _build_msg`
    `, v)`
`ArgumentError: Python argument types in`
    `, str, unicode)`
`did not match C++ signature:`
    `property(pulsar::MessageBuilder {lvalue}, std::string, std::string)`

The function code is this:
`from pulsar import Function`

`class RoutingFunction(Function):`
    `def __init__(self):`
        `self.out_topic = "<persistent://internal/emsaas/out>"`

    `def process(self, item, context):`
            `context.publish(self.out_topic, item, properties={ 'tenant' : context.get_function_tenant() })`
2020-03-03 14:01:07 UTC - Penghui Li: @Greg Can you help to create a issue on github, and provide the reproduce steps if possible ?
2020-03-03 14:01:15 UTC - Greg: ok thanks Penghui, tell me if you need more informations, traces or tests from me !
+1 : Penghui Li
2020-03-03 14:01:29 UTC - Greg: yes
2020-03-03 14:01:36 UTC - Penghui Li: Thanks
2020-03-03 14:03:46 UTC - Ryan Slominski: Looks like DOCUSAURUS code tabs interferes with the markdown because the raw markdown works fine.   Might be related to this bug:  <>
2020-03-03 14:08:45 UTC - Tobias Macey: @Sijie Guo is there anywhere that I can track progress on the Kafka protocol layer that you are working on?
2020-03-03 14:10:59 UTC - Penghui Li: @Greg I can't reproduce it on master branch
2020-03-03 14:11:04 UTC - Penghui Li: ```{
  "version": 2,
  "schemaInfo": {
    "name": "test",
    "schema": "",
    "type": "STRING",
    "properties": {}
2020-03-03 14:11:56 UTC - Greg: i try to see how to reproduce it easily, will also see if i have the issue with only one broker
2020-03-03 14:12:01 UTC - Penghui Li: This is created schema by consumer. I create the consumer after topic delete.
2020-03-03 14:12:39 UTC - Greg: but in my case the consumer is already connected when the topic is deleted
2020-03-03 14:13:03 UTC - Greg: i don't know why it is deleted, i though it was because no messages were sent
2020-03-03 14:13:04 UTC - Penghui Li: Ok, let me try
2020-03-03 14:17:45 UTC - Penghui Li: ```22:14:20.129 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConnectionHandler@114] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Closed connection [id: 0x448f874d, L:/ - R:/] -- Will try again in 0.1 s
22:14:20.230 [pulsar-timer-4-1:org.apache.pulsar.client.impl.ConnectionHandler@117] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Reconnecting after timeout
22:14:20.236 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@550] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribing to topic on cnx [id: 0x448f874d, L:/ - R:/]
22:14:20.270 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@662] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribed to topic on / -- consumer: 0
22:15:00.982 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ClientCnx@629] INFO  org.apache.pulsar.client.impl.ClientCnx - [/] Broker notification of Closed consumer: 0
22:15:00.982 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConnectionHandler@114] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Closed connection [id: 0x448f874d, L:/ - R:/] -- Will try again in 0.1 s
22:15:01.084 [pulsar-timer-4-1:org.apache.pulsar.client.impl.ConnectionHandler@117] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Reconnecting after timeout
22:15:01.085 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@550] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribing to topic on cnx [id: 0x448f874d, L:/ - R:/]
22:15:01.120 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@662] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribed to topic on / -- consumer: 0
22:15:30.509 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ClientCnx@629] INFO  org.apache.pulsar.client.impl.ClientCnx - [/] Broker notification of Closed consumer: 0
22:15:30.510 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConnectionHandler@114] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Closed connection [id: 0x448f874d, L:/ - R:/] -- Will try again in 0.1 s
22:15:30.611 [pulsar-timer-4-1:org.apache.pulsar.client.impl.ConnectionHandler@117] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [test] [test] Reconnecting after timeout
22:15:30.613 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@550] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribing to topic on cnx [id: 0x448f874d, L:/ - R:/]
22:15:30.650 [pulsar-client-io-1-1:org.apache.pulsar.client.impl.ConsumerImpl@662] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [test][test] Subscribed to topic on / -- consumer: 0```
Looks not easy to reproduce. I tried delete topic only, delete topic and schema. The consumer still can reconnect succeed. And when I delete shema, I check the last schema version is changed.
2020-03-03 14:18:05 UTC - Penghui Li: ```lipenghui@lipenghuideMacBook-Pro-2 apache-pulsar-2.6.0-SNAPSHOT % bin/pulsar-admin schemas get test
  "version": 4,
  "schemaInfo": {
    "name": "test",
    "schema": "",
    "type": "STRING",
    "properties": {}
lipenghui@lipenghuideMacBook-Pro-2 apache-pulsar-2.6.0-SNAPSHOT % bin/pulsar-admin topics delete test -f -d
lipenghui@lipenghuideMacBook-Pro-2 apache-pulsar-2.6.0-SNAPSHOT % bin/pulsar-admin schemas get test        
  "version": 6,
  "schemaInfo": {
    "name": "test",
    "schema": "",
    "type": "STRING",
    "properties": {}
2020-03-03 14:20:32 UTC - Greg: wow ok :disappointed: I will try to find the exact way to reproduce, thanks for the help
2020-03-03 14:29:39 UTC - Geetish Sanjeeb Nayak: @Geetish Sanjeeb Nayak has joined the channel
2020-03-03 14:31:54 UTC - Greg: So here is a reproduction with 2 pulsar nodes :
```Node 1 :
14:23:30.460 [ForkJoinPool.commonPool-worker-1] WARN - [<non-persistent://public/default/cluster>] Error getting policies java.util.concurrent.CompletableFuture cannot be cast to and publish throttling will be disabled
14:23:30.461 [ForkJoinPool.commonPool-worker-1] INFO - Disabling publish throttling for <non-persistent://public/default/cluster>
14:23:30.477 [ForkJoinPool.commonPool-worker-1] INFO - Created topic NonPersistentTopic{topic=<non-persistent://public/default/cluster>}
14:25:20.219 [pulsar-io-24-3] INFO - [<non-persistent://public/default/cluster>] Topic deleted
14:25:20.219 [pulsar-io-24-3] INFO - [<non-persistent://public/default/cluster>] Topic deleted successfully due to inactivity
14:28:24.086 [pulsar-io-24-4] WARN - [<non-persistent://public/default/cluster>] Error getting policies java.util.concurrent.CompletableFuture cannot be cast to and publish throttling will be disabled
14:28:24.086 [pulsar-io-24-4] INFO - Disabling publish throttling for <non-persistent://public/default/cluster>
14:28:24.087 [pulsar-io-24-4] INFO - Created topic NonPersistentTopic{topic=<non-persistent://public/default/cluster>}

Node 2 :
14:25:20.201 [pulsar-ordered-OrderedExecutor-5-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperCache - [State:CONNECTED Timeout:30000 sessionid:0x100d2a693590018 local:/ remoteserver:zookeeper/ lastZxid:4294984843 xid:719 sent:719 recv:753 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/schemas/public/default/cluster
14:27:04.984 [pulsar-io-24-3] INFO - [/] Subscribing on topic <non-persistent://public/default/cluster> / Monitor-istra-monitor-667c9fbbcc-ldlsf
14:27:05.086 [Thread-14] WARN - [/][<non-persistent://public/default/cluster][Monitor-istra-monitor-667c9fbbcc-ldlsf]> Failed to create consumer: Topic does not have schema to check```
2020-03-03 14:33:21 UTC - Greg: The publisher does not disconnect but does not send any message, after 1 minute the Topic is deleted. Then a new publisher connects on the other node and gets the error message
2020-03-03 14:33:41 UTC - Greg: Will try with only one node
2020-03-03 14:40:35 UTC - Santiago Del Campo: Hello!.. any idea about this?

We're running pulsar cluster on top of K8s.... the broker takes high load traffic through websocket.... overtime, the CPU rise up to 3 cores of the server, could be related to some CPU usage "leak"?

Broker version is 2.4.2
2020-03-03 14:48:21 UTC - Greg: @Penghui Li: I cannot reproduce with only one pulsar instance, the Topic is never cleaned. But with several instances, i can easily reproduce as the Topic is deleted after 1 minute and then i cannot reconnect anymore to it
2020-03-03 14:56:45 UTC - Penghui Li: Is the instance means pulsar broker?
2020-03-03 15:15:51 UTC - Greg: yes
2020-03-03 15:34:06 UTC - Pavel Tishkevich: Hello.
We’ve increased number of brokers from 3 to 12 to have better availability when broker fails as @Sijie Guo suggested.

But after that latency for some operations increased significantly (For example `ConsumerBuilder.subscribe()` increased from 0.6s to 1.5s)

What is causing this latency growth? Can it be tuned with this number of brokers?

Also we’ve noticed that zookeeper is more loaded after adding brokers (see attached). Is this normal? What can we do with this to decrease latency?
2020-03-03 17:24:46 UTC - Sijie Guo: We are about to open source KoP in the coming week.
2020-03-03 17:27:06 UTC - Sijie Guo: what kind of hardware are you using for your zookeeper cluster?
2020-03-03 17:49:45 UTC - Gabriel Paiz III: @Gabriel Paiz III has joined the channel
2020-03-03 17:57:53 UTC - Devin G. Bost: I was just going to ask the same question.
2020-03-03 17:58:10 UTC - Devin G. Bost: More brokers puts greater load on Zookeeper.
2020-03-03 18:03:04 UTC - Devin G. Bost: I know there are folks using helm.
2020-03-03 18:03:29 UTC - Devin G. Bost: I don't know specifics though.
2020-03-03 18:19:50 UTC - Devin G. Bost: @Penghui Li I'm not sure if it's the same issue, but I know that sometimes the schema isn't created depending on the other in which components are created. For example, if a source is created before a function, or if a function is created before a sink, the schema is not created.
2020-03-03 18:20:36 UTC - Devin G. Bost: Do you have logs associated with the event? I think I've seen this happen before, but we need to capture logs around what's happening to identify the cause.
2020-03-03 18:24:54 UTC - Santiago Del Campo: Which logs exactly... broker's?:thinking_face:

I made some research about this.. found an issue on github related to CPU usage leak when broker was under high load, websocket communication and exclusive subscription of a topic was attempted (which btw it's our case due to the app architecture around Pulsar consumption).
2020-03-03 18:26:05 UTC - Santiago Del Campo: According to the github issue, and if i understand correctly, a fix was made in 2.4.2 and 2.5.0 :thinking_face:
2020-03-03 18:28:26 UTC - Devin G. Bost: Do you have the issue number? I can check on this.
2020-03-03 18:28:38 UTC - Devin G. Bost: Yes, I meant broker logs.
2020-03-03 18:29:42 UTC - Devin G. Bost: Is the issue intermittent? If so, it may be tricky to reproduce.
2020-03-03 18:30:09 UTC - Santiago Del Campo: issue: <>

apparently fixed with: <>
2020-03-03 18:31:50 UTC - Devin G. Bost: Thanks
2020-03-03 18:33:57 UTC - Santiago Del Campo: Thanks to you :slightly_smiling_face:
2020-03-03 18:41:47 UTC - Santiago Del Campo: This is the exception received from the stdout:
Tell me if you needed something else.

2020-03-03 18:41:47 UTC - Santiago Del Campo: ```18:38:41.381 [pulsar-web-32-7] WARN org.apache.pulsar.websocket.ConsumerHandler - [] Failed in creating subscription xxxxxxxxxxxxxxxxx on topic <non-persistent://public/default/xxxxxxxxxxxxxxxxxxx>
org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: Exclusive consumer is already connected
at org.apache.pulsar.client.api.PulsarClientException.unwrap( ~[org.apache.pulsar-pulsar-client-api-2.4.2.jar:2.4.2]
at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe( ~[org.apache.pulsar-pulsar-client-original-2.4.2.jar:2.4.2]
at org.apache.pulsar.websocket.ConsumerHandler.&lt;init&gt;( ~[org.apache.pulsar-pulsar-websocket-2.4.2.jar:2.4.2]
at org.apache.pulsar.websocket.WebSocketConsumerServlet.lambda$configure$0( ~[org.apache.pulsar-pulsar-websocket-2.4.2.jar:2.4.2]
at org.eclipse.jetty.websocket.server.WebSocketServerFactory.acceptWebSocket( [org.eclipse.jetty.websocket-websocket-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.websocket.server.WebSocketServerFactory.acceptWebSocket( [org.eclipse.jetty.websocket-websocket-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.websocket.servlet.WebSocketServlet.service( [org.eclipse.jetty.websocket-websocket-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at javax.servlet.http.HttpServlet.service( [javax.servlet-javax.servlet-api-3.1.0.jar:3.1.0]
at org.eclipse.jetty.servlet.ServletHolder.handle( [org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter( [org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at [org.apache.pulsar-pulsar-broker-2.4.2.jar:2.4.2]
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter( [org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.servlet.ServletHandler.doHandle( [org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.session.SessionHandler.doHandle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ContextHandler.doHandle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.servlet.ServletHandler.doScope( [org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.session.SessionHandler.doScope( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ContextHandler.doScope( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ScopedHandler.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.HandlerCollection.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.StatisticsHandler.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.handler.HandlerWrapper.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.Server.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.HttpChannel.handle( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.server.HttpConnection.onFillable( [org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar:9.4.20.v20190813]
at <|>.AbstractConnection$ReadCallback.succeeded( [org.eclipse.jetty-jetty-io-9.4.20.v20190813.jar:9.4.20.v20190813]
at <|>.FillInterest.fillable( [org.eclipse.jetty-jetty-io-9.4.20.v20190813.jar:9.4.20.v20190813]
at <|>.ChannelEndPoint$ [org.eclipse.jetty-jetty-io-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask( [org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce( [org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce( [org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar:9.4.20.v20190813]
at [org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar:9.4.20.v20190813]
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ [org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar:9.4.20.v20190813]
at java.util.concurrent.ThreadPoolExecutor.runWorker( [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$ [?:1.8.0_232]
at [io.netty-netty-all-4.1.32.Final.jar:4.1.32.Final]
at [?:1.8.0_232]```
2020-03-03 19:11:04 UTC - eilonk: If anyone was wondering, this isn’t related to helm but to kubernetes itself - just requires to mount volumes with the matching secrets and then editing the chart templates to use those.
2020-03-03 19:16:54 UTC - Devin G. Bost: Zookeeper needs to be properly tuned to function at its best.
2020-03-03 19:17:04 UTC - Devin G. Bost: It needs very fast disk storage.
2020-03-03 19:30:55 UTC - Alexander Ursu: Quick question, has anyone tried to put the Pulsar brokers directly behind a reverse proxy, specifically Traefik in my case, without first standing behind the Pulsar proxy? Are there any inherit problems with this approach, or are there any opinions on it?

I know the Pulsar proxy can do service discovery, but so can Traefik, since I have all my components running in docker containers in a swarm.
2020-03-03 19:33:00 UTC - Rolf Arne Corneliussen: Simple question: for a _partitioned_ topic, if you set a *compaction-threshold*, will it apply to the topic as a whole or for each individual partition? I assume the latter, but I could not find the answer in the documentation.
2020-03-03 19:37:42 UTC - Addison Higham: @Alexander Ursu it is a bit more complex than that, topics belong to a broker, for admin calls, http redirects are used, and the proxy follows those for you, likely you could do that with Traefik fairly easily, but for the tcp protocol (that actually sends/receives messages), you need to ensure you are being routed to the proper broker. When you try and connect a consumer/producer via a proxy, it looks up the broker for a topic (either via talking to zookeeper directly or by making an http call to the brokers, depending on how your proxy is configured) and then forwards requests. There is additional complexity if you are doing authz/authn there. Making Traefik do all that would be a fair bit more complicated and requires domains specific knowledge of pulsar's protocol
2020-03-03 19:40:11 UTC - Alexander Ursu: Thanks for the input! So I guess using the Pulsar proxy is still a much more sane approach, and then putting that behind Traefik would be straightforward?
2020-03-03 19:40:40 UTC - Addison Higham: should be, in our k8s we use an AWS NLB -&gt; Pulasr proxy
2020-03-03 21:31:53 UTC - Ryan Slominski: `java.lang.IllegalStateException: State is not enabled.`

I turned on debugging and I see this.  That explain why my Pulsar Function that attempts to use state is not working.  So, how do I enable state in a Pulsar Function?
2020-03-03 21:45:47 UTC - Ryan Slominski: Tried updating conf/bookkeeper.conf with:
Still no luck.
2020-03-03 22:04:54 UTC - Alexander Ursu: Might anyone have an idea/opinion on connecting a pulsar cluster to a sink for an arbitrary amount of topics, with arbitrary schemas, and can be connected to Grafana for graphing, preferably through SQL or SQL-like query language
2020-03-03 22:12:38 UTC - Alexander Ursu: Or has anyone had any experience with putting any sort of visualizations over Presto after configuring it to work with Pulsar
2020-03-03 22:18:42 UTC - Ali Ahmed: @Alexander Ursu you can try this
2020-03-03 22:42:23 UTC - Ryan Slominski: FYI - This does work if you deploy the function to the cluster.  Just doesn't work with Local Function Runner
2020-03-03 23:59:09 UTC - Derek Moore: @Derek Moore has joined the channel
2020-03-04 02:31:48 UTC - Eugen: I'm currently testing a pulsar cluster and I experience message loss when producing from a single thread to a single topic (asynchronously). The following list shows the sequence numbers that are missing when sending at 200.000 msg/sec. It does not matter if I consume in a tailing fashion or if I seek to the beginning and receive everything - the result is the same - which leads me to believe the producing side is the problem:
Under what circumstances can message loss occur? I was under the impression that that's pretty much impossible in pulsar.
2020-03-04 02:33:13 UTC - Joe Francis: It is impossible, if the message is acked to the producer
2020-03-04 02:33:59 UTC - Eugen: Fwiw, I am using the same benchmark code to produce data for pulsar and kafka, which I'm comparing pulsar to, and the kafka variant, which uses the same message generation threading and logic, and with kafka there is no message loss.
2020-03-04 02:34:55 UTC - Eugen: I also thought it is impossible, even if brokers and bookies fail, as long as the producer is alive, which it is
2020-03-04 02:35:21 UTC - Joe Francis: Published messages are durable. A message is published only after you receive an ack for it.
2020-03-04 02:36:14 UTC - Eugen: I'm ignoring acks
2020-03-04 02:36:25 UTC - Joe Francis: :slightly_smiling_face:
2020-03-04 02:37:00 UTC - Eugen: I'm just sending using producer.sendAsync()
2020-03-04 02:37:22 UTC - Eugen: so you are saying that if I do not wait for an ack, messages may be lost?
2020-03-04 02:37:29 UTC - Eugen: In other words, I need to send synchronously?
2020-03-04 02:37:35 UTC - Joe Francis: Not at all
2020-03-04 02:39:21 UTC - Joe Francis: But the ack is the guarantee that your message was persisted.  Asynchronous send allows you to pipeline sends, instead of waiting for the roundtrip before the next send.
2020-03-04 02:42:16 UTC - Eugen: I heard (I believe here in slack) that as long as a producer lives, it will buffer messages in the face of broker failures, and make sure there can be nore reordering or message loss. As I'm facing message loss (always in batches), either my producer is buggy (but it works in kafka), or something is happening in pulsar, which I thought was impossible. So how could some messages get lost?
2020-03-04 02:43:43 UTC - Eugen: Obviously there could be message "loss" at the end, as long as brokers (or bookies) aren't recovering, but I still don't see how that could happen with messages in the middle.
2020-03-04 02:48:15 UTC - Joe Francis: That is correct. But if the operation times out, the clinet returns an error (future completed with error) and app has to republish
2020-03-04 02:49:06 UTC - Joe Francis: The broker retries will  not exceed the timeout window.  Within the timeout window, there will be retries.
2020-03-04 02:50:21 UTC - Joe Francis: The timeout window essentially is a limit - The app  tells Pulsar, if you cannot publish within this time, kick up the error to me
2020-03-04 02:50:52 UTC - Eugen: In which case there will be message loss (or reordering in case I resend), right?
2020-03-04 02:51:14 UTC - Eugen: I will look into the timeout setting next. Do you happen to know what the default is?
2020-03-04 02:52:13 UTC - Joe Francis: What happens on a timeout is that everything after that message that is pending in sthe client sendbuffer is cleaned out. the app has to resend everything starting from the message that returned error.
2020-03-04 02:52:57 UTC - Eugen: So before resending it needs to clear some flag that it received the nack?
2020-03-04 02:53:22 UTC - Eugen: Otherwise how would the client know if it's a resend (aware of the problem) or not
2020-03-04 02:58:43 UTC - Joe Francis: Lets say you get an error on the 23rd message, after you send 50 in async. You will get error on all 23..50 messages (futures). You will  just have to resend starting from 23...
2020-03-04 03:01:11 UTC - Joe Francis: I am not clear on what happened in your case, but I am pretty confident that if you received the ack for the publish, that message will be delivered.
2020-03-04 03:05:11 UTC - Eugen: So in your example, I send msg 50 asynchronously, and then the future for msg 23 returns an error. Everything before that is lost, but everything after that (like msg 51 which follows immediately after the asynchronously sent msg 50) gets sent? Then I don't see how I can prevent message loss / reordering when sending asynchronously.
2020-03-04 03:17:30 UTC - Joe Francis: That is a very good qn - I have to look at the code and see what happens @Matteo Merli may know it off the top of his head
2020-03-04 03:25:20 UTC - Eugen: I will try to set `brokerDeduplicationEnabled` to true
2020-03-04 03:59:45 UTC - Kannan: Broker-znode sessions names are constructed using broker advertised address. By default broker advertise its IP &amp; in k8s if we try to advertise the broker service name then other brokers failing to create session with because of session conflict. how to resolve this ?
2020-03-04 04:20:22 UTC - Eugen: adding
to broker.conf did not fix this
2020-03-04 04:48:22 UTC - Matteo Merli: @Eugen The most probable cause of the publish failure is the message being rejected directly in the client side because the buffer is full.

Producer has a queue where messages are kept before they are acknoledged by the broker.

When the queue gets full (default size is 1K messages), then the producer will, by default, reject new sendAsync() operation (by failing the associated future objects).

One way to avoid that is to set `blockIfQueueFull=true` on the `ProducerBuilder`. This will make the sendAsync operation to throttle (blocking) when there's no more space.
+1 : Eugen, Ali Ahmed
2020-03-04 04:49:48 UTC - Matteo Merli: Another way you can get publish failure is through timeouts (default is 30s). You can disable timeout, making the client to keep trying publishing the message forever by setting `sendTimeout=0`.
2020-03-04 04:58:29 UTC - Eugen: Good news - I will give this a shot!
2020-03-04 05:13:51 UTC - Eugen: The 30 sec timeout can't be what is causing my problem though, because it occurs after only a couple of seconds
2020-03-04 05:30:12 UTC - Sijie Guo: Are you using the same service name among different broker nodes?
2020-03-04 05:38:25 UTC - Eugen: Note to self and others following this thread: the queue size can be changed via `ProducerBuilder.maxPendingMessages()`
2020-03-04 05:42:36 UTC - Kannan: yes
2020-03-04 07:36:35 UTC - Greg: No, i can reproduce easily, but only with several brokers. If i use a single broker, i don't reproduce as the Topic is never cleaned. I need to understand why the Topic is cleaned even when there are subscribers/consumers connected...
2020-03-04 07:39:45 UTC - Greg: We managed to workaround the issue by setting brokerDeleteInactiveTopicsEnabled=false, but this is a temporary solution...
2020-03-04 07:50:47 UTC - Sijie Guo: each broker should have its own advertised address
2020-03-04 07:50:50 UTC - Sijie Guo: you can’t assign same address to different brokers.
2020-03-04 07:56:59 UTC - Kannan: by default, if we advertise broker ip its not accessible by its own pod with mTLS (<http://pod-ip:8080>)
2020-03-04 08:09:16 UTC - Greg: @Penghui Li: Hi, i just reproduced the issue with 2 brokers, i have a publisher and a consumer connected to the topic on broker 1 and i see that broker 2 deletes the topic :
```Broker 1 :
08:01:56.281 [pulsar-io-24-3] INFO - [/] Subscribing on topic <non-persistent://public/default/cluster> / Monitor-istra-monitor-667c9fbbcc-9cbns
08:01:56.309 [pulsar-io-24-3] WARN - [<non-persistent://public/default/cluster>] Error getting policies java.util.concurrent.CompletableFuture cannot be cast to and publish throttling will be disabled
08:01:56.309 [pulsar-io-24-3] INFO - Disabling publish throttling for <non-persistent://public/default/cluster>
08:01:56.314 [pulsar-io-24-3] INFO - Created topic NonPersistentTopic{topic=<non-persistent://public/default/cluster>}
08:01:57.199 [Thread-9] INFO - [<non-persistent://public/default/cluster][Monitor-istra-monitor-667c9fbbcc-9cbns]> Created new subscription for 0
08:01:57.199 [Thread-9] INFO - [/] Created subscription on topic <non-persistent://public/default/cluster> / Monitor-istra-monitor-667c9fbbcc-9cbns
08:01:57.888 [pulsar-io-24-3] INFO - [/][<non-persistent://public/default/cluster>] Creating producer. producerId=0
08:01:58.025 [BookKeeperClientWorker-OrderedExecutor-0-0] INFO - [/] Created new producer: Producer{topic=NonPersistentTopic{topic=<non-persistent://public/default/cluster>}, client=/, producerName=Monitor-istra-monitor-667c9fbbcc-9cbns, producerId=0}
08:03:10.130 [pulsar-ordered-OrderedExecutor-5-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperCache - [State:CONNECTED Timeout:30000 sessionid:0x100d2a69359002d local:/ remoteserver:zookeeper/ lastZxid:4294987042 xid:600 sent:600 recv:631 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/schemas/public/default/cluster

Broker 2 :
08:01:55.954 [ForkJoinPool.commonPool-worker-0] WARN - [<non-persistent://public/default/cluster>] Error getting policies java.util.concurrent.CompletableFuture cannot be cast to and publish throttling will be disabled
08:01:55.955 [ForkJoinPool.commonPool-worker-0] INFO - Disabling publish throttling for <non-persistent://public/default/cluster>
08:01:55.959 [ForkJoinPool.commonPool-worker-0] INFO - Created topic NonPersistentTopic{topic=<non-persistent://public/default/cluster>}
08:03:10.129 [pulsar-ordered-OrderedExecutor-4-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperCache - [State:CONNECTED Timeout:30000 sessionid:0x200eccd91c10031 local:/ remoteserver:zookeeper/ lastZxid:4294987046 xid:186 sent:186 recv:202 queuedpkts:0 pendingresp:0 queuedevents:1] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/schemas/public/default/cluster
08:03:10.132 [pulsar-io-24-3] INFO - [<non-persistent://public/default/cluster>] Topic deleted
08:03:10.133 [pulsar-io-24-3] INFO - [<non-persistent://public/default/cluster>] Topic deleted successfully due to inactivity```
2020-03-04 08:11:58 UTC - Greg: Is it normal that the topic is created in both brokers ?
2020-03-04 08:14:20 UTC - Penghui Li: Ok, let me take a look.
2020-03-04 08:19:52 UTC - Greg: thanks
2020-03-04 08:31:09 UTC - Vincent LE MAITRE: Hi, I would like to develop Pulsar functions with state. On my local standalone Pulsar installation it works fine (state is enabled by default). On a kubernetes Pulsar installation, deployed using Pulsar helm templates, the state feature seems to be disabled by default. And I am not able to enable this feature. Is there someone who succeed in activating function state on Kubernetes deployment ? Does the provided helm template allow to enable pulsar functions state ? Thanks
2020-03-04 08:42:02 UTC - Pavel Tishkevich: Zookeeper node has 12 CPU cores, 64GB of RAM, CPU idle > 75%
On each zookeeper node there are also deployed 1 broker and 1 bookie.

System load is not high: after adding more brokers it is never more than 5 (less than 50%).
There are almost no processes blocked by I/O.

Also we have disabled fsync in Zookeeper.

All this looks like problem is not related to hardware/disk speed.
2020-03-04 08:42:21 UTC - Pavel Tishkevich: 