Posted to by Apache Pulsar Slack <> on 2019/11/06 09:11:04 UTC

Slack digest for #general - 2019-11-06

2019-11-05 09:12:29 UTC - Gopi Krishna: @Sijie Guo can you help me with that ?
2019-11-05 09:41:16 UTC - Sijie Guo: sgk - currently we don’t have a mongodb cdc connector yet. there is a task for adding the support: <>

since we already use debezium, it is very simple to add the support for mongodb.
2019-11-05 10:47:13 UTC - Dennis Yung: @Dennis Yung has joined the channel
2019-11-05 11:10:43 UTC - Shivji Kumar Jha: Hi, does cumulative acknowledgement work for regex-based exclusive/failover subscriptions?
+1 : n4j
2019-11-05 11:15:36 UTC - Shivji Kumar Jha: I am looking at the MultiTopicsConsumerImpl class and this method specifically,

    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                    Map<String,Long> properties) {
        checkArgument(messageId instanceof TopicMessageIdImpl);
        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;

        if (getState() != State.Ready) {
            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
        }

        if (ackType == AckType.Cumulative) {
            // Cumulative acks are routed to the single underlying consumer for this topic partition.
            Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
            if (individualConsumer != null) {
                MessageId innerId = topicMessageId.getInnerMessageId();
                return individualConsumer.acknowledgeCumulativeAsync(innerId);
            } else {
                return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
            }
        } else {
2019-11-05 11:21:35 UTC - Shivji Kumar Jha: On similar lines, I guess that acking a msg in a regex (or list-of-topics) subscription would only ack on the one topic to which that message belongs. The other topics in the list (or that match the regex) will still have unacked messages. Is that right?

This may not be very useful though...
2019-11-05 11:23:10 UTC - Shivji Kumar Jha: But then I see this, which means the only restriction is that shared subscriptions are not allowed!
    private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
        return SubscriptionType.Shared != type;
    }
2019-11-05 11:27:42 UTC - Sijie Guo: > Does cumulative acknowledgement work for regex based exclusive/failover subscription?

I think so

> I guess that acking on a msg in regex (or list of topics) would only ack on one of the topics to which that message belongs. The other topics in the list (or that matches regex) will still have unacked messages. Is that right?

that’s correct.

> the only restriction is shared subscription not being allowed!

2019-11-05 11:50:46 UTC - Shivji Kumar Jha: @Sijie Guo in theory then a topic can starve for acknowledgement and redeliver the same messages again and again, is that right?
2019-11-05 11:52:23 UTC - Alexandre DUVAL: It can conflict with pulsar env i guess :confused:
2019-11-05 11:52:34 UTC - Alexandre DUVAL: @Sijie Guo hi, WDYT? :slightly_smiling_face:
2019-11-05 11:53:27 UTC - Sijie Guo: correct. but I didn’t get the problem here.
2019-11-05 11:57:02 UTC - Sijie Guo: Hi
2019-11-05 11:57:14 UTC - Sijie Guo: let me check
2019-11-05 11:57:27 UTC - Sijie Guo: (I don’t think we support that yet though)
2019-11-05 11:59:57 UTC - Shivji Kumar Jha: In my opinion, a cumulative ack is expected to ack all the messages processed earlier. If it redelivers the previous messages, then I don't find it usable at all. @Abhinay can you describe your flink use case to keep the discussion specific?
2019-11-05 12:00:05 UTC - Sijie Guo: I think you can do it via “runtimeFlags”
2019-11-05 12:03:54 UTC - Sijie Guo: add the following system properties to your function config:

runtimeFlags: "-Denv1=value -Denv2=value2"

Then you can retrieve the environment settings from system properties.
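
A minimal sketch of the read side, assuming the flags above reach the function's JVM as `-D` options. `-D` options set JVM system properties rather than OS environment variables, so they are read with `System.getProperty`, not `System.getenv`. `RuntimeFlagSettings` and `readSetting` are hypothetical names used only for illustration; `env1` and `env2` are the placeholder names from the config line above.

```java
// Hypothetical helper, not part of Pulsar.
class RuntimeFlagSettings {
    // Returns the value passed as -D<name>=..., or the fallback when the flag was not set.
    static String readSetting(String name, String fallback) {
        return System.getProperty(name, fallback);
    }

    public static void main(String[] args) {
        // Run with: java -Denv1=value -Denv2=value2 RuntimeFlagSettings
        System.out.println("env1=" + readSetting("env1", "<unset>"));
        System.out.println("env2=" + readSetting("env2", "<unset>"));
    }
}
```

Inside a function, the same `readSetting` call could be made from the processing method or from a static initializer.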
2019-11-05 12:04:53 UTC - Sijie Guo: oh, a cumulative ack acks all the messages up to the given message within one topic (partition).
2019-11-05 12:05:12 UTC - Sijie Guo: so I guess it is the confusion when used in regex based subscription
2019-11-05 12:09:50 UTC - Shivji Kumar Jha: Yes, that sort of makes it unusable for list/regex topic subscription...and should be blocked with an error maybe?
2019-11-05 12:12:44 UTC - Alexandre DUVAL: ok cool, I definitely need to make function yaml files :stuck_out_tongue:.
2019-11-05 12:13:23 UTC - Alexandre DUVAL: Is there a full example of all the configuration parameters available when using a yaml function config?
2019-11-05 12:31:35 UTC - Sijie Guo: good question. I think @xiaolong.ran and @Jennifer Huang were working on the function documentation. They can help point you to the documentation. If it is not documented, @Jennifer Huang can you follow up on adding the documentation?
2019-11-05 13:08:52 UTC - Matteo Merli: Yes, it does not make sense when the consumer has more than one partition or topic, since we cannot establish relationships between message ids on different topics
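
The per-topic behaviour discussed above can be illustrated with a toy model. This is a sketch only, not the Pulsar client: `PerTopicCumulativeAck` is a hypothetical class, and message ids are simplified to plain `long` values that are comparable only within a single topic.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, not the Pulsar client: one "acked up to" position per topic.
class PerTopicCumulativeAck {
    private final Map<String, Long> ackedUpTo = new HashMap<>();

    // Cumulative ack: every message on `topic` with id <= messageId counts as acknowledged.
    // Positions of all other topics are left untouched.
    void acknowledgeCumulative(String topic, long messageId) {
        ackedUpTo.merge(topic, messageId, Math::max);
    }

    boolean isAcked(String topic, long messageId) {
        Long position = ackedUpTo.get(topic);
        return position != null && messageId <= position;
    }

    public static void main(String[] args) {
        PerTopicCumulativeAck acks = new PerTopicCumulativeAck();
        acks.acknowledgeCumulative("topic-a", 5L);
        System.out.println(acks.isAcked("topic-a", 3L)); // true
        System.out.println(acks.isAcked("topic-b", 3L)); // false
    }
}
```

In this model, a consumer that only ever acks cumulatively on `topic-a` never advances `topic-b`, which matches the starvation concern raised in the thread.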
2019-11-05 14:20:37 UTC - Jasper Li: Halo guys,

I have an issue:
```
java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException:$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: ulterior-catfish-pulsar-functions-functions.pulsar.svc.cluster.local/ at java.util.concurrent.CompletableFuture.reportGet( 
```
This happens when I create a helm chart for a separate Pulsar Functions deployment (the IP address in the error is the IP address of the Pulsar Functions pod). How can I solve the issue?

2019-11-05 14:50:25 UTC - jia zhai: what is the Pulsar service URL configured for the function? the default Pulsar service URL is pulsar://localhost:6650.
seems it is not properly configured? :grinning:
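
One way to narrow down a "Connection refused" like the one above is to check whether the host and port in the configured service URL accept a plain TCP connection at all. The following is a rough diagnostic sketch using only the Java standard library; `ServiceUrlCheck` and its methods are hypothetical names, and a successful TCP connect does not prove that a Pulsar broker is answering on that port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical diagnostic helper, not part of Pulsar.
class ServiceUrlCheck {
    // Extracts host and port from a URL such as pulsar://localhost:6650.
    static InetSocketAddress parse(String serviceUrl) {
        URI uri = URI.create(serviceUrl);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected scheme://host:port, got " + serviceUrl);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    // Returns true if a plain TCP connection succeeds within timeoutMillis.
    static boolean isReachable(String serviceUrl, int timeoutMillis) {
        InetSocketAddress target = parse(serviceUrl);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        String url = args.length > 0 ? args[0] : "pulsar://localhost:6650";
        System.out.println(url + " reachable: " + isReachable(url, 2000));
    }
}
```

Running it from inside the functions pod with the URL the worker is configured to use shows whether the problem is name resolution or connectivity rather than Pulsar itself.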
2019-11-05 15:06:41 UTC - Jasper Li: @jia zhai Halo, jia. Yes, that default works if you run in standalone mode, or run functions inside the broker in cluster mode. But I want to run Pulsar Functions outside the broker and use a k8s headless service to communicate (hence the domain name ulterior-catfish-pulsar-functions-functions.pulsar.svc.cluster.local). Running outside the broker is not a must: I could run the Pulsar Functions commands inside the broker if Pulsar Functions could deploy a StatefulSet in k8s, but I could not configure that either. :persevere:
2019-11-05 15:39:53 UTC - jmogden: @jmogden has joined the channel
2019-11-05 16:12:52 UTC - Santiago Del Campo: *Hello! Got the following problem:*
We're receiving several and very frequent *ConnectError* and *Timeout* exceptions when producing messages (python client).

We're running our production Pulsar Cluster inside Kubernetes (2 servers):
* 2 Pods for ZK ---> one per node
* 2 Pods for BK ---> one per node
* 2 Pods for Broker ---> both on the same node
* 2 Pods for Proxy ---> one per node

The load balancing between the nodes is done with an AWS Network Load Balancer, which points to the proxies' open ports.

The EC2 instances don't seem to lack hardware resources (CPU, memory, etc.).

When I run another cluster for testing, I can't reproduce the same ConnectError ratio, which leads me to believe there's a configuration issue I have not understood well under heavy load.

Any ideas for a solution, or where I should look first to troubleshoot? :thinking_face:
2019-11-05 16:22:35 UTC - Daniel Åman: @Daniel Åman has joined the channel
2019-11-05 17:10:39 UTC - jmogden: Hello, my team is trying to switch from Kafka (using Helm) to Pulsar (also using Helm) and are figuring out how to get the same/similar metrics that we are already using. I have a couple of questions:
1) We are getting JMX metrics from Kafka using the JMX Exporter provided by cp-helm-charts, is there a way to get those from Pulsar as well? If so, how do I do that?
2) Is there something similar to the Confluent Metrics Reporter for Kafka that we can use for Pulsar?
3) Are there any best practices somewhere that tell us which metrics we should use to determine if Pulsar is in a healthy state?
2019-11-05 18:38:24 UTC - Raman Gupta: Pulsar exposes metrics in Prometheus format. See <>.
2019-11-05 19:10:15 UTC - jmogden: I was looking at that, but I couldn't find anything about JMX metrics from Prometheus. Would that just be the same as using the Prometheus JMX Exporter except pointing at the Pulsar that is running?
2019-11-05 20:27:48 UTC - Britt Bolen: Is it possible to use the standalone pulsar with Athenz? I’ve set up athenz in docker, and I’m trying to get standalone to connect to it, so I’ve updated the standalone.conf file with the athenz parameters, but I’m getting a class not found error when I start pulsar…
`Caused by: java.lang.ClassNotFoundException:`
2019-11-05 22:04:47 UTC - Britt Bolen: One more athenz question… i’m trying to create a PulsarClient using Athenz in java… but the AuthenticationAthenz class that I need to create the client isn’t showing up in eclipse.
2019-11-05 22:07:39 UTC - Britt Bolen: AuthenticationAthenz just isn’t found. Do I need to reference something other than pulsar-client through maven to use Athenz?
2019-11-05 22:07:43 UTC - Britt Bolen: thanks
2019-11-05 22:09:06 UTC - Matteo Merli: I think `pulsar-broker-auth-athenz` is not being included in the Pulsar distribution. You would have to manually add it to the classpath.
2019-11-05 22:09:49 UTC - Matteo Merli: Same as for broker, you need to add the `pulsar-client-auth-athenz` dep on the client side.
2019-11-05 22:10:47 UTC - Matteo Merli: The exported metrics will contain everything needed to monitor Pulsar, both JVM and Pulsar specific aspects
2019-11-05 22:14:32 UTC - Matteo Merli: > Would that just be the same as using the Prometheus JMX Exporter except pointing at the Pulsar that is running?

Yes, except you skip JMX
2019-11-05 22:41:04 UTC - Britt Bolen: Thanks, that got me farther along
2019-11-06 01:03:35 UTC - Jasper Li: Hi all,

I have an issue when I create a source by executing ```pulsar-admin sources create ``` in my broker. Does it mean my broker does not have enough memory to allocate for the job?

```broker error

io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 251658247, max: 268435456) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter

ERROR org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl - Unable to allocate memory```

Thanks again!!
2019-11-06 01:58:08 UTC - xiaolong.ran: You can try increasing the size of direct memory.

In `conf/`

# Extra options to be passed to the jvm
PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}
+1 : Jasper Li
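
The numbers in the error above can be checked with a little arithmetic: the limit of 268435456 bytes is 256 MiB, and the failed request of 16777216 bytes is 16 MiB. A small sketch of that check follows; `DirectMemoryBudget` is a hypothetical name, and the 4 GiB figure assumes the `-XX:MaxDirectMemorySize=4g` setting suggested above.

```java
// Hypothetical helper that replays the arithmetic from the error message.
class DirectMemoryBudget {
    // True when the requested allocation would push usage past the limit.
    static boolean wouldExceed(long usedBytes, long requestedBytes, long maxBytes) {
        return usedBytes + requestedBytes > maxBytes;
    }

    public static void main(String[] args) {
        long used = 251_658_247L;       // "used" from the error message
        long requested = 16_777_216L;   // 16 MiB, the failed allocation
        long max = 268_435_456L;        // 256 MiB, "max" from the error message
        long raisedMax = 4L * 1024 * 1024 * 1024; // 4 GiB, assuming MaxDirectMemorySize=4g

        System.out.println(used + requested);                        // 268435463
        System.out.println(wouldExceed(used, requested, max));       // true
        System.out.println(wouldExceed(used, requested, raisedMax)); // false
    }
}
```

With the original limit the request overshoots by 7 bytes; with the raised limit the same request fits with plenty of headroom.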
2019-11-06 02:04:28 UTC - jia zhai: @xiaolong.ran to take a look.
2019-11-06 03:22:44 UTC - Jasper Li: Hi, I have got this error following the above issue:
INFO org.apache.pulsar.functions.worker.Worker - Initializing Pulsar Functions namespace...
ERROR org.apache.pulsar.functions.worker.Worker -  Failed to create namespace public/functions for pulsar functions
ERROR org.apache.pulsar.functions.worker.FunctionWorkerStarter - Failed to start function worker
I think my functions pod can find the broker now, but it cannot create a tenant after the connection is created. :persevere:
2019-11-06 04:46:11 UTC - Jasper Li: Oh! I found the cause: I had a typo and wrote PF_clusterName instead of PF_pulsarFunctionsCluster, so the functions cluster name was never changed from standalone, hence the error.

Sorry for asking this stupid question. :persevere:
2019-11-06 04:51:34 UTC - Gopi Krishna: Ohh ok. So is there any work around for the time being to achieve this ?
2019-11-06 05:29:09 UTC - vikash: Are there any fixes for the backlog issue in pulsar 2.4.1?
2019-11-06 05:59:15 UTC - Sijie Guo: what backlog issue?
2019-11-06 06:16:13 UTC - vikash: I have seen the backlog go above almost 50 GB, and it took a very long time to clear; in that case messages were not being consumed.
2019-11-06 06:17:37 UTC - Logan B: @Logan B has joined the channel
2019-11-06 06:18:32 UTC - vikash: Like this, but I sometimes get more backlog too.
2019-11-06 06:22:49 UTC - Sijie Guo: backlog means you didn’t consume fast enough.
2019-11-06 06:24:45 UTC - Bob Li: @Bob Li has joined the channel
2019-11-06 06:26:02 UTC - Bob Li: Hello guys.
grin : Sijie Guo
2019-11-06 06:51:01 UTC - vikash: Could the issue be that I have used the WebSocket client for the consumer? Shall I try with the .NET client or maybe the Java one to check, or do you have any other suggestion?
2019-11-06 08:16:32 UTC - Jasper Li: Hello all,

does Pulsar have an Avro converter like ```io.confluent.connect.avro.AvroConverter``` used in Kafka? I want to use that in debezium cdc for taking change logs from my database.

Thanks. :persevere: