You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Apache Pulsar Slack <> on 2019/11/12 09:11:03 UTC

Slack digest for #general - 2019-11-12

2019-11-11 09:12:37 UTC - leonidv: Thanks! It's exactly what I find! Great!
2019-11-11 09:12:50 UTC - leonidv: I'm about subscriptionInitalPosition
2019-11-11 09:48:50 UTC - Gopi Krishna: Hi, I am trying to run pulsar using two different servers. My current scenario is that I will be producing messages using NiFi, as shown in this blog (<>). But the major change in my case will be that I want to consume messages using pulsar-client in a different server. I think that this kind of case is possible, but couldn't proceed any further any help will be highly appreciated.
2019-11-11 09:56:43 UTC - Sijie Guo: Once the NIFI processors produce the messages to a Pulsar topic, you can use a consumer to receive message from that topic in the other server. Alternatively you can write a Pulsar Function to process the messages as well.
2019-11-11 09:59:40 UTC - Gopi Krishna: Ok, but I have doubt on how the consumer in the other server know that, should we provide the nifi processor the url, if so where would I find that and what would that url be specified as ?
2019-11-11 10:02:05 UTC - Sijie Guo: you need to provide the pulsar service url to the consumer you used in the other server.
2019-11-11 10:02:35 UTC - Sijie Guo: for how to use a consumer in Java, you can check : <>
2019-11-11 10:07:05 UTC - tuteng: You can try: from pulsar import Function
from distutils.util import strtobool
from pulsar import SerDe
from pulsar.schema import (
    Record, String, Integer, Boolean

class Message(Record):
    a = String()
    b = Integer()
    c = Boolean()

    def __init__(self, a, b, c):
        self.a = a
        self.b = int(b)
        self.c = bool(strtobool(c))

class MessageSerde(SerDe):
    def __init__(self):

    def serialize(self, input):
        return bytes("{0},{1},{2}".format(
            input.a, input.b, input.c))

    def deserialize(self, input_bytes):
        components = str(input_bytes).split(",")
        return Message(*components)

class PulsarFunction(Function):
    def __init__(self):

    def process(self, input, context):
        logger = context.get_logger()
        <|>("----- got message -----")
        <|>("%s" % ", ".join(input.a, input.b, input.c))
        return "/".join(input.a, input.b, input.c)
+1 : Jasper Li
2019-11-11 10:07:15 UTC - tuteng: reference: <>
+1 : Jasper Li
2019-11-11 10:07:55 UTC - tuteng: This is incorrect <>, we will fix it
+1 : Jasper Li
2019-11-11 10:09:32 UTC - tuteng: Messages should come from the input topic in serialize, not be assigned at the beginning of initialization
+1 : Jasper Li
2019-11-11 10:37:11 UTC - Naveen Kumar: @Jasper Li Okay. I'll check by that process.
2019-11-11 11:50:33 UTC - Kevin.Chen: @Kevin.Chen has joined the channel
2019-11-11 11:52:39 UTC - Naveen Kumar: @tuteng ./pulsar-admin sources create --source-config-file ./user_conf/debezium-postgres-topic.yml
2019-11-11 11:53:37 UTC - Naveen Kumar: connector source config file
2019-11-11 11:59:21 UTC - Naveen Kumar: @Jasper Li I had some trouble adding <> as maven dependency bcoz it's located in ICM repo (specified in the Note section), but it's also not available in ICM repo too.

So I downloaded the avro converter from confluent hub, extracted the zip, copied only the kafka-connect-avro-converter-5.3.1.jar to pulsar directory and specified dependency as


But still the generated nar file doesn't contain the above jar.
2019-11-11 12:01:59 UTC - Naveen Kumar: The dependency is added in pom.xml of pulsar-io-debezium-postgres module
2019-11-11 12:49:30 UTC - Jasper Li: @tuteng Got it finally! Cool to have that!!! Thank you very much!
2019-11-11 13:25:24 UTC - Naveen Kumar: IGNORE above problems related to pom.xml
2019-11-11 14:49:58 UTC - Pedro Cardoso: Hello, can pulsar functions access the BookKeeper's ledger or stream APIs?
2019-11-11 14:52:06 UTC - Britt Bolen: one must also setup a keystore for the zts client in order for it to use the truststore.  I just created an empty keystore.
2019-11-11 14:52:36 UTC - Retardust: Is there any out of the box way to send acks async but with ordering guaranties?
2019-11-11 14:56:55 UTC - Jasper Li: @Naveen Kumar I am not sure if I am correct or not since it is done by my colleague and I will confirm it tomorrow, but it should be added in here: <> if I got it correctly and you can build a nar with the confluent converter for pulsar (but we have got other issue after we built it, since `io.confluent.connect.avro.AvroConverter` requires confluent schema registry and we can only send message to that schema registry at this stage. :persevere:
2019-11-11 15:26:37 UTC - Pedro Cardoso: What is the difference between defining a Pulsar Schema as `JSONSchema.of(User.class)` vs `Schema.JSON(User.class);` ?
2019-11-11 16:32:14 UTC - Devin G. Bost: Is there a way to create redundant topics? We have recurring issues where a topic will randomly “freeze,” which will result in backlog accumulation and downstream functions receiving no data. The only resolution is to restart the broker where the frozen topic is living.
2019-11-11 16:32:52 UTC - Devin G. Bost: We’ve been having this same issue for months, and we still haven’t received much guidance on it.
2019-11-11 16:34:58 UTC - Addison Higham: that ruby client is very out of date, there is a WIP new ruby client based on C++ here: <>
2019-11-11 16:35:21 UTC - Addison Higham: it is being contributed back to pulsar, it works for basic use cases ATM
2019-11-11 17:07:14 UTC - Nathan Mills: @Nathan Mills has joined the channel
2019-11-11 18:31:36 UTC - Sijie Guo: Freeze means stopping consuming? What version of broker are you using?
2019-11-11 18:32:24 UTC - Sijie Guo: Can you explain a bit of the idea of redundant topics?
2019-11-11 18:33:01 UTC - Sijie Guo: Order of acks?
2019-11-11 18:33:17 UTC - Sijie Guo: They are effectively same 
2019-11-11 18:33:57 UTC - Sijie Guo: Current no yet. We can consider exposing it if there are use cases.
2019-11-11 18:35:41 UTC - Pedro Cardoso: I need access to the ledger API in order to perform sliding window operations (with a per-event slide) which is currently not possible. Is connecting to the underlying bookkeeper cluster when creating a pulsar function feasible?
2019-11-11 18:41:30 UTC - Matteo Merli: Oh that's really out of my Athenz experience depth. Pinging @Nozomi Kurihara or @Masahiro Sakamoto since they're actually using Pulsar with Athenz
2019-11-11 18:50:29 UTC - Joshua Dunham: Hi Everyone, I'm creating some relatively custom containers for Pulsar. Since Pulsar uses bookkeeper over ports will it kill performance if I have separate containers for each?
2019-11-11 18:51:51 UTC - Matteo Merli: No, that's the normal deployment option. A pulsar broker will typically send the data to multiple bookkeeper nodes anyway
2019-11-11 18:57:01 UTC - Joshua Dunham: Sounds great. Any value in stripping out the libs for the other apps / services?
2019-11-11 18:59:37 UTC - Matteo Merli: For example?
2019-11-11 19:38:53 UTC - Retardust: Yep. Cause if i will ack 2nd message and fail to ack first then pulsar will send me first again or not? In failover mode. From initial sequence of a, b messages i will have b, a, b in downstream topic or not?
2019-11-11 20:03:25 UTC - Joshua Dunham: libs ~ org.apache.bookkeeper-stream-storage-java-client-base-4.9.2.jar
2019-11-11 20:03:45 UTC - Joshua Dunham: ok, maybe not the bookkeeper client. :smile:
2019-11-11 20:05:30 UTC - Joshua Dunham: The libs seem like they support running the various services which can be activated by setting the pulsar 'mode' (./bin/pulsar broker vs. ./bin/pulsar bookie vs. etc)
2019-11-11 21:31:31 UTC - Joshua Dunham: Hey Everyone,
2019-11-11 21:32:15 UTC - Joshua Dunham: If I have a highly nested URL for producers /1/2/3/4/5, how in the namespace calculated? Is it /1/ or everything before the topic?
2019-11-11 21:34:07 UTC - Matteo Merli: the topic itself cannot contain `/` characters
2019-11-11 21:34:33 UTC - Joshua Dunham: In my long example what would the tenant, namespace, and topic be?
2019-11-11 21:34:35 UTC - Matteo Merli: so, the form will always be: `<persistent://TENANT/NAMESPACE/TOPIC>`
2019-11-11 21:34:56 UTC - Matteo Merli: when passing a namespace name, you'd need to pass `TENANT/NAMESPACE`
2019-11-11 21:35:47 UTC - Joshua Dunham: I adapted the python example and this works.
2019-11-11 21:35:51 UTC - Joshua Dunham: producer = client.create_producer('persistent://1/2/3/4/5/6/7')
2019-11-11 21:36:11 UTC - Joshua Dunham: (I've replaced numbers with some other hierarchal info)
2019-11-11 21:36:43 UTC - Joshua Dunham: But of course there are different management tools (auth etc) that operate on the different levels.
2019-11-11 21:36:56 UTC - Joshua Dunham: I'm just not sure where Pulsar draws the lines if I provide so many.
2019-11-11 21:39:46 UTC - Matteo Merli: So, there's a bit of a long story :slightly_smiling_face:

Before Pulsar 2.0, we had naming like:


We removed cluster from the naming, but now we had to disallow `/` characters (which were previously allowed in v1.x) because we wouldn't be able to distinguish between old 1.x naming vs 2.x naming. (and we need to keep supporting the old topic names for existing deployments).
2019-11-11 21:40:07 UTC - Matteo Merli: In your case, `persistent://1/2/3/4/5/6/7` will be interpreted as :
2019-11-11 21:40:32 UTC - Matteo Merli: tenant: `1`
cluster: `2`
namespace `1/2/3`
2019-11-11 21:40:42 UTC - Matteo Merli: Meaning a v1.x style topic
2019-11-11 21:42:41 UTC - Joshua Dunham: hmm
2019-11-11 21:44:45 UTC - Joshua Dunham: I'll read more on these and the related management tools to see how it would fit my usecase using less hierarchical elements.
2019-11-11 21:45:04 UTC - Joshua Dunham: I don't (think) I need the concept of tenants.
2019-11-11 21:45:48 UTC - Joshua Dunham: What I was going for is a sort of narrowing scope until some midpoint as a unique domain and then expanding on that for exact topics (to support schemas etc).
2019-11-11 21:46:36 UTC - Joshua Dunham: so like /base/applications/app1/{app1_logging, app1_metrics, app1_current_task}
2019-11-11 21:46:57 UTC - Joshua Dunham: If you consider the { } as brace expansion.
2019-11-11 21:47:35 UTC - Joshua Dunham: I would have multiple applications (as producers) commiting messages to each 'sub-topic' (with set schemas).
2019-11-11 21:47:54 UTC - Joshua Dunham: Do you know of docs on writeups for this sort of strategy?
2019-11-11 22:07:25 UTC - Devin G. Bost: We’re running 4.2.0 on our brokers.
“Freeze” means that all consumers stop getting output from the topics on the particular broker involved.
I’m just trying to find a workaround so that when this happens, we have a way to ensure that we don’t have outages.
2019-11-11 23:34:44 UTC - Derek Rhodehamel: @Derek Rhodehamel has joined the channel
2019-11-11 23:42:30 UTC - Derek Rhodehamel: Hello,
I'm trying to use the pulsar go client library in an alpine docker container.
The issue is that I do not see a way to install the CPP bindings since an alpine apk is not mentioned in the docs.
Is there a known way to either build an alpine apk or use one of the existing builds in an alpine container?
2019-11-11 23:44:28 UTC - Ali Ahmed: @Dean Anderson you compile pulsar libs statically on any linux machine and copy them into the target container
+1 : Derek Rhodehamel
2019-11-11 23:51:19 UTC - Matteo Merli: If you're not scared easily, you can take a look at the WIP in <>
2019-11-11 23:52:11 UTC - Matteo Merli: basic producer is mostly working, consumer is getting there
2019-11-12 00:25:40 UTC - Sijie Guo: Are you using functions to consume the messages? Or do you also use Java consumers in your services?

Stopping getting output from the topics looks like a problem we have seen in our users. It was related to permits issues in pulsar flow control logic. we still working on how to fix it. but it usually can be fixed by unloading the topics or namespace. Have you tried unloading before?
2019-11-12 00:27:57 UTC - Sijie Guo: Do you want to create ledgers for storing sliding windows? or you want to access the ledgers of a topic to perform sliding windows?

If you can provide more details about this use case, I can help figure out the right approach for you.
2019-11-12 00:30:38 UTC - Sijie Guo: &gt; Cause if i will ack 2nd message and fail to ack first then pulsar will send me first again or not?

If you are using individual acks, it will.

If you are using cummulative acks, it will not.

You don’t need ordering when using individual acks, since broker tracks individual acks and only redelivers those unacked messages. You also don’t need ordering when using cumulative acks, eventually it only tracks the highest watermark (the cursor position).
2019-11-12 01:54:34 UTC - Bob Li: @Bob Li has joined the channel
2019-11-12 05:55:47 UTC - sundar: My friend has created another issue on github for the same issue..could you respond there 

2019-11-12 05:57:47 UTC - sundar: Can you also tell me how to wipe the existing pulsar configuration and resetup the whole thing because if we run the metadata command again it says pulsar cluster already present...but if i run the pulsar admin command to view the clusters present it does not work so we don't know what to do...could you please help us? We're college students and we don't know how to proceed
2019-11-12 06:01:06 UTC - Raghavi: Hi,
Can someome explain me how to configure avro as the schema type for pulsar IO with poatgres debezium cdc connector? When i set the "schemaType" property in connector configuration, I'm getting "AvroTypeException unknown type K". Can someone help me with a link to documentation/blog post explaining in detail?
2019-11-12 06:41:37 UTC - Sijie Guo: @Raghavi Are you planning to consume the topics produced by cdc connector?
2019-11-12 06:41:51 UTC - Sijie Guo: Or what are you planning to do ?
2019-11-12 06:44:19 UTC - Sijie Guo: commented in the issue.
2019-11-12 06:44:46 UTC - sundar: Thanks will see immediately
2019-11-12 06:44:58 UTC - Sijie Guo: you can stop the cluster, delete `data` directory and re-setup the cluster.
2019-11-12 06:59:45 UTC - sundar: We setup the cluster using the initialize metadata command but when we use the pulsar admin command it is coming like this
2019-11-12 07:02:45 UTC - Sijie Guo: you need to setup the whole cluster before you can use `pulsar-admin`. because pulsar-admin talks to brokers via port 8080
2019-11-12 07:05:05 UTC - Raghavi: Our goal is to capture changes from one postgres source and persist the changes to another postgres destination. This should be done with defined schema for each table and should have functionality to upgrade schema whenever there is a change in the source. So, I'm looking to work with avro schema which is where I'm struck.
2019-11-12 07:05:49 UTC - Raghavi: Using Postgres debezium source connector and jdbc sink connector
2019-11-12 07:06:46 UTC - Retardust: But there is a tricky part with negative ack. For example. I send 3 messages a, b, c downstream async . They batches on pulsar producer. I have 3 callbacks. I have ack accumulative each and send negative ack on message sending fail. Could i get in sittuation when a is acked, b is negative acked during send failed and c is override that negative ack?
2019-11-12 07:11:42 UTC - Sijie Guo: if they are individual acks, I don’t see there is a problem. they are different messages, no?
2019-11-12 07:19:58 UTC - Pradeep Mishra: ok, do we have any plan to add plugin in fluentd based on this lib?
2019-11-12 07:31:42 UTC - sundar: What do you mean by setting up the whole cluster? Is it setting up :
1. Zookeeper
2019-11-12 07:31:51 UTC - sundar: 2. Bookkeeper
3. Broker
2019-11-12 07:32:44 UTC - sundar: Also if the DNS is a combination of host ids like
<pulsar://ip1:port,ip2:port> how will we access this from the client