You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Apache Pulsar Slack <> on 2020/04/25 09:11:05 UTC

Slack digest for #general - 2020-04-25

2020-04-24 09:19:20 UTC - Ali Ahmed: yes should be
2020-04-24 09:19:41 UTC - Fayce: Thanks
2020-04-24 09:40:26 UTC - rani: &gt; It seems that you didn’t configure this properly. Hence the anonymous role is used.
@Sijie Guo, hmm, when I set the following on `broker.conf` everything works as expected with the healthcheck endpoint.
But is this configuration recommended?
I read in this issue that this is required for broker to broker communication. Is this still valid? <>
2020-04-24 09:49:15 UTC - Fernando Miguélez: It is already available at maven central by the way
2020-04-24 10:06:30 UTC - Gabriel Volpe: Hi guys, can anyone tell me what should be the default behavior of calling `subscribe` with a subscription type `Exclusive` when another subscription of type `Exclusive` with the same name already exists?

The Java client says the following, which is not clear what "operation fails" means.

```     * @throws PulsarClientException
     *             if the the subscribe operation fails
    Consumer&lt;T&gt; subscribe() throws PulsarClientException;```
I'm hoping the subscription cannot be created when there's an existing one but it's happening anyway, no errors.
2020-04-24 10:08:37 UTC - Rahul: If there is already one consumer running using the same subscription name then it will fail. Otherwise it will let you use the subscription name
2020-04-24 10:10:47 UTC - Gabriel Volpe: I have indeed another producer running in a different application but it doesn't publish messages until it gets triggered by certain events.
2020-04-24 10:11:14 UTC - Gabriel Volpe: Is there a way to verify it with the `pulsar-admin` ?
2020-04-24 10:12:08 UTC - Rahul: You can check the connected consumers using the admin cli
2020-04-24 10:15:29 UTC - Fayce: This example creates an Avro schema from a JSON string, but I don't see any message sent or received. How do you define the actual message? Do I just create a simple struct with the fields as described in the exampleSchema? In Java, you can create a schema directly from a class. Is there the same functionality in C++?
2020-04-24 10:16:22 UTC - Gabriel Volpe: Is that `pulsar-admin`? I'm trying to find what's the command
2020-04-24 10:19:35 UTC - Gabriel Volpe: ```root@9f7e3deb58bf:/pulsar# bin/pulsar-admin persistent stats <persistent://public/default/auth>
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 2632,
  "backlogSize" : 6,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "producerId" : 0,
    "metadata" : { },
    "address" : "/",
    "producerName" : "standalone-0-297",
    "connectedSince" : "2020-04-24T10:03:02.946Z",
    "clientVersion" : "2.5.0"
  } ],
  "subscriptions" : {
    "<persistent://public/default/auth-2b37c028-0476-4d82-90a4-b16032c08468-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-e1236e39-acda-468d-bc75-d3b5e1f83508-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 7,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-7f71fe1f-4024-414e-93b2-b313b204951d-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 7,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-436c4d6f-a172-4d82-86b3-a47696b8da03-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-88a8c412-a099-46a9-8cab-92bc9d30cb3d-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-61e7a134-bb97-4d92-b16b-ce8639a9e786-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 7,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-43068564-cd8c-450a-8276-2e1119b13b25-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-d00ff3a0-94a3-4025-9072-26f8b9c04a24-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ ],
      "isReplicated" : false
    "<persistent://public/default/auth-70f9c16f-e9b4-4cb7-8383-cde1857ffb3c-subscription>" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "activeConsumerName" : "53105",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "53105",
        "availablePermits" : 987,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "address" : "/",
        "connectedSince" : "2020-04-24T10:03:24.013Z",
        "clientVersion" : "2.5.0"
      } ],
      "isReplicated" : false
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "bytesInCounter" : 2632,
  "msgInCounter" : 13
2020-04-24 10:20:07 UTC - Gabriel Volpe: That tells me there is a publisher connected right?
2020-04-24 10:20:31 UTC - Gabriel Volpe: The other issue I'm seeing is that subscriptions don't get removed when I call `unsubscribe` , what could be keeping them there?
2020-04-24 10:21:40 UTC - Rahul: You can see the `consumerName` section
2020-04-24 10:26:24 UTC - Gabriel Volpe: It's there `"producerName" : "standalone-0-297",`
2020-04-24 10:26:39 UTC - Gabriel Volpe: It looks like a bug then?
2020-04-24 10:38:44 UTC - Gabriel Volpe: Or maybe I misunderstood what you said and the `producerName` should actually match the `subscriptionName` such as `<persistent://public/default/auth-70f9c16f-e9b4-4cb7-8383-cde1857ffb3c-subscription>` ? That seems very unlikely to happen given the format of the subscription name.
2020-04-24 10:40:23 UTC - Rahul: You can pass the consumer name while starting your producer application. Otherwise pulsar randomly assigns a producer name when you start the producer application
2020-04-24 10:41:31 UTC - Gabriel Volpe: Okay so I should give my producer a name matching the name of the subscription if I don't want other exclusive subscriptions to be created? That seems odd no?
2020-04-24 10:41:56 UTC - Gabriel Volpe: Is this behavior documented somewhere? I'd like to deepen my understanding
2020-04-24 10:43:34 UTC - Rahul: Producer name has nothing to do with the subscription name.

