You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/12/17 09:11:04 UTC

Slack digest for #general - 2018-12-17

2018-12-16 09:54:52 UTC - zero.xu: @zero.xu has joined the channel
----
2018-12-16 15:02:25 UTC - zero.xu: after read the code, I found the msg will be add async inot ledger in PersistentTopic impl, but how the consumer know the new msg coming? I can't find any code about this, ashamed about my terrible ability. but in NonPersistentTopic impl, the msg just go through subscriptions-&gt; subscription -&gt; dispatcher-&gt; consumer-&gt;channel. who call show me the related code in PersistentTopic?
----
2018-12-16 15:25:38 UTC - jia zhai: Hi @zero.xu Consumer will send a “Flow” command to broker, broker will handle this command, and push data to Consumer. Please take a look at `handleFlow` command in `ServerCnx.java`, and `handleMessage` in ClientCnx.java
----
2018-12-16 15:26:55 UTC - zero.xu: thx!
----
2018-12-16 15:28:31 UTC - jia zhai: welcome
----
2018-12-16 15:29:19 UTC - Sijie Guo: @zero.xu: to add on what @jia zhai explained - the consumer is “reading”/“waiting for” entries, while the producer is writing the entries and on successfully appending entries, it will add the entries to ManagedLedger entries cache, which will then notify the consumers waiting for entries.
----
2018-12-16 15:52:14 UTC - zero.xu: I did not found any code aboud the notify action, can u show related code?
----
2018-12-16 15:53:07 UTC - zero.xu: I review the code: PersistentTopic ManagedLedgerImpl PersistentSubscription, can not found any notify action
----
2018-12-16 15:53:52 UTC - zero.xu: @Sijie Guo
----
2018-12-16 16:03:51 UTC - Matteo Merli: @zero.xu Take a look at <https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L413>
----
2018-12-16 16:04:21 UTC - Sijie Guo: @zero.xu check ManagedCursorImpl.asyncReadEntriesOrWait
----
2018-12-16 16:05:35 UTC - Matteo Merli: `cursor.asyncReadEntriesOrWait()` is the call that will register to get the next batch of messages. if the cursor is at the end of topic, it will asynchronosly wait until messages are available
+1 : zero.xu
----
2018-12-16 16:05:35 UTC - Sijie Guo: if a cursor is caught up, it will be added to the managed ledger’s `waitingCursors` list. when new entries appened, the cursors will be waken up to read the actual emtries from the cache.
----
2018-12-16 16:06:07 UTC - Sijie Guo: yeah @Matteo Merli is faster than me
+1 : zero.xu
----
2018-12-16 16:06:12 UTC - Matteo Merli: :slightly_smiling_face:
----
2018-12-17 00:26:41 UTC - jia zhai: :+1:
----
2018-12-17 01:13:01 UTC - zero.xu: @Matteo Merli @Sijie Guo after read the code carefully, I found the clue: PersistentTopic.publishMessage -&gt; ManagedLedgerImpl.asyncAddEntry -&gt; OpAddEntry.initiate -&gt; LedgerHandle.asyncAddEntry -&gt; OpAddEntry.addComplete+safeRun -&gt; ManagedLedgerImpl.notifyCursors -&gt; ManagedCursorImpl.notifyEntriesAvailable -&gt; ManagedLedgerImpl.asyncReadEntries -&gt; EntryCacheImpl.asyncReadEntry
----
2018-12-17 01:14:12 UTC - Matteo Merli: Yes, there’s is an optimization there to avoid contention between the threads managing the publishers and consumers
----
2018-12-17 01:14:43 UTC - Matteo Merli: With a retry logic to avoid the notification when the throughput is high enough
----
2018-12-17 07:07:20 UTC - linxin: @linxin has joined the channel
----
2018-12-17 07:26:17 UTC - linxin: I have read the official documentation on the Schema Registry and read the source code. I found that the Pulsar Broker only performs the Schema checking when the Producers and Consumers first connect to the Broker. The actual write process does not verify. What happens if the schema is deleted after the producer connects to the broker?
----
2018-12-17 07:28:39 UTC - linxin: I have read the official documentation on the Schema Registry and read the source code. I found that the Pulsar Broker only performs the Schema checking when the Producers and Consumers first connect to the Broker. The actual write process does not verify. What happens if the schema is deleted after the producer connects to the broker? @Sijie Guo
----
2018-12-17 07:42:39 UTC - Matteo Merli: You mean the schema gets created or deleted after the producer creation?
----
2018-12-17 07:44:12 UTC - linxin: @Matteo Merli Yes
----
2018-12-17 07:51:58 UTC - Matteo Merli: In case of deletion, I think we’re not taking any action at the moment 
----
2018-12-17 07:53:47 UTC - Matteo Merli: For creation, I believe that the topic should have been created with type bytes already, therefore it should not let you to set the schema at this point since it will be for a different schema type
----
2018-12-17 07:54:16 UTC - Matteo Merli: Just going from memory, I don’t have the code in front 
----
2018-12-17 08:10:23 UTC - linxin: @Matteo Merli I understand what you mean, thanks. And i have another question, why Pulsar schema is topic dimension, unlike Kafka, each message gets a schema id?
----
2018-12-17 08:14:46 UTC - linxin: What happens if the schema makes incompatible changes (such as delete a Topic schema and then re-creating one) and then register a new consumer with new schema trying to consume the old message with old schema?
----
2018-12-17 08:19:42 UTC - linxin: 1. Topic#setSchema(SchemaA); 2. send a messageA under SchemaA; 3. delete SchemaA and Topic#setSchema(SchemaB); 4. Consumer#subscribeTopicWith(SchemaB); How the consumer handles the messageA? My English is not very good. Can you understand what I mean?
----