Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/06/11 09:11:05 UTC

Slack digest for #general - 2020-06-11

2020-06-10 10:49:06 UTC - Gautam Lodhiya: Hi, How can I pause and resume consumer messages with websocket api?
----
2020-06-10 11:38:40 UTC - alex kurtser: Hi @Sijie Guo

Sorry for disturbing you. Do you have any advice for us? Is our design wrong, and does it need to be changed in order to make function metrics work as expected?
----
2020-06-10 13:15:55 UTC - Ebere Abanonu: If you can find a way not to send Flow command to Pulsar server
----
2020-06-10 14:28:41 UTC - Aaron Batilo: I had asked the same question in #kubernetes and it looks like they will be!
----
2020-06-10 14:28:42 UTC - Aaron Batilo: <https://apache-pulsar.slack.com/archives/CJ0FMGHSM/p1591723600263200>
----
2020-06-10 14:29:42 UTC - slouie: This sounds like the issue I’m seeing and posted about here. <https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1591375438262800>

I see logs reporting the backlogQuota was reached, yet the backlog message count and unack’d messages metrics do not increase. Was there more discovered about this problem?
----
2020-06-10 15:45:22 UTC - Sijie Guo: backlog isn’t really related to GC. Backlog indicates there are subscriptions not acknowledging messages. Did you capture `topics stats` and `topics stats-internal`?
----
2020-06-10 15:47:46 UTC - slouie: I did not get those two calls at the time. It took me a while to come back to this being a backlog issue because the metrics `pulsar_msg_backlog` and `pulsar_subscription_unacked_messages` remained steady. The only signal that seemed to make sense was `pulsar_storage_size`: it grew for a period and then hit a ceiling.
----
2020-06-10 15:47:48 UTC - Sijie Guo: Yes. It will be recorded.
----
2020-06-10 15:49:49 UTC - Sijie Guo: Yes. If you don’t ack messages, you will get backlog. So in any case, when you have issues on a topic, `topics stats` and `topics stats-internal` are the best tools for troubleshooting.
----
2020-06-10 15:50:12 UTC - slouie: shouldn’t the `msg_backlog` and `unacked` metrics increase when there is a problem with not properly ack’ing messages?
----
2020-06-10 15:50:54 UTC - Sijie Guo: it will stop increasing if you hit the backlog quota limit
----
2020-06-10 15:51:15 UTC - Sijie Guo: If I understand that correctly, you hit the backlog quota limit, no?
----
2020-06-10 15:53:30 UTC - slouie: That’s what the log messages state, even though it doesn’t seem to add up when I look at the `msg_backlog` and `unacked` metrics.

I had unack’ed messages hovering ~100 and the local backlog count holding at ~50.

I can’t reproduce why storage size just increased to ~20GB and then hit the backlog quota. My backlog quota is the default 10G
----
2020-06-10 15:56:03 UTC - slouie: if I saw the `msg_backlog` and `unacked` metrics increase it would make total sense to me. I’m not grasping why those metrics remain consistent and then I suddenly hit a backlog_quota. It made me think it was something to do with retention and GC since the metrics indicate messages were still being ack’d.

`pulsar_rate_in` and `pulsar_rate_out`  were also constant until the limit was reached
----
2020-06-10 16:45:40 UTC - Erik Jansen: We’ve created a small test program which reads lines from a file and produces messages in batches with sendAsync. When the file has been completely produced, we close the producer with closeAsync.

We’ve also created a reader on the topic. But for some reason the reader doesn’t receive all messages. Any thoughts on whether we are doing something wrong?

These are the example programs:

import org.apache.pulsar.client.api.*;

import java.util.Date;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CsvExample {
    /**
     * The initiated pulsar client, initiated in the runProducerExample method.
     */
    private PulsarClient client;

    /**
     * Constructor
     */
    CsvExample() {
        System.out.println("Producer Example created");
    }

    /**
     * Initiates the pulsar client and sends messages;
     *
     * @throws IOException
     */
    public void sendMessages(String topicName) throws IOException {
        Map<String, Object> config = getPulsarClientConfig();

        try {
            // Create the pulsar client with the config defined in
            client = PulsarClient.builder().loadConf(config).build();

            // Create a producer for the passed on topic
            Producer<byte[]> producer = createProducer(topicName);
            Date date = new Date();
            DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            String stringDate = sdf.format(date);
            System.out.println("Start sending messages at: " + stringDate);

            Stream<String> lines = Files
                    .lines(Paths.get("/Users/erik/i-refactory-demo/tpc_h/deliveredData/Initial load/LINEITEM.tbl"));

            int numLines = 0;

            for (String line : (Iterable<String>) lines::iterator) {
                producer.sendAsync(line.getBytes());
                numLines++;

                if (numLines % 1000000 == 0) {
                    System.out.println(sdf.format(new Date()) + " - num lines: " + numLines);
                }
            }

            System.out.println("num lines: " + numLines);

            lines.close();
            System.out.println("Waiting for async ops to complete");
            try {
                producer.closeAsync().get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                client.closeAsync().get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            date = new Date();
            sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            stringDate = sdf.format(date);
            System.out.println("All operations completed at: " + stringDate);

        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the producer.
     *
     * @link <https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerBuilder.html>
     *       for the building methods which can be called to configure the producer.
     *
     * @param topicName The name of the topic to which the producer's messages
     *                  should be sent.
     *
     * @return A byte producer.
     *
     * @throws PulsarClientException A checked exception which should be caught.
     */
    private Producer<byte[]> createProducer(String topicName) throws PulsarClientException {
        // Producer<String> producer =
        // client.newProducer(Schema.STRING).topic(topicName)
        Producer<byte[]> producer = client.newProducer().topic(topicName)
                .batchingMaxPublishDelay(5000, TimeUnit.MILLISECONDS)
                .enableBatching(true).batchingMaxMessages(500).batchingMaxBytes(4000000).blockIfQueueFull(true)
                .compressionType(CompressionType.LZ4).create();

        return producer;
    }

    /**
     * Get the configuration for the pulsar client.
     *
     * @link <https://pulsar.apache.org/docs/en/client-libraries-java/#client> (for
     *       configuration values)
     *
     * @return A map containing the configuration values for the client.
     */
    private Map<String, Object> getPulsarClientConfig() {
        Map<String, Object> config = new HashMap<>();

        config.put("serviceUrl", "pulsar://localhost:6650");
        config.put("numIoThreads", 3);
        config.put("numListenerThreads", 3);

        return config;
    }

    public static void main(String[] args) throws IOException {
        CsvExample producer = new CsvExample();
        producer.sendMessages("public/default/line-item-12");
    }
}

import org.apache.pulsar.client.api.*;

public class ReaderExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        // Class regionClass = Region.class;
        // Schema schema = Schema.AVRO(regionClass);
        // Reader<Region> regionReader =
        // pulsarClient.newReader().topic("region").startMessageId(MessageId.earliest).create();

        Reader<byte[]> reader = pulsarClient.newReader().topic("public/default/line-item-12")
                .startMessageId(MessageId.earliest).create();

        int numLines = 0;
        while (true) {
            // Message<Region> message = regionReader.readNext();
            Message<byte[]> message = reader.readNext();
            numLines++;
            if (numLines % 1000000 == 0) {
                System.out.println(" - num lines: " + numLines);
            }

            if (numLines > 6000000) {
                System.out.println(" - num lines: " + numLines);

            }

        }
    }

}
----
2020-06-10 16:48:38 UTC - Matteo Merli: When you call `sendAsync()` you need to check the future returned for the successful completion of the operation.

In this case, the problem is that, by default, when the producer's outgoing queue is full, it immediately rejects new send requests.

This is done so that, by default, `sendAsync()` never blocks.

If you want to send as fast as possible, you should instead block when the queue is full. For that, use `blockIfQueueFull=true` in the producer builder.
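
A minimal sketch of the first point, checking the future returned by each `sendAsync()` call. To stay self-contained it uses only the JDK: the `sendAsync` parameter is a stand-in for something like `producer::sendAsync`, and `SendAsyncTracking`, `sendAllAndCountFailures` and `fakeSend` are hypothetical names invented for this illustration, not part of the Pulsar client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class SendAsyncTracking {

    /**
     * Sends every payload through sendAsync, waits for all sends to settle,
     * and returns the number of sends that completed exceptionally.
     * Assumption: sendAsync stands in for a real client call such as producer::sendAsync.
     */
    static <T> long sendAllAndCountFailures(List<byte[]> payloads,
            Function<byte[], CompletableFuture<T>> sendAsync) {
        AtomicLong failures = new AtomicLong();
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (byte[] payload : payloads) {
            // handle() observes both outcomes, so a rejected send is counted
            // instead of being silently dropped.
            pending.add(sendAsync.apply(payload).handle((result, error) -> {
                if (error != null) {
                    failures.incrementAndGet();
                }
                return (Void) null;
            }));
        }

        // Every future in 'pending' completes normally, so join() does not throw here.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return failures.get();
    }

    /** Hypothetical sender for the demo: rejects empty payloads. */
    static CompletableFuture<String> fakeSend(byte[] payload) {
        if (payload.length == 0) {
            return CompletableFuture.failedFuture(new IllegalStateException("queue full"));
        }
        return CompletableFuture.completedFuture("id-" + payload.length);
    }

    public static void main(String[] args) {
        List<byte[]> payloads = List.of("a".getBytes(), new byte[0], "bc".getBytes());
        long failed = sendAllAndCountFailures(payloads, SendAsyncTracking::fakeSend);
        System.out.println("failed sends: " + failed);
    }
}
```

For a file with millions of lines, keeping every future in a list may use a lot of memory; counting failures in the callback and waiting only at the end is one possible variation.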
----
2020-06-10 16:57:09 UTC - Sijie Guo: Interesting - do you set retention?
----
2020-06-10 17:13:43 UTC - slouie: yup. `defaultRetentionTimeInMinutes: 0` and `defaultRetentionSizeInMB: 0`. Which as I understand is the most aggressive retention policy one could set
----
2020-06-10 17:16:05 UTC - slouie: I see `2.4.0` has a metric specifically around `backlog_size` which should be helpful. I’m just at a loss on how to reproduce and be alerted to this since alerts based off `msg_backlog` and `unacked` wouldn’t work. I guess alerting on `storage_size` is a proxy for the problem, but that lacks precision in `2.3`
----
2020-06-10 17:18:52 UTC - Devin G. Bost: Here’s my latest Pulsar video: <https://www.youtube.com/watch?v=vlU9UegYab8&feature=youtu.be>
clap : Karthik Ramasamy, Damien Burke
----
2020-06-10 18:33:37 UTC - Sijie Guo: I am not sure either. Without the information from `topics stats` or `topics stats-internal`, it is really hard to debug this issue.
----
2020-06-10 19:09:52 UTC - Fred George: A slightly random (and not particularly important) question: where did the name 'Pulsar' come from? Someone asked me and I wondered if there was a story behind the name?
----
2020-06-10 19:10:38 UTC - Matteo Merli: The logo has a hint
----
2020-06-10 19:10:47 UTC - Matteo Merli: 
----
2020-06-10 19:39:40 UTC - Erik Jansen: Hi Merlimat, blockIfQueueFull is set to true.
----
2020-06-10 19:40:08 UTC - Fred George: ian curtis fan maybe?
----
2020-06-10 19:48:49 UTC - Marcio Martins: Which version would you guys recommend for running successfully in production? I am having a hard time finding a version that is stable. 2.5.2 has problems with s3 offload and cannot do hardware accelerated CRC32C... 2.5.1 spams exceptions after a while. Should I keep going to earlier versions?
----
2020-06-10 20:05:06 UTC - Sijie Guo: I don’t think you should go back to an earlier version.

Can you share more details on the s3 offload and CRC32C issue?
----
2020-06-10 20:14:11 UTC - Damien Burke: Re pulsar functions, can anyone share any documentation/best practises around handling failure? The function interface declares it throws a checked Exception;
```O process(I input, Context context) throws Exception```
, but there is zero JavaDoc. I was anticipating that a retry or an error / dead letter queue might be configurable, but again, I can't find any related docs. Thanks
----
2020-06-10 20:21:49 UTC - Marcio Martins: It fails to load the class
----
2020-06-10 20:21:53 UTC - Marcio Martins: One sec
----
2020-06-10 20:22:21 UTC - Marcio Martins: ```ERROR org.apache.bookkeeper.proto.checksum.CRC32CDigestManager - Sse42Crc32C is not supported, will use a slower CRC32C implementation.```

----
2020-06-10 20:22:31 UTC - Damien Burke: To provide an example, I need an enricher function, which depends on making a REST call. This REST call could of course fail, e.g. due to a simple (temporary) network issue. Does the "framework" give me anything to support handling this? I could have some local retry logic to call the REST API again. I know I could have logic to not write to the output topic if the REST call fails. Is that kinda it?
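
For illustration, the "local retry logic" mentioned above could look roughly like the following JDK-only sketch with exponential backoff. `RetryingCall` and `callWithRetry` are hypothetical names, nothing here is a Pulsar Functions API, and the sketch makes no claim about what the framework itself does when `process` throws.

```java
import java.util.concurrent.Callable;

public class RetryingCall {

    /**
     * Runs the call up to maxAttempts times, doubling the pause between
     * attempts, and rethrows the last failure if every attempt fails.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long backoff = initialBackoffMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (InterruptedException e) {
                // Do not retry when the thread is being asked to stop.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Stand-in for the REST call: fails twice, then succeeds.
        String enriched = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new java.io.IOException("temporary network issue");
            }
            return "enriched-value";
        }, 5, 10L);
        System.out.println(enriched + " after " + attempts[0] + " attempts");
    }
}
```

Inside a function, the REST call would be wrapped in `callWithRetry`, and the function would decide what to do (skip the output, rethrow) once the attempts are exhausted.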
----
2020-06-10 20:23:13 UTC - Marcio Martins: It also fails to load the S3 offloader with this error:
```Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled```
Even though those requirements are met, it's the exact same config I use for 2.5.1
----
2020-06-10 20:25:51 UTC - slouie: Is there something specific you would be looking for in `topics stats` and `topics stats-internal`?
----
2020-06-10 22:51:52 UTC - Sijie Guo: In stats-internal, check the read and deleted positions of the cursors. Also check the individual deletes.
----
2020-06-11 00:16:51 UTC - Yuya Ebihara: @Yuya Ebihara has joined the channel
----
2020-06-11 00:31:55 UTC - Matteo Merli: @Marcio Martins Where do you get the CRC error? Linux or Mac?
----
2020-06-11 00:41:41 UTC - Sijie Guo: @Penghui Li can you help with the S3 offloader error? I think it is related to per namespace offloading settings.
----
2020-06-11 03:36:31 UTC - Alexander Ursu: Hi, I was wondering if anyone here who has used the Pulsar SQL Presto implementation extensively has any recommendations on hardware requirements for the Presto coordinator running alone, or possibly with one or two workers.
----
2020-06-11 06:45:27 UTC - Penghui Li: There is a breaking change in 2.5.2 related to the namespace-level offloader policy and the broker-level policy; we have already fixed it on the master branch. For now, you can work around it by setting the namespace-level offloader policy.
----
2020-06-11 06:59:15 UTC - Marcio Martins: @Matteo Merli Linux, it's the official pulsar docker images.
----
2020-06-11 07:00:45 UTC - Marcio Martins: @Penghui Li Thanks!
----
2020-06-11 08:30:19 UTC - jujugrrr: Any luck @Marcio Martins?
----
2020-06-11 09:07:55 UTC - Marcio Martins: No, I just added S3 permission to the node roles, and that's working - not optimal but also not worth stressing over, in my case. Will revisit later and hopefully it will be working as newer versions get picked up.
----
2020-06-11 09:10:23 UTC - Ken Huang: I created a function in a k8s environment. I can see that a pod is active, but I found that the function doesn't subscribe to the input topic.

I modified jobNamespace (<https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/broker-configmap.yaml#L64>), serviceUrl (<https://github.com/apache/pulsar/blob/master/deployment/kubernetes/helm/pulsar/templates/broker-configmap.yaml#L68>) and adminUrl; I set the URLs to the proxy URL.

here is the function pod log
```"Downloaded successfully"
shardId=0
[2020-06-11 08:29:11 +0000] [INFO] python_instance_main.py: Starting Python instance with Namespace(client_auth_params='file:///etc/auth/token', client_auth_plugin='org.apache.pulsar.client.impl.auth.AuthenticationToken', cluster_name='pulsar-145', dependency_repository=None, expected_healthcheck_interval=-1, extra_dependency_repository=None, function_details='{"tenant":"public","namespace":"default","name":"ken","className":"test_func.ExclamationFunction1","runtime":"PYTHON","autoAck":true,"parallelism":1,"source":{"inputSpecs":{"persistent://public/default/intput":{}},"cleanupSubscription":true},"sink":{"topic":"persistent://public/default/output"},"resources":{"cpu":1.1,"ram":"1073741824","disk":"10737418240"},"componentType":"FUNCTION"}', function_id='dd305aeb-a773-45d9-aa48-765c4fa451e5', function_version='fff11148-ed0d-4a65-b79f-8c6b7182ac88', hostname_verification_enabled='false', install_usercode_dependencies=True, instance_id='0', logging_config_file='/pulsar/conf/functions-logging/console_logging_config.ini', logging_directory='logs/functions', logging_file='ken', max_buffered_tuples='1024', metrics_port=9094, port=9093, pulsar_serviceurl='pulsar://<my-proxy-url>/', py='/pulsar/test_func.py', secrets_provider='secretsprovider.ClearTextSecretsProvider', secrets_provider_config=None, state_storage_serviceurl=None, tls_allow_insecure_connection='false', tls_trust_cert_path=None, use_tls='false')
2020-06-11 08:29:11.916 INFO  Client:88 | Subscribing on Topic :persistent://public/default/intput
2020-06-11 08:29:11.916 INFO  ConnectionPool:85 | Created connection for pulsar://<my-proxy-url>/
2020-06-11 08:29:11.918 INFO  ClientConnection:330 | [10.244.166.160:35006 -> <my-proxy-url>] Connected to broker
2020-06-11 08:29:12.312 INFO  HandlerBase:53 | [persistent://public/default/intput, public/default/ken, 0] Getting connection from pool
2020-06-11 08:29:12.350 INFO  ConnectionPool:85 | Created connection for pulsar://pulsar-145-broker-0.pulsar-145-broker.pulsar.svc.cluster.local:6650
2020-06-11 08:29:12.351 INFO  ClientConnection:332 | [10.244.166.160:35022 -> <my-proxy-url>] Connected to broker through proxy. Logical broker: pulsar://pulsar-145-broker-0.pulsar-145-broker.pulsar.svc.cluster.local:6650
2020-06-11 08:29:12.439 INFO  ConsumerImpl:175 | [persistent://public/default/intput, public/default/ken, 0] Created consumer on broker [10.244.166.160:35022 -> <my-proxy-url>]
[2020-06-11 08:29:12 +0000] [INFO] server.py: Serving InstanceCommunication on port 9093
2020-06-11 08:39:12.313 INFO  ConsumerStatsImpl:64 | Consumer [persistent://public/default/intput, public/default/ken, 0] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})
2020-06-11 08:49:12.313 INFO  ConsumerStatsImpl:64 | Consumer [persistent://public/default/intput, public/default/ken, 0] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 0, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {}, totalAckedMsgMap_ = {})```
----