It will be somewhere in the documentation. Not sure where though.
2020-04-24 10:43:56 UTC - Gabriel Volpe: mmm sorry, I don't understand then
2020-04-24 10:48:30 UTC - Gabriel Volpe: &gt;  If there is already one producer running using the same subscription name then it will fail. Otherwise it will let you use the subscription name
What does this mean then? I'm creating an exclusive subscription and then another exclusive subscription is being created for the same topic when I expect it to fail (isn't that what exclusive mean after all?
2020-04-24 10:48:46 UTC - Gabriel Volpe: I'm a little bit lost on how all this works, really appreciate your help
2020-04-24 10:58:23 UTC - Rahul: Sorry. My bad. You have to look at the consumer side.
Just check that when you are trying to subscribe to a topic at that moment there is no existing consumer is connected to that topic using the same subscription name when using exclusive subscription.
If there is an existing consumer on that subscription then subscribe operation will throw error. Otherwise it will use the same subscription name and start reading from the offset.
2020-04-24 11:08:03 UTC - Gabriel Volpe: That makes sense, thanks.
2020-04-24 11:08:50 UTC - Gabriel Volpe: I only have one consumer at a time for this topic, so that's why it doesn't fail
2020-04-24 11:09:56 UTC - Gabriel Volpe: I was still seeing the issue with previous subscriptions (created by the same application) not being removed upon calling `unsubscribe` but I did restart everything (including `pulsar` ) and now it seems to work as expected, it was driving me crazy
2020-04-24 11:09:59 UTC - Gabriel Volpe: Thanks for your help
2020-04-24 12:16:35 UTC - Rattanjot Singh: Hello,
```bin/pulsar-admin properties create my-tenant \
&gt;   --admin-roles my-admin-role \
&gt;   --allowed-clusters us-west,us-east
WARN: The properties subcommand is deprecated. Please use tenants instead

Reason: <|>.ProcessingException```
It was working before. it started giving this error. any ideas how to resolve this
2020-04-24 12:19:31 UTC - Chris Bartholomew: Change `properties` to `tenants` in the command. The term `properties` has been replaced by `tenants`.
2020-04-24 12:22:13 UTC - Rattanjot Singh: ```bin/pulsar-admin tenants create my-tenant   --admin-roles my-admin-role   --allowed-clusters us-west,us-east

Reason: <|>.ProcessingException```
Still getting this error
2020-04-24 12:24:26 UTC - Chris Bartholomew: Have you changed your `conf/client.conf` file recently?
2020-04-24 12:25:15 UTC - Rattanjot Singh: no
2020-04-24 12:27:01 UTC - Chris Bartholomew: Is the URL set for `webServiceUrl` in that file still reachable from where you are running the command?
2020-04-24 12:27:32 UTC - Shivji Kumar Jha: Is there a boker.conf and namespace conf for this? cant find one..
```    /**
     * Set message-publish-rate (topics under this namespace can publish this many messages per second).
     * @param namespace
     * @param publishMsgRate
     *            number of messages per second
     * @throws PulsarAdminException
     *             Unexpected error
    void setPublishRate(String namespace, PublishRate publishMsgRate) throws PulsarAdminException;

     * Set message-publish-rate (topics under this namespace can publish this many messages per second) asynchronously.
     * @param namespace
     * @param publishMsgRate
     *            number of messages per second
    CompletableFuture&lt;Void&gt; setPublishRateAsync(String namespace, PublishRate publishMsgRate);```
