Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/07/25 09:11:04 UTC

Slack digest for #general - 2018-07-25

2018-07-24 09:13:04 UTC - Sijie Guo: what is your subscription type?
----
2018-07-24 09:14:23 UTC - Idan: SubscriptionType.Shared
----
2018-07-24 09:19:15 UTC - Idan: it’s like the consumer never get ‘ack success’ from the broker
----
2018-07-24 09:19:21 UTC - Idan: and therefore it shows that warning
----
2018-07-24 09:19:31 UTC - Idan: although we know the broker got the message for sure. otherwise it would re-deliver right?
----
2018-07-24 09:19:53 UTC - Idan: here are other trace logs on the consumer side:
----
2018-07-24 09:19:55 UTC - Idan: 12:17:41.099 DEBUG o.a.p.c.i.ConsumerImpl:368 - [tomersubscr] [a2f50] can ack message to broker 8:12:-1:0, acktype Individual, cardinality 0, length 1
12:17:41.100 DEBUG o.a.p.c.i.ConsumerImpl:404 - [tomersubscr] [a2f50] acknowledging message - 8:12:-1:0, acktype Individual
----
2018-07-24 09:20:18 UTC - Idan: thats the messageID: 8:12:-1:0
----
2018-07-24 09:20:27 UTC - Idan: the most sanity scenario ever
----
2018-07-24 09:20:35 UTC - Idan: we cant nail this down for few days
----
2018-07-24 09:20:44 UTC - Idan: started to speak about this with @Matteo Merli but he also didnt have a clue
----
2018-07-24 09:20:57 UTC - Idan: perhaps you’ll have insights
----
2018-07-24 09:23:11 UTC - Sijie Guo: @Idan sorry I don’t have immediate clues by looking into the code. this sounds like a race condition to me on handling UnackedMessages. do you mind creating a github issue? I can take a look in the morning.
----
2018-07-24 09:23:41 UTC - Idan: race condition on the consumer side?
----
2018-07-24 09:24:26 UTC - Idan: perhaps its the way we init the consumer:
----
2018-07-24 09:24:27 UTC - Idan: client.newConsumer()
                    .topic(clusterUrl + topicName)
                    .subscriptionType(subscriptionType)
                    .ackTimeout(ackTimeoutInMillis, TimeUnit.MILLISECONDS)
                    .subscriptionName(subscriptionName)
                    .subscribe(), maxReceive, topicDLQ)
----
2018-07-24 09:25:39 UTC - Idan: i tried it also with ackSync
----
2018-07-24 09:25:46 UTC - Idan: consumer.acknowledge(messageId);
----
2018-07-24 09:25:47 UTC - Idan: this way
----
2018-07-24 09:25:50 UTC - Idan: got the same result
----
2018-07-24 09:25:55 UTC - Idan: it’s like the most sanity flow:)
----
2018-07-24 09:26:11 UTC - Sijie Guo: I mean race condition within pulsar on handling acks and ack timeouts
----
2018-07-24 09:26:33 UTC - Idan: i guess someone would complain about this wouldnt it?
----
2018-07-24 09:26:37 UTC - Idan: it’s like you see it right away..
----
2018-07-24 09:29:16 UTC - Sijie Guo: but I haven’t seen other complaints and I haven’t seen this myself either. so I guess it must be triggered by some particular sequence. however I don’t have any immediate clues on the problem. so I have to look at it in the morning.
----
2018-07-24 09:29:19 UTC - Sijie Guo: sorry
----
2018-07-24 09:29:25 UTC - Sijie Guo: it is a bit late to me now
----
2018-07-24 09:29:34 UTC - Idan: ok no worries. ill be waiting for your response
----
2018-07-24 09:29:45 UTC - Sijie Guo: please file a github issue
----
2018-07-24 09:29:51 UTC - Sijie Guo: that’s better for tracking purpose.
----
2018-07-24 09:31:39 UTC - Idan: i will do that
----
2018-07-24 10:57:53 UTC - Ewout: @Ewout has joined the channel
----
2018-07-24 11:12:13 UTC - Ewout: Just getting started with pulsar... I would like to publish a stream of messages to pulsar. In order to survive crashes of the producer I wanted to use setSequenceId and producer.getLastSequenceId() to determine which message has been sent last to pulsar before the crash and resume sending from there. The problem is that sequenceId in pulsar is of type long and I cannot identify messages in my source data with only a long. Are there any other methods of accomplishing something similar ?
----
2018-07-24 11:14:34 UTC - Ewout: Something that would return the complete last published message for example
----
2018-07-24 11:25:41 UTC - Karthik Palanivelu: @Sijie Guo yes I am trying active active replication without ZKs global quorum.
----
2018-07-24 11:54:19 UTC - Guillaume Pitel: Hi everyone. A bit of context first: we are using Pulsar as a message queue for a crawling service. In order to improve the load balancing of the Pulsar nodes, I've changed some settings in broker.conf. And now one of my services, where several topics are subscribed to by the same object, has these kinds of warnings:
----
2018-07-24 11:54:30 UTC - Guillaume Pitel: @Guillaume Pitel uploaded a file: <https://apache-pulsar.slack.com/files/UBULT3BTJ/FBV681M88/-.txt|Sans titre>
----
2018-07-24 11:55:13 UTC - Guillaume Pitel: This error happens several times, always with the same pattern "allTopicPartitionsNumber X not equals expected: X-1"
----
2018-07-24 11:59:27 UTC - Guillaume Pitel: @Guillaume Pitel uploaded a file: <https://apache-pulsar.slack.com/files/UBULT3BTJ/FBW1Z235K/the_changed_settings.js|The changed settings>
----
2018-07-24 16:07:56 UTC - Matteo Merli: @Guillaume Pitel the configuration changes look reasonable. Are you using a regex or a consumer with a list of topics? Or is that a partitioned topic?
----
2018-07-24 17:17:17 UTC - Guillaume Pitel: No, although I have tried using a partitioned topic afterward to see if the problem would disappear.
----
2018-07-24 17:17:27 UTC - Poule: is it ok to use Pulsar with Ceph? Hetzner's vps use ceph
----
2018-07-24 17:18:22 UTC - Guillaume Pitel: I've solved the problem by starting one thread per Consumer, which is acceptable because there are only 100 topics currently
----
2018-07-24 17:19:35 UTC - Matteo Merli: Ok, another option would be to set a message listener on the consumers. That way you can still have 100 consumers but you don’t need 1 thread per consumer
----
2018-07-24 17:19:44 UTC - Poule: disabling fsync too I guess
----
2018-07-24 17:21:40 UTC - Matteo Merli: In any case, I’ll try to reproduce the subscribe issue. Any other hints on how it got into that state ?
----
2018-07-24 17:21:49 UTC - Sijie Guo: > Something that would return the complete last published message for example

@Ewout: you can use the Reader API to get the last published message. However I don’t think that would address your problem. My sense on your problem is you want to achieve exactly-once publish. You need some guarantees on brokers for reasoning about duplicated produces from retries.
----
2018-07-24 17:28:32 UTC - Sijie Guo: yes it is okay to use ceph
----
2018-07-24 17:29:34 UTC - Poule: ok thanks
----
2018-07-24 17:34:38 UTC - Daniel Ferreira Jorge: If I'm consuming from a topic and, inside its message listener, I'm producing to another topic, and my producer is named, the sendTimeout is set to 0 and the namespace has deduplication enabled (as per the deduplication cookbook), do I have a guarantee that no messages will be produced twice in case of a failure? (failure after producing but before acking).
----
2018-07-24 17:42:06 UTC - Matteo Merli: @Daniel Ferreira Jorge not entirely. This would cover all Pulsar side crashes but not application crashes, since it would be a different producer “session” with a different producer name (by default). 

The Pulsar Functions runtime enables the effectively-once logic with these additional steps: 
 * Producer name is set and reused after a crash
 * The sequence id used when publishing needs to be tied to some property of the source. In this case the source is a topic, so the sequenceId is derived from the messageId. (See: `org.apache.pulsar.functions.utils.Utils.getSequenceId(message.getMessageId())`)
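
To make "derived from the messageId" concrete, here is a minimal sketch of one way to pack a ledger id and an entry id into a single monotonically increasing `long`. It is an illustration only and not necessarily what `Utils.getSequenceId` does: the class `SequenceIdSketch`, the method name, and the 28-bit split are all assumptions, as is reading a message id such as `8:12:-1:0` as ledger 8, entry 12.

```java
// Hypothetical helper, not part of the Pulsar API.
final class SequenceIdSketch {
    // Assumption: entry ids within a single ledger stay below 2^28.
    static final int ENTRY_BITS = 28;

    // Packs (ledgerId, entryId) into one long so that a later message
    // on the same topic always maps to a larger sequence id.
    static long fromLedgerAndEntry(long ledgerId, long entryId) {
        if (ledgerId < 0 || entryId < 0 || entryId >= (1L << ENTRY_BITS)) {
            throw new IllegalArgumentException("ledgerId/entryId out of range for this sketch");
        }
        return (ledgerId << ENTRY_BITS) | entryId;
    }

    public static void main(String[] args) {
        // Ledger 8, entry 12, as in the message id quoted earlier in the digest.
        System.out.println(fromLedgerAndEntry(8, 12));
    }
}
```

The point of such a packing is only that ordering is preserved: a higher entry in the same ledger, or any entry in a later ledger, yields a higher sequence id.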
----
2018-07-24 17:51:33 UTC - Daniel Ferreira Jorge: @Matteo Merli The producer inside the message listener has an immutable name, so when the app restarts, it will have the same name. Is this enough? Will the producer derive the sequenceId from the messageId?
----
2018-07-24 17:52:46 UTC - Guillaume Pitel: Regarding your previous remark (one message listener on the consumers), it's what was done before, I think I misunderstood your first question about a list of topics.
----
2018-07-24 17:54:22 UTC - Matteo Merli: If the producer has the same name it will automatically assign sequenceIds based on the last committed sequenceId. 

The problem is that the source (the consumer) will replay some of the entries. Explicitly setting the sequenceId on the message builder when re-publishing will allow the broker to identify the messages replayed by the consumer as dups.
----
2018-07-24 17:54:30 UTC - Guillaume Pitel: Unfortunately, no other hints, I've switched to Pulsar 2.0.1 only recently, it first worked quite as usual. I've changed the brokers settings because I had some problems with "Namespace unavailable because of unloading bundles" which seems to take forever.
----
2018-07-24 17:56:20 UTC - Matteo Merli: Quick example: without explicitly setting the sequenceId, the following would happen: 
 * “producer-a” comes back 
 * Broker will tell the last sequenceId published by “producer-a” was 10 
 * Producer will start publishing messages starting from sequenceId 11, 12, 13..
----
2018-07-24 17:58:54 UTC - Matteo Merli: When setting the sequenceId from source, we would instead have: 
 • “producer-a” comes back
 • Broker will tell the last sequenceId published by “producer-a” was 10 — Though we ignore it 
 • Consumer will replay some messages: 7, 8, 9, 10 
 • When publishing, `producer.newMessage().value(x).sequenceId(7).send()` and broker will discard anything with sequenceId <= 10
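
The rule in the last bullet can be modelled in a few lines of plain Java. This is a toy model of "discard anything with sequenceId <= the last one accepted for that producer name", not broker code; the class `DedupModel` and its methods are hypothetical names made up for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of per-producer-name deduplication. Not a Pulsar API.
final class DedupModel {
    // Highest sequence id accepted so far, keyed by producer name.
    private final Map<String, Long> lastAccepted = new HashMap<>();

    // Returns true if the message is accepted, false if it is dropped as a duplicate.
    boolean publish(String producerName, long sequenceId) {
        long last = lastAccepted.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // already seen: a replayed message
        }
        lastAccepted.put(producerName, sequenceId);
        return true;
    }

    long lastSequenceId(String producerName) {
        return lastAccepted.getOrDefault(producerName, -1L);
    }

    public static void main(String[] args) {
        DedupModel broker = new DedupModel();
        for (long seq = 1; seq <= 10; seq++) {
            broker.publish("producer-a", seq);
        }
        // After a restart the consumer replays 7..10; all are dropped, 11 goes through.
        for (long seq = 7; seq <= 11; seq++) {
            System.out.println(seq + " accepted=" + broker.publish("producer-a", seq));
        }
    }
}
```

In this model the replayed messages 7 to 10 are rejected only because the application sets the same sequence ids again; had the producer assigned fresh ids 11, 12, 13, ... they would all have been accepted as new.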
----
2018-07-24 17:59:56 UTC - Daniel Ferreira Jorge: @Matteo Merli do you mean `.sequenceId(10)`?
----
2018-07-24 18:00:37 UTC - Daniel Ferreira Jorge: I see
----
2018-07-24 18:04:09 UTC - Matteo Merli: Simple, pseudo-code example:

```
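while (true) {
  Message<byte[]> msg = consumer.receive();

  byte[] newValue = process(msg);

  // Publish the result, tying the sequenceId to the source messageId so the
  // broker can drop re-published duplicates after a replay:
  producer.newMessage()
          .value(newValue)
          .sequenceId(Utils.getSequenceId(msg.getMessageId()))
          .send();

  // Ack only after the publish has succeeded; a crash before this line
  // causes a replay, which the broker then deduplicates.
  consumer.acknowledge(msg);
}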
```
----
2018-07-24 18:06:11 UTC - Daniel Ferreira Jorge: @Matteo Merli I'm already using deduplication successfully in other parts of my code where I provide a sequenceId derived from somewhere else. My doubt is more centered around the messageId being able to be used as a sequenceId, especially in the case of consuming partitioned topics.
----
2018-07-24 18:08:51 UTC - Matteo Merli: Correct, with partitioned topics it gets a bit more complicated
----
2018-07-24 18:09:02 UTC - Daniel Ferreira Jorge: If I'm consuming a 100-partition topic, with let's say 1 Exclusive subscription, and producing to one topic, can I still use messageId as sequenceId for my producer?
----
2018-07-24 18:09:21 UTC - Daniel Ferreira Jorge: will pulsar hold the last sequence id for each partition?
----
2018-07-24 18:09:47 UTC - Matteo Merli: Yes, though you need to have individual producers for each partition you’re going to produce into
----
2018-07-24 18:10:04 UTC - Daniel Ferreira Jorge: ahhh... that is fine
----
2018-07-24 18:10:50 UTC - Matteo Merli: actually, you need 1 producer per each partition you are consuming from
----
2018-07-24 18:10:57 UTC - Matteo Merli: (sorry for confusion)
----
2018-07-24 18:11:20 UTC - Daniel Ferreira Jorge: yes
----
2018-07-24 18:12:14 UTC - Matteo Merli: so, when you get a message you also have to lookup the appropriate producer to use
----
2018-07-24 18:14:15 UTC - Daniel Ferreira Jorge: Is `HashMap<Short, Producer<byte[]>>` enough, where the `Short` will be the partition number?
----
2018-07-24 18:15:55 UTC - Matteo Merli: correct
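
A minimal sketch of the per-partition lookup discussed above, in plain Java. `PartitionProducers`, `forPartition` and the `-partition-N` naming scheme are hypothetical; the type parameter `P` stands in for `Producer<byte[]>`, and the factory is where a real application would create a named producer for that partition. How the partition number is obtained from a received message is left out on purpose.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical registry: one producer per source partition, created lazily.
// Not thread-safe; assumes a single consuming thread.
final class PartitionProducers<P> {
    private final Map<Integer, P> byPartition = new HashMap<>();
    private final IntFunction<P> factory;

    PartitionProducers(IntFunction<P> factory) {
        this.factory = factory;
    }

    // Returns the producer for this partition, creating it on first use.
    P forPartition(int partition) {
        return byPartition.computeIfAbsent(partition, factory::apply);
    }

    int size() {
        return byPartition.size();
    }

    // A stable name per partition, so the same name is reused after a restart.
    static String producerName(String base, int partition) {
        return base + "-partition-" + partition;
    }

    public static void main(String[] args) {
        // Strings stand in for real producers here.
        PartitionProducers<String> producers =
                new PartitionProducers<>(p -> producerName("republisher", p));
        System.out.println(producers.forPartition(3));
    }
}
```

If messages are handled from several listener threads, swapping the `HashMap` for a `ConcurrentHashMap` would be the obvious adjustment.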
----
2018-07-24 18:16:30 UTC - Matteo Merli: or you could just use a Pulsar function (either in local or managed mode) and it would handle all this logic
----
2018-07-24 18:20:27 UTC - Daniel Ferreira Jorge: We may try functions but we have a whole delivery process to kubernetes via spinnaker, which handle everything that goes to production. Deploying functions manually for the "important" parts of the system is a no-no...
----
2018-07-24 18:20:46 UTC - Daniel Ferreira Jorge: @Matteo Merli Thank you very much, as always, for your help!
+1 : Matteo Merli
----
2018-07-24 18:22:58 UTC - Matteo Merli: Yes, in that context, the local runner is probably the best way to integrate: it is just a standalone Java process that runs your jar, and you can deploy it as you want.
----
2018-07-24 18:23:58 UTC - Daniel Ferreira Jorge: Thanks I will take a more in depth look at function!
----
2018-07-24 18:26:17 UTC - Poule: NFS?
----
2018-07-24 18:27:13 UTC - Poule: @Sijie Guo
----
2018-07-24 18:29:24 UTC - Sijie Guo: any filesystem should work. but you might consider disabling fsync, since network filesystems and distributed filesystems carry a latency penalty
----
2018-07-24 18:31:58 UTC - Poule: ok
----
2018-07-24 18:32:24 UTC - Poule: thanks again
----
2018-07-24 19:35:27 UTC - Idan: @Matteo Merli @Sijie Guo i created git issue as request: <https://github.com/apache/incubator-pulsar/issues/2221>
----
2018-07-24 19:35:36 UTC - Idan: this is our last step before giving up:(
----
2018-07-24 19:35:53 UTC - Idan: as we can't move on with our Pulsar PoC till we know we can resolve this
+1 : Matteo Merli
----
2018-07-24 19:51:32 UTC - Idan: @Matteo Merli iam also having on that service sqs(aws queue) client
----
2018-07-24 19:51:36 UTC - Idan: mybe they collide somehow
----
2018-07-24 19:51:41 UTC - Idan: so my consumer struggles to ack
----
2018-07-24 19:53:16 UTC - Matteo Merli: in your example I’ve seen that you have a map of consumers
----
2018-07-24 19:53:32 UTC - Matteo Merli: is there any chance that map gets updated?
----
2018-07-24 19:59:53 UTC - Idan: yes i have map of consumers as a singleton is handling all pulsar logic
----
2018-07-24 20:00:06 UTC - Idan: updated.. not really coz i added logs to make sure iam using the right consumer after iam getting it
----
2018-07-24 20:00:36 UTC - Idan: i can send you this class
----
2018-07-24 20:00:43 UTC - Idan: but i cant make this public
----
2018-07-24 21:26:57 UTC - William Fry: We’re using the Admin REST API to determine stats about topics. If I curl the REST API from one of the instances, I correctly receive a 404 with topic not found. However, if I curl the second of two instances, the request times out and the logs show a 307 error.
----
2018-07-24 21:27:51 UTC - William Fry: I’m running into issues in that sometimes the load balancer directs to instance one, while other times it directs to instance two. Does anyone know what’s going on or how to resolve this so that either A) the ELB directs to the correct instance or B) the second instance doesn’t timeout
----
2018-07-24 21:28:18 UTC - Matteo Merli: 307 is just an HTTP redirection
----
2018-07-24 21:28:27 UTC - William Fry: But it never successfully redirects
----
2018-07-24 21:28:41 UTC - William Fry: it times out at whatever requests’ default timeout is
----
2018-07-24 21:28:51 UTC - William Fry: is that because Pulsar instances need to be able to chat to one another?
----
2018-07-24 21:29:25 UTC - Matteo Merli: is the connection coming from outside the network in which the brokers are located?
----
2018-07-24 21:30:10 UTC - William Fry: Normally it would be coming from outside the network. Right now I’m testing on localhost from the brokers, one of which correctly 404s and the other which 307s and times out
----
2018-07-24 21:30:19 UTC - William Fry: (outside the network via an ELB)
----
2018-07-24 21:32:12 UTC - William Fry: Looks like the instances need to be able to chat with one another
----
2018-07-24 21:32:15 UTC - Matteo Merli: yes, some of the REST endpoints will redirect to the broker that is serving the particular topic. If that hostname/IP is not reachable from the client, the client will have a problem
----
2018-07-24 21:32:24 UTC - William Fry: I thought that only Zookeeper instances needed to be able to connect to one another
----
2018-07-24 21:32:27 UTC - William Fry: Seems I was mistaken
----
2018-07-24 21:33:03 UTC - Matteo Merli: there’s no connection broker -> broker, just the client is redirected to particular broker.
----
2018-07-24 21:34:09 UTC - Matteo Merli: eg: HTTP request for stats of “my-topic” goes to broker-1. It figures out that topic is being served on broker-2, then it sends 307 reply, telling client to go to broker-2
----
2018-07-24 21:34:31 UTC - Matteo Merli: if “broker-2” is not reachable from client, that is a problem
----
2018-07-24 21:35:27 UTC - Matteo Merli: one possible solution is to use the pulsar-proxy, which provides a frontend service for both pulsar:// and http:// protocols and can be exposed through the load balancer
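
For a client that talks to the admin REST API directly, a minimal sketch of an HTTP client that follows the 307 and fails fast instead of hanging, using the JDK's `java.net.http` (Java 11+). `RedirectAwareClient` and the timeout values are assumptions for the example; following the redirect only helps if the target broker's advertised hostname/IP is actually reachable from where the client runs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper built on the JDK HTTP client (Java 11+).
final class RedirectAwareClient {
    // Follows redirects such as the 307 sent by a broker that does not own the topic,
    // and gives up on connecting after 5 seconds rather than waiting indefinitely.
    static HttpClient build() {
        return HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NORMAL)
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    // GET request with an overall timeout; the caller supplies the full stats URL.
    static HttpRequest statsRequest(URI statsUri) {
        return HttpRequest.newBuilder(statsUri)
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
    }
}
```

A call would then look like `RedirectAwareClient.build().send(RedirectAwareClient.statsRequest(uri), HttpResponse.BodyHandlers.ofString())`, where `uri` is whatever stats URL is already being curled.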
----
2018-07-24 23:24:22 UTC - Matteo Merli: @Idan posted a fix for the issue at <https://github.com/apache/incubator-pulsar/pull/2224>
----
2018-07-25 08:03:10 UTC - Idan: @Matteo Merli how come no one paid attention to it before? isn't it a basic produce-consume scenario?? have we done something special?
----
2018-07-25 08:18:41 UTC - Idan: @Matteo Merli @Sijie Guo how does this affect us currently? can we just ignore this warning till we get the fix?
----