Posted to dev@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/05/29 09:11:03 UTC

Slack digest for #dev - 2020-05-29

2020-05-28 15:01:23 UTC - Paul Wagner: @Paul Wagner has joined the channel
----
2020-05-29 00:02:07 UTC - Matteo Merli: > Sounds like the best path would be to implement a heartbeat in the connections to bookkeeper and to kill sockets if heartbeats fail
Yes, I'd recommend that way in any case.

> When a bookie gets rescheduled, the time for a broker to be happy is just how fast keep alive.
This might also be made worse if there's no proper shutdown of the bookie pod. E.g., K8s will send a signal to the pod to allow for a graceful shutdown. If the JVM process does not receive it, it will get forcefully closed (and the pod IP will stop responding).

Ensuring that the JVM gets the signal and the bookie is gracefully shut down will make these sockets get properly closed.
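
Editor's sketch of the heartbeat idea quoted above, assuming a client that records the time of the last heartbeat reply per connection and drops any connection that stays silent past a timeout. This is not BookKeeper or Pulsar code; `HeartbeatMonitor`, `register`, `on_heartbeat_reply` and `reap_dead` are hypothetical names, and the clock is injectable only so the logic can be exercised without sleeping.

```python
import time


class HeartbeatMonitor:
    """Tracks the last heartbeat reply per connection (illustrative only)."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock      # injectable time source
        self.last_seen = {}     # connection id -> time of last heartbeat reply

    def register(self, conn_id):
        # A new connection counts as alive from the moment it is registered.
        self.last_seen[conn_id] = self.clock()

    def on_heartbeat_reply(self, conn_id):
        # Only refresh connections that are still tracked.
        if conn_id in self.last_seen:
            self.last_seen[conn_id] = self.clock()

    def reap_dead(self):
        """Return and forget the ids that were silent for longer than timeout_s.

        A real client would close the matching sockets here, so callers fail
        fast and reconnect instead of waiting for an OS-level TCP timeout.
        """
        now = self.clock()
        dead = [c for c, seen in self.last_seen.items()
                if now - seen > self.timeout_s]
        for c in dead:
            del self.last_seen[c]
        return dead
```

With a graceful shutdown the peer closes its sockets itself, so a check like this only matters for the forced-kill case described above.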
----
2020-05-29 00:02:16 UTC - Matteo Merli: @Addison Higham ^^^
----
2020-05-29 02:51:22 UTC - Addison Higham: Yeah, I assumed that as well
----
2020-05-29 02:52:11 UTC - Addison Higham: About not getting a proper shutdown, that is
----
2020-05-29 06:41:45 UTC - Alex Yaroslavsky: I think I'm misunderstanding something, but I can't seem to make key_shared balance load between two consumers.
I use the code below, and only consumer1 gets all the messages (from different partitions)

```
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class test {
    private static Producer<byte[]> producer;
    private static Consumer<byte[]> consumer1;
    private static Consumer<byte[]> consumer2;
    private static String PULSAR_URL = "pulsar+ssl://pulsar:6651";
    private static String topic = "public/default/partitioned-topic";
    private static String subscription = "test";
    private static SubscriptionType subscriptionType = SubscriptionType.Key_Shared;
    
    public static PulsarClient getClient(String pulsarUrl) throws PulsarClientException {
        return PulsarClient.builder().
                serviceUrl(pulsarUrl)
                .authentication(new AuthenticationTls("certs/client.pem", "certs/client.key"))
                .build();        
    }

    public static void main(String[] args) throws PulsarClientException, InterruptedException {
        PulsarClient pc = getClient(PULSAR_URL);
        consumer1 = pc.newConsumer()
                .topic(topic)
                .subscriptionType(subscriptionType)
                .subscriptionName(subscription)
                .subscribe();

        PulsarClient pc2 = getClient(PULSAR_URL);
        consumer2 = pc2.newConsumer()
                .topic(topic)
                .subscriptionType(subscriptionType)
                .subscriptionName(subscription)
                .subscribe();

        PulsarClient pc3 = getClient(PULSAR_URL);
        producer = pc3.newProducer().topic(topic).create();

        produce();

        for (int i=0; i < 2000; i++)
        {
            Message<byte[]> msg = consumer1.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                consumer1.acknowledge(msg);
                System.out.println("Consumer1 Message " + i + ". key: " + msg.getKey() + "_" + msg.getTopicName() + " msg: " + new String(msg.getData(), StandardCharsets.UTF_8));
            }
            msg = consumer2.receive(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                consumer2.acknowledge(msg);
                System.out.println("Consumer2 Message " + i + ". key: " + msg.getKey() + "_" + msg.getTopicName() + " msg: " + new String(msg.getData(), StandardCharsets.UTF_8));
            }
        }

        try {
            producer.close();
            consumer1.close();
            consumer2.close();
            pc.close();
            pc2.close();
            pc3.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void produce() throws PulsarClientException, InterruptedException {
        for (String k: Arrays.asList("alex", "bibi", "camel", "ultra", "version", "water", "xavier", "yatsi", "zoology")) {
            for (int i=1; i < 100; i++) {
                String message = k + "." + i;
                producer.newMessage().key(k).value(message.getBytes(StandardCharsets.UTF_8)).sendAsync();
            }
        }
    }
}
```

----
2020-05-29 07:10:48 UTC - Sijie Guo: Try to disable batching or use key-based batcher for your producer.
----
2020-05-29 07:42:59 UTC - Alex Yaroslavsky: @Sijie Guo Thanks a lot!
Adding batcherBuilder(BatcherBuilder.KEY_BASED) made it work.
100 : Sijie Guo
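
Editor's sketch of why the batcher matters for key_shared, as a deliberately simplified model rather than Pulsar's implementation. It assumes that the broker hands each batch to one consumer as a unit, and that a mixed-key batch exposes no single key to route on. All function names are hypothetical, and the round-robin pinning of keys is only a stand-in for the broker's hash-based consumer selection.

```python
from collections import defaultdict


def default_batches(messages, max_batch):
    """Pack messages in arrival order, ignoring keys (models the default batcher)."""
    chunks = [messages[i:i + max_batch] for i in range(0, len(messages), max_batch)]
    # Assumption of this model: a mixed-key batch carries no key of its own.
    return [(None, chunk) for chunk in chunks]


def key_based_batches(messages, max_batch):
    """Batch per key, so every batch carries exactly one key."""
    per_key = defaultdict(list)
    for key, payload in messages:
        per_key[key].append((key, payload))
    entries = []
    for key, msgs in per_key.items():
        for i in range(0, len(msgs), max_batch):
            entries.append((key, msgs[i:i + max_batch]))
    return entries


def dispatch(entries, n_consumers):
    """Deliver each batch whole to the consumer pinned to the batch's key.

    Keys are pinned round-robin in order of first appearance; a real broker
    would hash the key instead.
    """
    pinned = {}
    received = defaultdict(list)
    for entry_key, batch in entries:
        consumer = pinned.setdefault(entry_key, len(pinned) % n_consumers)
        received[consumer].extend(batch)
    return dict(received)


if __name__ == "__main__":
    keys = ["alex", "bibi", "camel", "ultra"]
    msgs = [(k, k + "." + str(i)) for k in keys for i in range(1, 4)]
    for batcher in (default_batches, key_based_batches):
        result = dispatch(batcher(msgs, 5), 2)
        print(batcher.__name__, {c: len(v) for c, v in result.items()})
```

In this model the default batcher sends every batch to the same consumer, while per-key batches spread across both. Disabling batching has the same effect as per-key batches of size one.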
----
2020-05-29 07:56:34 UTC - Alex Yaroslavsky: Another question about key_shared, now with a function in the middle.
There is a publisher that publishes with keys (and key based batching enabled) to a certain partitioned topic.
A routing function (currently in Python, see below) reads from this topic and forwards the messages to a different partitioned topic (keeping the key with a slight modification). If I now consume with KEY_SHARED from this topic, then only one consumer gets all the messages.
Will rewriting the function in Java solve this issue, or do functions not support KEY_SHARED?

`from pulsar import Function`
`class RoutingFunction(Function):`
    `def process(self, item, context):`
        `properties = context.get_message_properties()`
        `context.publish("persistent://" + properties["tenant"] + "/ns/" + properties["dst_id"], item, message_conf={"partition_key": properties["tenant"] + "_" + context.get_partition_key()})`
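
Editor's sketch of the routing rule in the function above, pulled out into a pure helper so it can be checked without a Pulsar worker. `route` is a hypothetical name and the sample property values are made up; the topic and key construction are copied from the function as posted.

```python
def route(properties, partition_key):
    """Return (destination topic, outgoing partition key) for one message.

    The destination comes from the "tenant" and "dst_id" message properties;
    the outgoing key is the incoming key prefixed with the tenant.
    """
    topic = "persistent://" + properties["tenant"] + "/ns/" + properties["dst_id"]
    key = properties["tenant"] + "_" + partition_key
    return topic, key


if __name__ == "__main__":
    # Hypothetical property values, for illustration only.
    print(route({"tenant": "acme", "dst_id": "orders"}, "alex"))
```

Messages that share a tenant and an incoming key always get the same outgoing key, so the key itself is not what collapses delivery onto one consumer.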

----
2020-05-29 08:16:44 UTC - Sijie Guo: Functions don't support the key-based batcher yet. You can disable batching in functions; that would probably work. Can you create an issue for us? /cc @Penghui Li
----
2020-05-29 08:25:24 UTC - Alex Yaroslavsky: @Sijie Guo Thanks for the quick reply! How do I disable batching in functions? And sure, I will create an issue for this.
----
2020-05-29 08:34:28 UTC - Alex Yaroslavsky: <https://github.com/apache/pulsar/issues/7095>
----
2020-05-29 09:05:10 UTC - Alex Yaroslavsky: I see that batching is hardcoded as True for both Python and Java functions. I will try to modify the Python code on the worker...
----