2020-04-24 12:28:17 UTC - Rattanjot Singh: Yes I am able to telnet
+1 : Chris Bartholomew
2020-04-24 12:29:44 UTC - Chris Bartholomew: When you run the command, do you see anything in the logs on the proxy/broker you are connecting to?
2020-04-24 12:31:11 UTC - Aviram Webman: Hi,
pulsar-flink connector, which one should I use?
2020-04-24 12:31:55 UTC - Rattanjot Singh: which logs do i need to check?
2020-04-24 12:37:54 UTC - Chris Bartholomew: Look in the `logs` directory or use `kubectl logs &lt;podname&gt;` if running in Kubernetes
2020-04-24 12:44:29 UTC - Rattanjot Singh: ```12:41:59.898 [pulsar-external-listener-40-1] WARN  org.apache.pulsar.client.impl.PulsarClientImpl - [topic: <persistent://my-tenant/my-namespace/my-topic3>] Could not ge
t connection while getPartitionedTopicMetadata -- Will try again in 100 ms```
getting this error in logs
2020-04-24 12:47:58 UTC - Chris Bartholomew: This looks like a client is trying to connect to the tenant that's been failing on create. Does that tenant already exist? What do you get when you do `pulsar-admin tenants list`?
2020-04-24 12:51:01 UTC - Rattanjot Singh: ```bin/pulsar-admin tenants list

Reason: <|>.ProcessingException```
same error
2020-04-24 12:54:14 UTC - Chris Bartholomew: Where are you running the admin command from? Can you run the command on the same server as the broker?
2020-04-24 12:54:30 UTC - Rattanjot Singh: I am running this on the broker only
2020-04-24 12:55:28 UTC - Chris Bartholomew: And what is set for `webServiceUrl` in the `client.conf`?
2020-04-24 12:57:26 UTC - Rattanjot Singh: http://&lt;broker-dns&gt;:8080
2020-04-24 12:58:26 UTC - Chris Bartholomew: What happens if you change it to `<http://localhost:8080>`?
2020-04-24 13:00:11 UTC - Rattanjot Singh: It's working
2020-04-24 13:01:47 UTC - Rattanjot Singh: but why is it failing for broker dns
2020-04-24 13:02:26 UTC - Rattanjot Singh: It was working before with the broker dns
2020-04-24 13:03:06 UTC - Chris Bartholomew: Your DNS is not resolving to the broker IP
2020-04-24 13:07:14 UTC - Rattanjot Singh: Thanks for your help
+1 : Chris Bartholomew
2020-04-24 13:19:09 UTC - Ebere Abanonu: @Sijie Guo what protocol version is pulsar 2.5.1? 15?
2020-04-24 14:06:56 UTC - Addison Higham: The built in connector  (in the pulsar repo) is very simple, it doesn't support schema, it isn't exactly once. However, because of how simple it is a bit more stable. The newer one by stream native is much more full featured. However, it  is still pretty new and has some rough edges. It is much better than even just a few months ago but depending on your use case, you may need to make some tweaks to it
+1 : Aviram Webman
2020-04-24 14:13:56 UTC - Konstantinos Papalias: Yes I had the same question around the semantics on Pulsar!
2020-04-24 14:14:38 UTC - Konstantinos Papalias: If you were to implement joins on Pulsar Functions, how would you approach it?
2020-04-24 16:20:44 UTC - rani: Trying to start a function with the following configuration fails. However it works well for java based functions! Is `customRuntimeOptions` supported for python functions? If not, are there any other means by which the same intended effect can be achieved (setting node selectors and tolerations)?
```name: pythonprocessor
tenant: public
namespace: default
py: /tmp/
className: exclamation-function.ExclamationFunction
  - <persistent://sales/salesforce/pythonprocessor-input>
output: <persistent://sales/salesforce/pythonprocessor-output>
customRuntimeOptions: '{
  "extraLabels": {
    "pythonlabel": "pythonvalue"
  "extraAnnotations": {
    "pythonAnnotation": "pythonannotationvalue"
 "nodeSelectorLabels": {
    "instance_type": "special_instance"
  "tolerations": [
      "key": "dedicated",
      "value": "special_instance",
      "effect": "NoSchedule"
Results in the function starting with this error:
```16:15:43.934 [main] INFO  org.apache.pulsar.common.util.SecurityUtility - Found and Instantiated Bouncy Castle provider in classpath BC
"Downloaded successfully"
Traceback (most recent call last):
  File "/pulsar/instances/python-instance/", line 211, in &lt;module&gt;
  File "/pulsar/instances/python-instance/", line 98, in main
    json_format.Parse(args.function_details, function_details)
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/", line 430, in Parse
    return ParseDict(js, message, ignore_unknown_fields, descriptor_pool)
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/", line 450, in ParseDict
    parser.ConvertMessage(js_dict, message)
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/", line 481, in ConvertMessage
    self._ConvertFieldValuePair(value, message)
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/", line 592, in _ConvertFieldValuePair
    raise ParseError(str(e))
