Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/06/27 09:11:05 UTC

Slack digest for #general - 2020-06-27

2020-06-26 09:37:28 UTC - Ali Ahmed: some discussion of Pulsar in the real world; it’s the last part of the talk.
<https://www.youtube.com/watch?v=CbirFaTDpWE>
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Hello :wave: Sorry, this is going to be a long post, but I have an issue with Pulsar Functions, more precisely with the state storage of the functions, and I’ll try to give as much information as possible about what is happening.

I’m basically running 3 functions on a pulsar standalone instance, using docker image apachepulsar/pulsar-all:2.6.0. There is only one instance of each function for now:
• tasks-JobEnginePulsarFunction: This function keeps track of the state of some jobs (basically, they have a name and an ID). It saves an entry in the state for every job to run (one key per job) and sends a message to a topic for a consumer that will try to process the job and send various events back, such as job start, and then failed / completed. The function records in the state the events that happened for that job and, when the job fails, sends a new message to have it retried. When it receives an event saying the job completed, it deletes the associated state because it won’t be used anymore. When the status of a job changes, the function publishes a message to a topic named “tasks-monitoring-per-name” containing the old status and the new status of the job.
• MonitoringPerName: This function uses counters and state to store how many jobs are in a given status. Based on the status update published by the previous function, it decrements the counter corresponding to the old status of the job and increments the counter corresponding to the new status. Because counters are not easy to retrieve from the outside world, this function also stores the values of the counters in function state. When a new job name is received, the function publishes to a “tasks-monitoring-global” topic the fact that it has received a job which was not previously known, and initialises counters and state for it.
• MonitoringGlobal: This function uses state to store a list of all job names that are known, based on the message published by the previous function.
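The counter bookkeeping described for MonitoringPerName can be sketched roughly as follows. This is a plain in-memory illustration, not the poster's code and not the Pulsar Functions API: the class name, the `announced` list standing in for the “tasks-monitoring-global” topic, and the status names in the usage lines are all assumptions made for the example.

```python
from collections import defaultdict


class MonitoringPerNameSketch:
    """In-memory stand-in for the per-name status counters (hypothetical)."""

    def __init__(self):
        # (job_name, status) -> number of jobs currently in that status.
        self.counters = defaultdict(int)
        # Job names seen so far.
        self.known_names = set()
        # Stand-in for messages published to "tasks-monitoring-global".
        self.announced = []

    def on_status_change(self, job_name, old_status, new_status):
        # First time this job name is seen: announce it once.
        if job_name not in self.known_names:
            self.known_names.add(job_name)
            self.announced.append(job_name)
        # Move one job from the old status bucket to the new one.
        if old_status is not None:
            self.counters[(job_name, old_status)] -= 1
        self.counters[(job_name, new_status)] += 1
        # Snapshot of the counters for this name, as it might be mirrored to state.
        return {
            status: count
            for (name, status), count in self.counters.items()
            if name == job_name
        }


monitor = MonitoringPerNameSketch()
monitor.on_status_change("resize-image", None, "pending")
monitor.on_status_change("resize-image", "pending", "running")
snapshot = monitor.on_status_change("resize-image", "running", "completed")
print(snapshot)
```

The snapshot returned on each update plays the role of the counter values that the function copies into state so they can be read from outside.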
When I feed the first function with 10k messages, at some point it just gets stuck and stops doing anything, usually after 2k - 5k messages. It has also failed after a few hundred messages, and one time it went through all 10k messages without issue. I tried to run some commands while it was stuck to get more information, and here are some things I observed:

The function instance seems to be considered running by pulsar:

```# bin/pulsar-admin functions stats --name tasks-JobEnginePulsarFunction
{
  "receivedTotal" : 315,
  "processedSuccessfullyTotal" : 314,
  "systemExceptionsTotal" : 0,
  "userExceptionsTotal" : 0,
  "avgProcessLatency" : 118.07828642993623,
  "1min" : {
    "receivedTotal" : 0,
    "processedSuccessfullyTotal" : 0,
    "systemExceptionsTotal" : 0,
    "userExceptionsTotal" : 0,
    "avgProcessLatency" : null
  },
  "lastInvocation" : 1593099037904,
  "instances" : [ {
    "instanceId" : 0,
    "metrics" : {
      "receivedTotal" : 315,
      "processedSuccessfullyTotal" : 314,
      "systemExceptionsTotal" : 0,
      "userExceptionsTotal" : 0,
      "avgProcessLatency" : 118.07828642993623,
      "1min" : {
        "receivedTotal" : 0,
        "processedSuccessfullyTotal" : 0,
        "systemExceptionsTotal" : 0,
        "userExceptionsTotal" : 0,
        "avgProcessLatency" : null
      },
      "lastInvocation" : 1593099037904,
      "userMetrics" : { }
    }
  } ]
}```
Getting stats about the topic shows the function still has messages to process on the topic:


+1 : Kirill Merkushev
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: ```# bin/pulsar-admin topics stats tasks-engine
{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "bytesInCounter" : 301946,
  "msgInCounter" : 1504,
  "bytesOutCounter" : 301946,
  "msgOutCounter" : 1504,
  "averageMsgSize" : 0.0,
  "msgChunkPublished" : false,
  "storageSize" : 301946,
  "backlogSize" : 239146,
  "publishers" : [ {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 0,
    "metadata" : { },
    "producerName" : "standalone-0-11",
    "connectedSince" : "2020-06-25T15:32:32.654Z",
    "clientVersion" : "2.5.0",
    "address" : "/10.0.2.2:54818"
  }, {
    "msgRateIn" : 0.0,
    "msgThroughputIn" : 0.0,
    "averageMsgSize" : 0.0,
    "chunkedMessageRate" : 0.0,
    "producerId" : 2,
    "metadata" : {
      "instance_id" : "0",
      "application" : "pulsar-function",
      "id" : "public/default/tasks-JobEnginePulsarFunction"
    },
    "producerName" : "standalone-0-6",
    "connectedSince" : "2020-06-25T15:29:26.603Z",
    "clientVersion" : "2.6.0",
    "address" : "/127.0.0.1:56128"
  } ],
  "subscriptions" : {
    "public/default/tasks-JobEnginePulsarFunction" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 301946,
      "msgOutCounter" : 1504,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 1190,
      "msgBacklogNoDelayed" : 1190,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 1190,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1593099153207,
      "lastConsumedTimestamp" : 1593099153689,
      "lastAckedTimestamp" : 1593099037935,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 301946,
        "msgOutCounter" : 1504,
        "msgRateRedeliver" : 0.0,
        "chuckedMessageRate" : 0.0,
        "consumerName" : "75d3c",
        "availablePermits" : 496,
        "unackedMessages" : 1190,
        "avgMessagesPerEntry" : 6,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1593099037935,
        "lastConsumedTimestamp" : 1593099153689,
        "metadata" : {
          "instance_id" : "0",
          "application" : "pulsar-function",
          "id" : "public/default/tasks-JobEnginePulsarFunction"
        },
        "connectedSince" : "2020-06-25T15:21:10.978Z",
        "clientVersion" : "2.6.0",
        "address" : "/127.0.0.1:56128"
      } ],
      "isDurable" : true,
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}```
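One way to read the subscription stats above is to compare the backlog with the unacked count and the last-consumed time with the last-acked time. The sketch below does that on a trimmed copy of the output; the `summarize` helper and its report keys are made up for illustration, and the interpretation (messages delivered but never acknowledged) is only a reading of these numbers, not a confirmed diagnosis.

```python
import json

# Trimmed copy of the subscription stats shown above (only the fields used here).
TOPIC_STATS = json.loads("""
{
  "subscriptions": {
    "public/default/tasks-JobEnginePulsarFunction": {
      "msgBacklog": 1190,
      "unackedMessages": 1190,
      "lastConsumedTimestamp": 1593099153689,
      "lastAckedTimestamp": 1593099037935
    }
  }
}
""")


def summarize(stats):
    """Return a small per-subscription report (hypothetical helper)."""
    report = {}
    for name, sub in stats["subscriptions"].items():
        # Time between the last delivery to the consumer and the last ack, in ms.
        ack_lag_ms = sub["lastConsumedTimestamp"] - sub["lastAckedTimestamp"]
        report[name] = {
            "backlog": sub["msgBacklog"],
            "unacked": sub["unackedMessages"],
            # True when every backlogged message was delivered but not acked.
            "all_backlog_unacked": sub["msgBacklog"] == sub["unackedMessages"],
            "ack_lag_seconds": ack_lag_ms / 1000.0,
        }
    return report


report = summarize(TOPIC_STATS)
for name, row in report.items():
    print(name, row)
```

For these values the whole backlog of 1190 messages is unacked, and the last ack is roughly 116 seconds older than the last delivery, which would be consistent with a function that keeps receiving messages without finishing any.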
----
2020-06-26 10:57:23 UTC - Pierre-Yves Lebecq: Restarting the function or stopping/starting it does nothing. After being restarted, the function still does not process messages. I did not mention it, but I don’t see anything special in the container output. There is no error in the function log file, and the pulsar.log and pulsar-standalone.log files are empty.

While this happens, I cannot even retrieve state using pulsar CLI:

```# bin/pulsar-admin functions querystate --name tasks-MonitoringGlobalPulsarFunction --key monitoringGlobal.state
null

Reason: java.util.concurrent.TimeoutException```
The only way to make it start processing messages again is to stop the container and start it again. Doing this leads to other issues, although maybe it’s not worth mentioning them here; otherwise we might have too many things to look at at the same time.

I found some github issues really close to this: <https://github.com/apache/pulsar/issues/6813> , <https://github.com/apache/pulsar/issues/6427> and <https://github.com/apache/pulsar/issues/7036> .
Unfortunately, they’re not getting much attention. Is there anything I can do, or any additional details I can provide, to help push them forward?

Any help would be much appreciated. Thanks!
----
2020-06-26 11:14:15 UTC - Jonas Kint: @Jonas Kint has joined the channel
----
2020-06-26 12:03:57 UTC - Kirill Merkushev: tried on 2.5.0 - no such param, and no mention of it in the api <http://pulsar.apache.org/staging/admin-rest-api/#operation/deleteSubscription> - it also states that there should be no active consumers
----
2020-06-26 13:56:43 UTC - rwaweber: I’m getting it directly from prometheus, but I can retrieve it directly from the brokers if that would help. (not sure if it’s a red herring, but only one of the brokers seems to report _that_ metric at a given time; we also only have three brokers ATM)

Metric with labels:
```pulsar_storage_size{cluster="pulsar-cluster-1",instance="brk-01q.local:8080",job="scrapes",namespace="public/dev",topic="persistent://public/dev/beats.replica"} 2786956094```
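For comparing what each broker reports, a sample line of this shape can be split into metric name, labels, and value. The following is a minimal sketch using only the standard library; the regular expressions cover simple lines like this one rather than the full Prometheus text format, and `parse_sample` is a name chosen for the example.

```python
import re

# A sample in the same shape as the metric quoted above.
LINE = (
    'pulsar_storage_size{cluster="pulsar-cluster-1",'
    'instance="brk-01q.local:8080",job="scrapes",namespace="public/dev",'
    'topic="persistent://public/dev/beats.replica"} 2786956094'
)

# metric_name{label="value",...} value
SAMPLE_RE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>.*)\}\s+(?P<value>\S+)$'
)
# One label pair: key="value" (the value may contain escaped characters).
LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_sample(line):
    """Split one labelled sample line into (name, labels, value)."""
    match = SAMPLE_RE.match(line.strip())
    if match is None:
        raise ValueError("not a labelled sample line: %r" % line)
    labels = dict(LABEL_RE.findall(match.group("labels")))
    return match.group("name"), labels, float(match.group("value"))


name, labels, value = parse_sample(LINE)
size_gib = value / 2**30  # the metric value is taken to be bytes
print(name, labels["instance"], labels["topic"], round(size_gib, 2), "GiB")
```

Grouping parsed samples by the `instance` label would show which broker is reporting the topic at a given scrape.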
----
2020-06-26 14:07:00 UTC - jinggang: thanks all. I have forked the code and created a generic authentication for my own use
----
2020-06-26 15:00:17 UTC - Matteo Merli: The "force delete" for subscriptions was added in 2.6
----
2020-06-26 15:01:37 UTC - Matteo Merli: Keep in mind that, if the consumer is connected, it will recreate the subscription immediately on reconnection.

If you just want to get rid of the backlog, you can skip it all
----
2020-06-26 16:11:35 UTC - Kirill Merkushev: Thanks, I just got a stale connection and want to get rid of it
----
2020-06-26 17:06:05 UTC - Sijie Guo: Do you have the sequence to reproduce this issue?

Btw, #aop is a good place to ask questions related to #aop
----
2020-06-26 19:18:55 UTC - Jeff Schneller: What is the preferred authentication/authorization mechanism - TLS or JWT?
----
2020-06-26 19:31:20 UTC - Matteo Merli: JWT is generally easier to set up, given that the tooling around OpenSSL can be a bit cryptic.

Also, it can be easier to share tokens (as strings) compared to certificates.

In the end, it really boils down to what auth scheme you're already using for other services.
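To illustrate the "tokens as strings" point, here is a minimal sketch of how an HS256-signed JWT is put together and checked, using only the Python standard library. The secret and the subject name are hypothetical, and in a real deployment a maintained JWT library or Pulsar's own token tooling would be the better choice; this only shows that the result is a single dot-separated string.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def make_token(secret: bytes, subject: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode("utf-8"))
    payload = b64url(json.dumps({"sub": subject}).encode("utf-8"))
    signing_input = (header + "." + payload).encode("ascii")
    signature = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return header + "." + payload + "." + signature


def verify_token(secret: bytes, token: str) -> dict:
    """Check the signature and return the decoded claims."""
    header, payload, signature = token.split(".")
    signing_input = (header + "." + payload).encode("ascii")
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(expected, signature):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(payload))


SECRET = b"example-secret-not-for-production"  # hypothetical key
token = make_token(SECRET, "test-user")        # hypothetical subject
print(token)
print(verify_token(SECRET, token))
```

The whole credential is one printable string, which is what makes it easy to paste into a client configuration or a secret store.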
----
2020-06-26 20:07:12 UTC - Frank Kelly: Another Auth*n question - this time for a Custom Auth*n Plugin. I have `authorizationEnabled=true` defined on my proxy (so I'm expecting Authorization to occur on the proxy and not on the broker)
```root@pulsar-proxy-6f798754db-r9gbw:/pulsar/conf# grep -i authorization proxy.conf 
### ---Authorization --- ###
# Whether authorization is enforced by the Pulsar proxy
authorizationEnabled=true
# Authorization provider as a fully qualified class name
authorizationProvider=com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
# Whether client authorization credentials are forwared to the broker for re-authorization.
forwardAuthorizationCredentials=false```
and I can see that the Plugin has been loaded successfully and initialized
```[16:02:04] fkelly@Franks-Cogito-Work-Computer:[~/platform2-test]: (feature/sdlc-31257-minikube-integration) klf pulsar-proxy-6f798754db-r9gbw | grep -i authorization
[conf/proxy.conf] Applying config authorizationEnabled = true
[conf/proxy.conf] Applying config authorizationProvider = com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider
19:55:31.069 [main] INFO  com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider - ==> Initialize()
19:55:31.074 [main] INFO  org.apache.pulsar.broker.authorization.AuthorizationService - com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthorizationProvider has been loaded.```
From the Proxy logs I see my token was authenticated successfully but I see no traces of the AuthorizationProvider being accessed/executed. Any thoughts?
----
2020-06-26 21:58:37 UTC - Jared Marolf: @Jared Marolf has joined the channel
----
2020-06-26 22:06:41 UTC - Jared Marolf: Has anyone ever used pulsar with openstack? I am trying to deploy a multi-broker setup, but the issue is that the openstack instances can't communicate via floating ip, only private ip. I am trying to figure out if I need to utilize the advertisedListeners and internalListenerName for this to work and what the appropriate setup would be. Would I need to advertise both the floating ip and private ip on the service and http ports, or some other combination of that? External sources need to be able to connect to the brokers via the floating ip. Any input would be appreciated.
----