Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/08/25 09:11:05 UTC

Slack digest for #general - 2020-08-25

2020-08-24 09:17:28 UTC - Takahiro Hozumi: Thank you for your advice!
----
2020-08-24 12:51:48 UTC - Frank Kelly: @Ali Ahmed Thank you!
----
2020-08-24 13:16:08 UTC - Jennifer Huang: @Jennifer Huang set the channel topic: Breaking news: Apache Pulsar has 300 contributors now, thank you all for your contribution to Pulsar community <https://streamnative.io/blog/tech/2020-08-24-pulsar-300-contributors>
star-struck : Frank Kelly, Ali Ahmed
----
2020-08-24 14:37:40 UTC - Nazia Firdous: As we know, there are 3 subscription types. In a Failover subscription, when the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer. I am able to do Exclusive and Shared subscriptions, but I don't know how to achieve Failover. Below is my code; kindly guide me on how I can disconnect the first consumer so the next consumer will get all the messages.
```import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

class PulsarFailoverSubscription {
    public static void main(String[] args) {
        try {
            // PulsarClient pc = PulsarClient.builder().serviceUrl("pulsar://67.160.195.238:6650").build();
            PulsarClient pc = PulsarClient.builder().serviceUrl("pulsar://67.160.195.238:6618").build();
            PulsarFailoverSubscription psc = new PulsarFailoverSubscription();
            psc.producer1(pc, "Producer1");
            psc.consumer1(pc, "Consumer1");
            psc.consumer2(pc, "Consumer2");
        } catch (PulsarClientException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("Exiting out of Main");
    }

    private void producer1(PulsarClient pc, String id) {
        final Thread thread = new Thread(() -> {
            try {
                Producer<String> producer = pc.newProducer(Schema.STRING)
                        .topic("persistent://public/default/test-topic1")
                        .create();
                for (int i = 0; i <= 10; i++) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Thread Interrupted " + e.getMessage());
                    }
                    producer.sendAsync("Message from " + id + " : " + i);
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread.start();
    }

    public void consumer1(PulsarClient pc, String id) {
        final Thread thread1 = new Thread(() -> {
            try {
                Consumer<String> consumer1 = pc.newConsumer(Schema.STRING)
                        .topic("test-topic1")
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName("test-subscription1")
                        .subscribe();

                while (true) {
                    Message<String> msg = consumer1.receive();
                    System.out.println(id + " Received " + new String(msg.getData()));
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread1.start();
        // Note: interrupting the thread does not close the consumer or its connection,
        // so the broker never sees a disconnect and no failover happens
        thread1.interrupt();
    }

    public void consumer2(PulsarClient pc, String id) {
        final Thread thread2 = new Thread(() -> {
            try {
                Consumer<String> consumer2 = pc.newConsumer(Schema.STRING)
                        .topic("test-topic1")
                        .subscriptionType(SubscriptionType.Failover)
                        .subscriptionName("test-subscription1")
                        .subscribe();
                while (true) {
                    Message<String> msg = consumer2.receive();
                    System.out.println(id + " Received " + new String(msg.getData()));
                }
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        });
        thread2.start();
    }
}```

----
2020-08-24 14:55:53 UTC - Matt Mitchell: @Addison Higham I don’t see errors in the broker logs, but it’s possible that Pulsar was redeployed at some point after the problem. If it happens again, I’ll be sure to check the broker logs again though.
----
2020-08-24 15:28:26 UTC - Addison Higham: Failover subscriptions are usually used across two different processes (usually on different machines), since the main purpose is to take over when a machine/process fails while minimizing latency.

You could simulate this in a single process, but the easiest way to do so is to use two different clients, each with its own consumer. Then you can disconnect one client and have the other take over, as in the sketch below.
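
For example, a minimal sketch of that setup (assuming a broker at pulsar://localhost:6650 and the topic/subscription names from the snippet above; the class name and message counts are just illustrative): each consumer gets its own PulsarClient, and closing the first client disconnects its consumer so the broker promotes the second one.
```import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class FailoverDemo {
    public static void main(String[] args) throws Exception {
        // Two separate clients, so each consumer has its own connection to the broker
        PulsarClient client1 = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        Consumer<String> c1 = client1.newConsumer(Schema.STRING)
                .topic("test-topic1")
                .subscriptionName("test-subscription1")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();
        Consumer<String> c2 = client2.newConsumer(Schema.STRING)
                .topic("test-topic1")
                .subscriptionName("test-subscription1")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        // Try to read a few messages on the first consumer. The broker picks which
        // consumer is active for a Failover subscription, so this may receive nothing.
        for (int i = 0; i < 5; i++) {
            Message<String> first = c1.receive(1, TimeUnit.SECONDS);
            if (first != null) {
                System.out.println("Consumer1 received " + first.getValue());
                c1.acknowledge(first);
            }
        }

        // Simulate a failure: closing the first client disconnects its consumer,
        // so the broker fails the subscription over to the second consumer.
        client1.close();

        Message<String> msg;
        while ((msg = c2.receive(5, TimeUnit.SECONDS)) != null) {
            System.out.println("Consumer2 received " + msg.getValue());
            c2.acknowledge(msg);
        }
        client2.close();
    }
}```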
----
2020-08-24 15:32:36 UTC - Nazia Firdous: Okay, thank you, I will try this.
----
2020-08-24 15:45:53 UTC - Nathan Mills: What benefits does setting a backlog quota policy and/or TTL in addition to a Retention policy provide?
----
2020-08-24 15:53:18 UTC - Addison Higham: It can still be desirable to put reasonable limits on a subscription backlog that a consumer may just never realistically catch up on, which is where a backlog quota is useful. As an example, I store my messages for 30 days for Pulsar SQL queries, but in reality my apps are never going to catch up on more than a few days of backlog.

Similarly, you may want to retain messages longer, but within a subscription it may not make sense to have your app notified of messages older than X hours, which is what a TTL covers.
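
As a concrete sketch of setting all three on a namespace with the Java admin client — the endpoint, namespace and limits below are just placeholders, and the `BacklogQuota` constructor follows the 2.6-era admin API:
```import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class NamespacePolicies {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Retention: keep acknowledged messages for 30 days or up to 10 GB,
        // e.g. so Pulsar SQL can still query them later
        admin.namespaces().setRetention("public/default",
                new RetentionPolicies(30 * 24 * 60 /* minutes */, 10 * 1024 /* MB */));

        // Backlog quota: cap each subscription's unacknowledged backlog at 2 GB and
        // evict the oldest backlog once the limit is reached
        admin.namespaces().setBacklogQuota("public/default",
                new BacklogQuota(2L * 1024 * 1024 * 1024,
                        BacklogQuota.RetentionPolicy.consumer_backlog_eviction));

        // TTL: automatically acknowledge messages older than 48 hours in every subscription
        admin.namespaces().setNamespaceMessageTTL("public/default", 48 * 3600);

        admin.close();
    }
}```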
----
2020-08-24 15:58:30 UTC - Nathan Mills: gotcha, does it have any performance impact on the cluster?
----
2020-08-24 16:05:40 UTC - Addison Higham: enabling or disabling them? not that I am aware of.  I guess having them disabled would be "one less thing" it has to check for, but it should be inconsequential
----
2020-08-24 16:52:07 UTC - Ruian: Hi, I wonder how the message id (ledger id, entry id, batch id) is generated. Could I know it in advance without asking the Pulsar broker?
Say I have a producer sending messages in batches to a partitioned topic, and I have a consumer which knows that the earliest message id of the partition is A. Could I generate a message id B from A so that I can use `consumer.seek(B)` to skip a particular number of messages?
----
2020-08-24 17:04:35 UTC - Addison Higham: This is tricky in the general case. `batchId` is simple: it is just the index of the message within the batch. So if you only wanted to skip within batches, that might be doable.

However, if the number of messages you want to skip spans entries, that might be doable if you only had a single producer, but even then I don't believe entryId is guaranteed to increment by exactly one. LedgerId would be worse, as it is quite unpredictable.

What you might try instead is sequenceIDs

If you have a way of generating predictable sequence ids, you could possibly use them. You wouldn't be able to seek, but you could filter the messages based on sequenceId
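
A rough sketch of that sequence-id approach (assuming you control the producer and can assign your own predictable sequence ids; the topic, subscription name and cut-off value below are placeholders):
```import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SequenceIdSkip {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        // Producer side: attach a predictable, monotonically increasing sequence id to each message
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-partitioned-topic")
                .create();
        for (long i = 0; i < 1000; i++) {
            producer.newMessage().sequenceId(i).value("payload-" + i).sendAsync();
        }
        producer.flush();

        // Consumer side: no seek; acknowledge and drop everything below the
        // sequence id you want to start from
        long startFrom = 500;
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-partitioned-topic")
                .subscriptionName("skip-demo")
                .subscribe();
        while (true) {
            Message<String> msg = consumer.receive();
            if (msg.getSequenceId() < startFrom) {
                consumer.acknowledge(msg); // skip without processing
                continue;
            }
            System.out.println("Processing " + msg.getValue());
            consumer.acknowledge(msg);
        }
    }
}```
Note that sequence ids are scoped to a producer, so this only lines up cleanly with a single producer (or one producer per partition).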
----
2020-08-24 17:37:08 UTC - Ruian: It seems that it is impossible to pre-calculate the message id.
Actually I am writing my own Spark connector, and trying to limit the size of the first batch of Spark's micro-batch reader.
Anyway, thank you for your reply!
----
2020-08-24 18:39:14 UTC - Joshua Decosta: Noob question here: do I need to explicitly create the topics that functions use as input and output when creating a function with the pulsar-admin client?
----
2020-08-24 18:47:37 UTC - Stepan Mazurov: @Stepan Mazurov has joined the channel
----
2020-08-24 19:03:00 UTC - Addison Higham: By default, no. The functions interface will just create a consumer/producer for any of the defined topics, and automatic topic creation in Pulsar creates topics for any producer or consumer. You can disable automatic topic creation with `allowAutoTopicCreation`
----
2020-08-24 19:03:23 UTC - Joshua Decosta: I think in my situation it doesn't work either way. My function pod keeps crashing and I'm not sure why.
----
2020-08-24 19:03:46 UTC - Joshua Decosta: Are there some specific configs I need to add to get this working with the default Helm charts?
----
2020-08-24 19:30:46 UTC - Addison Higham: do you have `functionAsPods` enabled? if so, do you have logs for the statefulset that is created (assuming it creates successfully)
----
2020-08-24 19:36:35 UTC - Joshua Decosta: I don’t have that enabled but it looks like it’s still generating pods 
----
2020-08-24 20:33:10 UTC - Frank Kelly: Seeing a 500 error using custom AuthN and AuthZ plugins (they worked in 2.5.2 but not in 2.6.1); this is just a Pulsar standalone instance.
- DEBUG logging is already on - is there a way to see the stack trace?

----
2020-08-24 20:33:10 UTC - Frank Kelly: ```16:27:48.573 [pulsar-web-54-14] DEBUG com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider - Authenticating token . . . 
16:27:48.576 [pulsar-web-54-14] DEBUG com.cogito.platform.signal.stream.pulsar.authn.broker.CogitoAuthenticationProvider - Token authenticated
16:27:48.576 [pulsar-web-54-14] DEBUG org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Authenticated HTTP request with role cogito
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler - call filter org.apache.pulsar.broker.web.ResponseHandlerFilter-7aa01bd9@7aa01bd9==org.apache.pulsar.broker.web.ResponseHandlerFilter,inst=true,async=true
16:27:48.576 [pulsar-web-54-14] DEBUG org.eclipse.jetty.servlet.ServletHandler - call servlet org.glassfish.jersey.servlet.ServletContainer-640d604@f679d7ba==org.glassfish.jersey.servlet.ServletContainer,jsp=null,order=-1,inst=true,async=true
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput - write(array HeapByteBuffer@6c771e32[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00})
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpOutput - write(array) s=CLOSING,api=BLOCKED,sc=false,e=null last=true agg=false flush=true async=false, len=7120 null
16:27:48.583 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel - sendResponse info=null content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00} complete=true committing=true callback=Blocker@32faa6a7{null}
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpChannel - COMMIT for /admin/v2/namespaces/public on HttpChannelOverHttp@397aa579{s=HttpChannelState@3535581a{s=HANDLING rs=BLOCKING os=COMMITTED is=IDLE awp=false se=false i=true al=0},r=6,c=false/false,a=HANDLING,uri=//localhost:8080/admin/v2/namespaces/public,age=11}
500 Internal Server Error HTTP/1.1
Date: Mon, 24 Aug 2020 20:27:48 GMT
Content-Length: 7120
Content-Type: text/plain


16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.server.HttpConnection - generate: NEED_HEADER for org.eclipse.jetty.server.HttpConnection$SendCallback@58eeee2d[PROCESSING][i=HTTP/1.1{s=500,h=3,cl=7120},cb=org.eclipse.jetty.server.HttpChannel$SendCallback@4096f060] (null,[p=0,l=7120,c=8192,r=7120],true)@START
16:27:48.584 [pulsar-web-54-14] DEBUG org.eclipse.jetty.http.HttpGenerator - generateHeaders HTTP/1.1{s=500,h=3,cl=7120} last=true content=HeapByteBuffer@79ba1771[p=0,l=7120,c=8192,r=7120]={<<<\n --- An unexpected error...d.run(Thread.java:834)\n>>>\x00\x00\x00\x00\x00\x00\x00\x00\x00...\x00\x00\x00\x00\x00\x00\x00}```
----
2020-08-24 20:55:01 UTC - Frank Kelly: Figured it out: `org.apache.pulsar.broker.authorization.AuthorizationProvider` provides a bunch of default implementation methods which basically throw an exception. My guess is they should NOT be default implementations?
<https://github.com/apache/pulsar/blob/48f5a2f62c148b3df617be060fefed51f3145979/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java#L230-L269>
----
2020-08-24 20:55:32 UTC - Frank Kelly: I will file an issue on GitHub tomorrow
----
2020-08-24 21:55:54 UTC - Evan Furman: Hi guys, we’re looking to take advantage of rack awareness for the bookies. Is there any reason to use `bookkeeperClientRackawarePolicyEnabled` rather than `bookkeeperClientRegionawarePolicyEnabled`? We’re running in two AZs in AWS.
----
2020-08-24 23:37:06 UTC - Addison Higham: This documentation (for another component that uses bookkeeper but is still applicable) <https://bookkeeper.apache.org/distributedlog/docs/latest/user_guide/implementation/storage.html#id8> gives an overview of the differences between the two.

RegionAware should mostly be the same as Rackaware, but simply adds another level of hierarchy to ensure it spreads across regions first and then can spread across racks.

Specifically:
> RackAware placement policy basically just chooses bookies from different racks in the built network topology. It guarantees that a write quorum will cover at least two racks.
> RegionAware placement policy is a hierarchical placement policy, which chooses equal-sized sets of bookies from regions, and within each region it uses the RackAware placement policy to choose bookies from racks. For example, if there are 3 regions - region-a, region-b and region-c - and an application wants to allocate a 15-bookie ensemble: first, it would figure out there are 3 regions and it should allocate 5 bookies from each region; second, for each region, it would use the RackAware placement policy to choose 5 bookies.
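
In practice that boils down to broker configuration plus tagging each bookie with its location. A sketch (the two flags are the ones you mentioned; the hostnames/group are placeholders, and the `region/rack` value format for the region-aware policy follows the BookKeeper docs linked above, so double-check it against your Pulsar version):
```# broker.conf: enable one of the two placement policies
bookkeeperClientRackawarePolicyEnabled=false
bookkeeperClientRegionawarePolicyEnabled=true

# Tag each bookie with its location. With the region-aware policy the rack value
# is expected in "region/rack" form; with plain rack-aware, a rack name is enough.
bin/pulsar-admin bookies set-bookie-rack \
  --bookie bookie-1.example.com:3181 \
  --hostname bookie-1.example.com \
  --group default \
  --rack region-a/rack-1```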
----
2020-08-25 04:50:03 UTC - Tymm: Hello, is it possible to subscribe to a list of Pulsar topics using the WebSocket API?
----
2020-08-25 08:13:47 UTC - Evion Cane: @Evion Cane has joined the channel
----
2020-08-25 08:58:47 UTC - Vil: I am not sure, but the docs do not cover subscribing to a list of topics: <https://pulsar.apache.org/docs/en/client-libraries-websocket/>. But perhaps the docs do not have all the info?
----