google.protobuf.json_format.ParseError: Message type "proto.FunctionDetails" has no field named "customRuntimeOptions".
 Available Fields(except extensions): ['tenant', 'namespace', 'name', 'className', 'logTopic', 'processingGuarantees', 'userConfig', 'secretsMap', 'runtime', 'autoAck', 'parallelism', 'source', 'sink', 'resources', 'packageUrl', 'retryDetails', 'runtimeFlags', 'componentType']```
2020-04-24 16:45:00 UTC - Alexander Ursu: Hi, was wondering if anyone had any experience setting up the Pulsar proxy with certbot for transport encryption using TLS, and automatically renewing certificates
2020-04-24 19:24:31 UTC - Tamer: I am running pulsar on kubernetes so we just use `cert-manager` to manage all TLS automatic renewal (with DNS validation). I configure an ingress gateway (Istio, Contour or nginx) to redirect traffic to the pulsar proxy

2020-04-24 20:58:17 UTC - rani: And btw, even without setting `brokerClientAuthenticationPlugin` and `brokerClientAuthenticationParameters` most other endpoints work as expected (listing tenants, listing topics, etc)
2020-04-24 23:14:02 UTC - Ebere Abanonu: it is v15
2020-04-25 00:33:35 UTC - Sijie Guo: Which version are you using?
2020-04-25 05:17:33 UTC - Subash Kunjupillai: Hi, basic questions on baremetal deployment of Pulsar..
1. If we enable Authentication using TLS, and regenerate the certificate for broker and client (before expiry), will Pulsar instance refresh the new certificates in run-time? or do we need to restart the services when the certificates are regenerated before  it's expiry?
2. Also, I couldn't see any step to encrypt the traffic between Broker and Bookie. Is it handled internally or the traffic between Broker and Bookie is still in plain text?
2020-04-25 08:55:25 UTC - rani: *Pulsar Component Version*: `2.5.1-candidate-3`
*Architecture*: Function workers running on k8s separately from remaining components that are managed on EC2 via ASGs. Pulsar proxy was configured to be aware of the separate function worker cluster by configuring `functionWorkerWebServiceURL` appropriately.