Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/07/19 09:11:03 UTC

Slack digest for #general - 2019-07-19

2019-07-18 09:43:02 UTC - divyasree: Hi Sijieg..
----
2019-07-18 09:43:18 UTC - divyasree: I am trying the proxy in Apache Pulsar now...
----
2019-07-18 09:43:43 UTC - divyasree: following this link <https://pulsar.apache.org/docs/en/administration-proxy/#proxy-configuration>
----
2019-07-18 09:44:15 UTC - divyasree: When I start the proxy with the ```bin/pulsar proxy``` command
----
2019-07-18 09:44:48 UTC - divyasree: I am getting this error: ``` 09:42:17.517 [main] INFO  org.apache.pulsar.broker.authentication.AuthenticationService - Authentication is disabled
09:42:17.730 [main] INFO  org.eclipse.jetty.util.log - Logging initialized @2362ms to org.eclipse.jetty.util.log.Slf4jLog
2019-07-18 09:42:17,863 [sun.misc.Launcher$AppClassLoader@ee7d9f1] error Uncaught exception in thread main: Failed to bind Pulsar Proxy on port 6650 ```
----
2019-07-18 09:45:02 UTC - divyasree: Can you help me on this?
----
2019-07-18 09:57:42 UTC - cathong: @cathong has joined the channel
----
2019-07-18 10:09:28 UTC - Penghui Li: @Shivji Kumar Jha I think this document can help you <http://pulsar.apache.org/docs/en/concepts-messaging/#dead-letter-topic>
----
2019-07-18 10:55:46 UTC - Alexandre DUVAL: Hi, can we use availablePermits from the consumer to manually stop the broker from dispatching to this consumer? The broker would then dispatch to other consumers, if any exist, or just stop dispatching.
----
2019-07-18 13:10:29 UTC - Joose Helle: @Joose Helle has joined the channel
----
2019-07-18 13:24:39 UTC - Alexandre DUVAL: Is it not possible to take an existing topic and convert it into a partitioned topic?
----
2019-07-18 14:26:48 UTC - Aaron: Sorry to keep bothering you @Matteo Merli, but is there a solution to this besides just increasing the heap size of my JVM?
----
2019-07-18 14:28:12 UTC - Aaron: A few questions:
1. Is there a way to flush acknowledgements that are being batched?
2. Can you subscribe to a particular partition in a partitioned topic (i.e. topic-1 or topic-2)?
3. Is there a way to get the number of partitions in the topic at the consumer level?
----
2019-07-18 14:31:12 UTC - Quentin Adam: @Quentin Adam has joined the channel
----
2019-07-18 15:31:43 UTC - Sijie Guo: 1. Currently it is not exposed in the public API.
2. Yes, you can just open a consumer directly on a partition.
3. Use `PulsarClient#getPartitionsForTopic(String topic)`.
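Regarding answers 2 and 3: each partition of a partitioned topic is itself a topic whose name is the parent name plus a `-partition-<index>` suffix (so Aaron's "topic-1" would be `topic-partition-1`), and these are the names `PulsarClient#getPartitionsForTopic` returns. The stdlib-only sketch below just builds those names; `PartitionNames` and its methods are hypothetical helpers, not part of the Pulsar client, and the actual subscription would still go through the client's consumer builder.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not a Pulsar API): builds the names of the internal
 * partition topics behind a partitioned topic, e.g. "t" -> "t-partition-0".
 */
public class PartitionNames {
    // Suffix Pulsar appends to the parent topic name for each partition.
    static final String PARTITION_SUFFIX = "-partition-";

    static String partitionTopic(String topic, int index) {
        if (index < 0) {
            throw new IllegalArgumentException("partition index must be >= 0");
        }
        return topic + PARTITION_SUFFIX + index;
    }

    static List<String> allPartitions(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(partitionTopic(topic, i));
        }
        return names;
    }

    public static void main(String[] args) {
        // A consumer can subscribe to any one of these names directly.
        for (String name : allPartitions("persistent://public/default/my-topic", 3)) {
            System.out.println(name);
        }
    }
}
```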
----
2019-07-18 15:34:08 UTC - Sijie Guo: @divyasree

The proxy and the broker use the same default ports (6650 and 8080). If you are running the proxy on the same node as the broker, you need to change the proxy's ports.

Edit conf/proxy.conf and change the ports in the following settings:

```
# The port to use for server binary Protobuf requests
servicePort=6650

# The port to use to serve binary Protobuf TLS requests
servicePortTls=

# Port that discovery service listens on
webServicePort=8080

# Port to use to serve HTTPS requests
webServicePortTls=
```
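To confirm the port clash described above before starting the proxy, a quick check is to try binding each port locally. This is a minimal sketch using only `java.net.ServerSocket`; the class name `PortCheck` is hypothetical, and 6653/8083 are simply the alternative ports chosen later in this thread.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {
    /** Returns true if nothing is currently listening on the given local port. */
    static boolean isPortFree(int port) {
        // Try to bind on all interfaces; the socket is closed again right away.
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // BindException ("Address already in use") lands here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broker defaults first, then the alternative proxy ports.
        int[] ports = {6650, 8080, 6653, 8083};
        for (int port : ports) {
            System.out.println(port + (isPortFree(port) ? " is free" : " is already in use"));
        }
    }
}
```

If 6650 or 8080 report "already in use" on a node where the broker is running, the proxy needs different values for servicePort and webServicePort.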
----
2019-07-18 15:44:05 UTC - Aaron: Thanks
----
2019-07-18 16:24:32 UTC - Ryan Samo: Hey guys,

Have you had any issues with setting receiverQueueSize via a .yml config file for a Pulsar IO sink vs. a Pulsar function? We are using the example exclamation function and setting receiverQueueSize via inputSpecs, and it works great.

```
tenant: public
namespace: default
name: exclamation
jar: examples/api-examples.jar
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
inputSpecs:
  persistent://public/default/exclamation-input:
    receiverQueueSize: 1000000
output: persistent://public/default/exclamation-output
processingGuarantees: ATLEAST_ONCE
retainOrdering: true
userConfig: {}
runtime: JAVA
autoAck: true
parallelism: 1
resources:
  ram: 2147482548
cleanupSubscription: true
```

If I do the same thing in a sink config and then create the sink, the setting is ignored and the value is empty, as shown here in the logs:

```
"inputSpecs":{"persistent://public/default/sink_poc":{}}
```

Any thoughts?
----
2019-07-18 16:41:29 UTC - Matteo Merli: > 1. Is there a way to flush acknowledgements that are being batched?

You can disable ack batching by passing 0 as the delay to `ConsumerBuilder.acknowledgmentGroupTime(long delay, TimeUnit unit)`.
----
2019-07-18 16:57:14 UTC - Matteo Merli: Hi
----
2019-07-18 16:57:44 UTC - Matteo Merli: Can you get a heap dump of the consumer? That should show where the memory is being retained
----
2019-07-18 16:58:24 UTC - Matteo Merli: The message ids in the unacked tracker alone shouldn’t require a lot of memory to keep track of
----
2019-07-18 18:30:07 UTC - Aaron: In a program where all I do is create a consumer that subscribes to a partitioned topic (5 partitions) with an ackTimeout of 1 hour, UnAckedMessageTracker is the top consumer with 73% of the retained heap, and UnAckedTopicMessageTracker uses 15% of the retained heap. The heap dump in total was 80.6 MB. I ran this with a new topic (no messages in it and no publishers attached), and did not call consumer.receive.
----
2019-07-18 18:32:26 UTC - Aaron: The dominator tree shows the same results: there are 6 instances of UnAckedMessageTracker and each uses 14% of the heap
----
2019-07-18 18:35:09 UTC - Matteo Merli: what is the overall message rate?
----
2019-07-18 18:35:28 UTC - Matteo Merli: and you did mention that these messages were being acked, for the most part, right?
----
2019-07-18 18:36:04 UTC - Aaron: This is on initialization, with no messages in the topic
----
2019-07-18 18:36:17 UTC - Matteo Merli: Overall, the UnAckedMessageTracker is retaining 12 MB of memory then?
----
2019-07-18 18:38:05 UTC - Matteo Merli: writing a repro client
----
2019-07-18 18:38:34 UTC - Aaron: The 6 instances add up to 73% of 80MB
----
2019-07-18 18:38:45 UTC - Matteo Merli: Gotcha
----
2019-07-18 18:49:18 UTC - Matteo Merli: OK, I see what’s happening.
----
2019-07-18 18:49:52 UTC - Matteo Merli: The unacked msg tracker is using multiple hash sets to track the message ids based on time buckets
----
2019-07-18 18:50:02 UTC - Matteo Merli: the time bucket is set to 1 sec
----
2019-07-18 18:50:25 UTC - Matteo Merli: so it ends up with many hash sets (5 partitions * 3600 buckets)
----
2019-07-18 18:50:43 UTC - Matteo Merli: there’s no point in tracking with so many time buckets though
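The time-bucket scheme described above can be modelled with a deque of hash sets, one per tick of the ack-timeout window. The sketch below is a simplified, hypothetical model (`BucketedTracker` is not Pulsar's actual `UnAckedMessageTracker`), but it shows why the number of sets scales with ackTimeout / tick: in this model a 1-hour timeout with a 1-second tick allocates 3600 sets per partition, or 18000 across 5 partitions, before any message arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Simplified, hypothetical model of a time-bucketed unacked-message tracker.
 * Each bucket is a hash set covering one tick; an id still present when its
 * bucket reaches the head of the deque has exceeded the ack timeout.
 */
public class BucketedTracker<T> {
    private final Deque<Set<T>> buckets = new ArrayDeque<>();

    BucketedTracker(long ackTimeoutMs, long tickMs) {
        if (ackTimeoutMs <= 0 || tickMs <= 0) {
            throw new IllegalArgumentException("timeout and tick must be positive");
        }
        // One bucket per tick of the timeout window (ceiling division), allocated up front.
        long count = (ackTimeoutMs + tickMs - 1) / tickMs;
        for (long i = 0; i < count; i++) {
            buckets.addLast(new HashSet<>());
        }
    }

    int bucketCount() {
        return buckets.size();
    }

    /** Newly received ids go into the newest bucket. */
    void add(T id) {
        buckets.peekLast().add(id);
    }

    /** Acking removes the id from whichever bucket holds it. */
    boolean ack(T id) {
        for (Set<T> bucket : buckets) {
            if (bucket.remove(id)) {
                return true;
            }
        }
        return false;
    }

    /** Called once per tick: returns the ids whose ack timeout just expired. */
    List<T> tick() {
        List<T> expired = new ArrayList<>(buckets.pollFirst());
        buckets.addLast(new HashSet<>());
        return expired;
    }

    public static void main(String[] args) {
        // 1-hour ack timeout with a 1-second tick, as in the thread.
        BucketedTracker<Long> perPartition = new BucketedTracker<>(3_600_000L, 1_000L);
        int partitions = 5;
        System.out.println("buckets per partition: " + perPartition.bucketCount());
        System.out.println("buckets in total: " + partitions * perPartition.bucketCount());
    }
}
```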
----
2019-07-18 18:52:03 UTC - Aaron: Yeah, it seems like a waste. However, I would like to use an ackTimeout of 1 hour and don't see a way to change tickDurationInMs via configuration.
----
2019-07-18 18:53:44 UTC - Matteo Merli: correct, it’s configurable in ConsumerConfigurationData, though not exposed in the ConsumerBuilder
----
2019-07-18 18:57:23 UTC - Aaron: So what would you recommend doing? Are there any other options besides increasing my heap size at runtime?
----
2019-07-18 18:58:22 UTC - Matteo Merli: until we fix this, that would be the only option, aside from reducing the ack timeout delay
----
2019-07-18 18:58:42 UTC - Aaron: Okay, will you consider exposing the tickDurationMs parameter?
----
2019-07-18 18:59:08 UTC - Matteo Merli: I’m more willing to internally cap the number of buckets
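One way to cap the number of buckets internally is to widen the tick whenever ackTimeout / tick would exceed a maximum. This is a hypothetical sketch of that idea, not the change that was actually made in Pulsar; `CappedTick` is an illustrative name and the cap of 10 buckets in `main` is an arbitrary example value.

```java
public class CappedTick {
    /**
     * Hypothetical sketch: widen the tick so that the number of buckets
     * (ackTimeoutMs / tick, rounded up) never exceeds maxBuckets.
     */
    static long effectiveTickMs(long ackTimeoutMs, long requestedTickMs, int maxBuckets) {
        long minTick = (ackTimeoutMs + maxBuckets - 1) / maxBuckets; // ceiling division
        return Math.max(requestedTickMs, minTick);
    }

    static long bucketCount(long ackTimeoutMs, long tickMs) {
        return (ackTimeoutMs + tickMs - 1) / tickMs; // ceiling division
    }

    public static void main(String[] args) {
        long ackTimeoutMs = 3_600_000L; // 1 hour
        long tick = effectiveTickMs(ackTimeoutMs, 1_000L, 10);
        System.out.println("tick = " + tick + " ms, buckets = " + bucketCount(ackTimeoutMs, tick));
    }
}
```

The trade-off is precision: timeouts are only detected at tick granularity, so with a 360-second tick a message may be redelivered noticeably later than the configured timeout, while short timeouts keep the requested 1-second tick.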
----
2019-07-18 18:59:21 UTC - Aaron: Okay. Thanks for your help.
----
2019-07-19 05:52:52 UTC - vikash: Hello, is there any update on the Pulsar dashboard, such as being able to view messages, clear backlog, etc.?
----
2019-07-19 06:09:52 UTC - divyasree: Hi, I have changed the proxy.conf file as below:
----
2019-07-19 06:10:08 UTC - divyasree: ``` # if Service Discovery is Disabled this url should point to the discovery service provider.
brokerServiceURL=pulsar://pulsar.ttc.ole.prd.target.com:6650
brokerServiceURLTLS=

# These settings are unnecessary if `zookeeperServers` is specified
brokerWebServiceURL=http://pulsar.ttc.ole.prd.target.com:8080
brokerWebServiceURLTLS=

# If function workers are setup in a separate cluster, configure the following 2 settings
# to point to the function workers cluster
functionWorkerWebServiceURL=
functionWorkerWebServiceURLTLS=

# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000

### --- Server --- ###

# The port to use for server binary Protobuf requests
servicePort=6653

# The port to use to serve binary Protobuf TLS requests
servicePortTls=

# Port that discovery service listens on
webServicePort=8083

# Port to use to serve HTTPS requests
webServicePortTls=

# Path for the file used to determine the rotation status for the proxy instance when responding
# to service discovery health checks
statusFilePath=
 ```
----
2019-07-19 06:11:04 UTC - divyasree: When connecting to the broker via the client, I am giving <pulsar://pulsar.ttc.ole.prd.target.com:6653> as the service URL...
----
2019-07-19 06:11:12 UTC - divyasree: and I am getting a connection refused error
----
2019-07-19 06:45:06 UTC - Sijie Guo: Is the connection to the proxy refused, or the connection from the proxy to the broker?
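To answer that question, it helps to test each hop separately: from the client machine, check that the proxy's servicePort (6653 here) accepts TCP connections, and from the proxy machine, check the broker's port (6650). This is a minimal `java.net.Socket` sketch; `ReachabilityCheck` is a hypothetical name, and a successful TCP connect only shows that the port is open, not that the Pulsar protocol handshake will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ReachabilityCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // ConnectException ("Connection refused"), timeouts and unknown hosts land here.
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        // Run on the client host for the proxy port, and on the proxy host for the broker port.
        System.out.println("proxy  " + host + ":6653 reachable: " + canConnect(host, 6653, 2000));
        System.out.println("broker " + host + ":6650 reachable: " + canConnect(host, 6650, 2000));
    }
}
```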
----