Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/10/10 09:11:04 UTC

Slack digest for #general - 2018-10-10

2018-10-09 10:47:56 UTC - Nathanial Murphy: Is there a way of submitting a proposal for features, or do we just do the work in PRs and ask for review?
----
2018-10-09 11:25:35 UTC - Samuel Sun: <https://github.com/apache/pulsar/wiki> ?
----
2018-10-09 13:21:43 UTC - hj: @Sanjeev Kulkarni Sorry for being late. Thanks for the help, I will try it.
----
2018-10-09 13:54:57 UTC - Shalin: `-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g -XX:+UseG1GC -XX:MaxGCPauseMillis=10 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.maxCapacity.default=1000 -Dio.netty.recycler.linkCapacity=1024`
----
2018-10-09 14:01:31 UTC - Shalin: The topics get messages of size ~ `6mb` published concurrently, max ~ 1000/sec, but intermittently. And the messages are consumed/ack'ed ~ 10/min.
----
2018-10-09 14:03:51 UTC - Shalin: The memory used by the broker increases as I publish more messages. I tried reducing `managedLedgerCacheSizeMB` to `512` to see if that would help, but memory usage still increases steadily as the backlog grows.
----
2018-10-09 14:04:39 UTC - Shalin: @Sijie Guo Let me know if that makes sense, or if you need some more context on the problem.
----
2018-10-09 14:59:02 UTC - Igor Zubchenok: Hello! Can we change broker config from
```
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=3
managedLedgerDefaultAckQuorum=2
```
to
```
managedLedgerDefaultEnsembleSize=2
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
```
and restart brokers one-by-one without data cleanup?
----
2018-10-09 15:33:40 UTC - Matteo Merli: yes, although you don’t need to restart brokers to change this setting. You can apply that at the namespace level and it will be automatically applied
----
2018-10-09 15:33:56 UTC - Matteo Merli: `pulsar-admin namespaces set-persistence ...`
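
For the 2/2/2 change asked about above, a minimal sketch of how the namespace-level command could be assembled. The flag names (`--bookkeeper-ensemble`, `--bookkeeper-write-quorum`, `--bookkeeper-ack-quorum`, `--ml-mark-delete-max-rate`) are assumed from the `pulsar-admin` reference and should be checked against `pulsar-admin namespaces set-persistence --help` for your version. The helper name and the `public/default` namespace are placeholders.

```python
import shlex


def set_persistence_command(namespace, ensemble, write_quorum, ack_quorum,
                            mark_delete_max_rate=0.0):
    """Build the argv for `pulsar-admin namespaces set-persistence`.

    Hypothetical helper: it only assembles the command, it does not run it.
    Flag names are assumptions; verify them with --help.
    """
    # BookKeeper requires ensemble >= write quorum >= ack quorum >= 1.
    if not (ensemble >= write_quorum >= ack_quorum >= 1):
        raise ValueError(
            "need ensemble >= write quorum >= ack quorum >= 1, got "
            f"{ensemble}/{write_quorum}/{ack_quorum}")
    return [
        "bin/pulsar-admin", "namespaces", "set-persistence",
        "--bookkeeper-ensemble", str(ensemble),
        "--bookkeeper-write-quorum", str(write_quorum),
        "--bookkeeper-ack-quorum", str(ack_quorum),
        "--ml-mark-delete-max-rate", str(mark_delete_max_rate),
        namespace,
    ]


if __name__ == "__main__":
    # The 2/2/2 setting from the question, applied to a placeholder namespace.
    print(shlex.join(set_persistence_command("public/default", 2, 2, 2)))
```

The check up front rejects combinations such as a write quorum larger than the ensemble before anything is sent to the cluster.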
----
2018-10-09 15:36:36 UTC - Sijie Guo: @Nathanial Murphy:

You can write a proposal as a PIP (example: <https://github.com/apache/pulsar/wiki/PIP-23%3A-Message-Tracing-By-Interceptors>). You can write it in a Google Doc or a gist and send an email to <ma...@pulsar.apache.org>; committers will help you add your proposal to the Pulsar wiki.
----
2018-10-09 18:29:04 UTC - William Fry: @Sijie Guo any help here would be _awesome_, thx!
----
2018-10-09 18:52:04 UTC - Matteo Merli: @William Fry the `managedLedgerCacheSizeMB` does indeed come from direct memory. Also the memory used for Netty IO comes from direct memory and it’s pooled. There are Netty tunables to reduce the amount of memory retained in the pools under different conditions.
In any case, we are thinking about ways to simplify the memory configs.
----
2018-10-09 18:55:17 UTC - Sijie Guo: I think you can reduce `managedLedgerCacheSizeMB` even further (given you have a pretty small setup), e.g. to 128.
----
2018-10-09 18:56:28 UTC - Shalin: @Matteo Merli So if the mem used by the broker is approaching ~ `2GB` when the `managedLedgerCacheSizeMB` is set to `512mb`, is it safe to assume that netty is eating up the memory?
----
2018-10-09 18:58:27 UTC - Matteo Merli: Yes, there are per-thread caches (for perf reasons). You can reduce the mem usage by reducing the pools granularity. Eg. Add system properties:

`-Dio.netty.allocator.numDirectArenas=1 -Dio.netty.allocator.normalCacheSize=0 -Dio.netty.allocator.maxOrder=9`
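
A rough sketch of why these properties shrink the pools. It assumes Netty 4.1 defaults of the time (8 KiB pages, `maxOrder=11`) and the default arena formula `min(2 * cores, maxDirectMemory / chunkSize / 2 / 3)`; both are assumptions to verify against the Netty version in use. The function names and the core count are hypothetical.

```python
def chunk_size(page_size=8192, max_order=11):
    """Size in bytes of one pooled chunk: page_size << max_order."""
    return page_size << max_order


def default_direct_arenas(cores, max_direct_memory, chunk):
    """Arena count when numDirectArenas is not set (assumed formula)."""
    return max(0, min(2 * cores, max_direct_memory // chunk // 2 // 3))


if __name__ == "__main__":
    mib = 1024 * 1024
    max_direct = 1024 * mib   # -XX:MaxDirectMemorySize=1g from the JVM flags above
    cores = 32                # hypothetical core count

    before = chunk_size()               # assumed defaults: 8 KiB pages, maxOrder=11
    after = chunk_size(max_order=9)     # with -Dio.netty.allocator.maxOrder=9
    arenas = default_direct_arenas(cores, max_direct, before)

    print(f"default chunk: {before // mib} MiB, arenas: {arenas}")
    print(f"tuned chunk:   {after // mib} MiB, arenas: 1")
```

Under those assumptions each arena grows in 16 MiB chunks by default and in 4 MiB chunks with `maxOrder=9`, and pinning `numDirectArenas=1` leaves a single arena to retain memory.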
----
2018-10-09 19:05:53 UTC - Shalin: @Matteo Merli @Sijie Guo Since it has to do with Netty, are we experiencing this now because of the number of concurrent publishers and not the size of the messages themselves?
----
2018-10-09 19:24:26 UTC - Matteo Merli: @Jean-Bernard van Zuylen I’d prefer to keep it “automatic” if possible, so inheriting the thread-daemon status from parent, so that we don’t have more config options (unless there are drawbacks with the approach).

I’m still not 100% clear on why this should block Tomcat runtime. Is that because there is no easy way to trigger `PulsarClient.close()`? Closing the client instance would be stopping all these threads.
----
2018-10-09 19:32:46 UTC - Nathanial Murphy: Just following up from before - would there be much interest/use to the pulsar user base in splitting up message batches inside of pulsar as an option you can configure? It wigs me out a little that the write rate determines how messages are acknowledged together, but I'm also happy to just use non-batched messages. 
----
2018-10-09 20:17:08 UTC - Alex Mault: Does anyone know what the max length of a topic name is (if any)?
----
2018-10-09 20:27:49 UTC - Matteo Merli: @Nathanial Murphy Rather than splitting batches in the broker, I’d rather add individual message granularity on the acknowledgement path (with auto discarding of messages already acked within a batch)
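
To make the idea concrete, a small illustration of per-message ack tracking inside one batch, where already-acked entries are discarded on redelivery. This is only a sketch of the concept described above, not Pulsar's implementation; the class and method names are hypothetical.

```python
class BatchAckTracker:
    """Tracks which messages of one batch have been acknowledged."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.acked = set()

    def ack(self, index):
        """Record an ack; return True once the whole batch is acked."""
        if not 0 <= index < self.batch_size:
            raise IndexError(f"index {index} outside batch of {self.batch_size}")
        self.acked.add(index)
        return len(self.acked) == self.batch_size

    def pending(self):
        """Indexes still to deliver, discarding the ones already acked."""
        return [i for i in range(self.batch_size) if i not in self.acked]


if __name__ == "__main__":
    tracker = BatchAckTracker(batch_size=4)
    tracker.ack(0)
    tracker.ack(2)
    # On redelivery of the batch, only the unacked entries would be handed out.
    print(tracker.pending())
```

The batch as a whole can be released only when `ack` returns True, while a redelivery hands out just the entries that `pending` still lists.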
----
2018-10-09 20:28:48 UTC - Matteo Merli: @Alex Mault there’s no max length on individual fields. The only limitation is on whole command frames, which are capped at 5MB
+1 : Alex Mault
----
2018-10-09 20:30:18 UTC - Shay Rybak: Hi, I’m trying to set up a kafka source using pulsar IO, I’ve created a source to read from a local kafka instance with the following config: ```configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar```
----
2018-10-09 20:30:21 UTC - Shay Rybak: `bin/pulsar-admin source create --tenant public --namespace default --name kafka --sourceConfigFile examples/kafka-source.yaml --source-type kafka --destinationTopicName kafka`
----
2018-10-09 20:30:47 UTC - Shay Rybak: I’m having a hard time troubleshooting why I don’t see any messages in the destination topic
----
2018-10-09 20:36:42 UTC - Matteo Merli: There should be a log file for the PulsarIO source, created on the broker
----
2018-10-09 20:37:49 UTC - Shay Rybak: I believe this is it: ```15:35:21.845 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /10.184.117.51:50812
15:35:21.850 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.117.51:50812][persistent://public/default/kafka] Creating producer. producerId=0
15:35:21.850 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.117.51:50812] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/kafka}, client=/10.184.117.51:50812, producerName=dal12-02-0-103, producerId=0}
15:35:22.124 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/kafka}][dal12-02-0-103] Closing producer on cnx /10.184.117.51:50812
15:35:22.124 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/kafka}][dal12-02-0-103] Closed producer on cnx /10.184.117.51:50812
15:35:22.137 [pulsar-io-22-16] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.184.117.51:50812``` I see that every 30 seconds or so, looks like it’s creating the producer to push messages but not pushing anything
----
2018-10-09 20:39:49 UTC - Shay Rybak: it has no details about pulling from kafka, if it failed or not
----
2018-10-09 20:40:53 UTC - Matteo Merli: let me try locally
----
2018-10-09 20:47:41 UTC - Matteo Merli: @Shay Rybak did you download the connectors?
----
2018-10-09 20:48:15 UTC - Matteo Merli: You can also try the `localrun` mode, e.g.: `bin/pulsar-admin source localrun --name kafka --sourceConfigFile /tmp/kafka-source.yml --source-type kafka --destinationTopicName kafka`
----
2018-10-09 20:48:37 UTC - Sijie Guo: @Shay Rybak :

- Where are you running: in standalone mode or in cluster mode?
- you can use `bin/pulsar-admin functions getstatus  --tenant public --namespace default --name kafka` to query the running status
----
2018-10-09 20:48:39 UTC - Matteo Merli: That prints `Invalid source type 'kafka' -- Available sources are: [twitter]` since the kafka connector is not “available”
----
2018-10-09 20:49:51 UTC - Shay Rybak: yes I did download, this is running in cluster mode
----
2018-10-09 20:50:19 UTC - Shay Rybak: ```$ bin/pulsar-admin functions getstatus  --tenant public --namespace default --name kafka
{
  "functionStatusList": [
    {
      "numRestarts": "69",
      "instanceId": "0",
      "workerId": "c-dal12-02-fw-<host>-8080"
    }
  ]
}```
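
A high `numRestarts` like the one above usually means the instance keeps crashing on startup. A minimal sketch of flagging that from the `getstatus` JSON follows. The field names are taken from the output in this thread, while the function name, the restart threshold and the `example-worker` value are hypothetical.

```python
import json


def unhealthy_instances(status_json, max_restarts=3):
    """Return (instanceId, reason) pairs for instances that look stuck."""
    status = json.loads(status_json)
    problems = []
    for inst in status.get("functionStatusList", []):
        # numRestarts arrives as a string in the output above.
        restarts = int(inst.get("numRestarts", 0))
        if restarts > max_restarts:
            problems.append((inst.get("instanceId"), f"{restarts} restarts"))
        elif not inst.get("running", False):
            problems.append((inst.get("instanceId"), "not running"))
    return problems


if __name__ == "__main__":
    sample = """
    {
      "functionStatusList": [
        {"numRestarts": "69", "instanceId": "0", "workerId": "example-worker"}
      ]
    }
    """
    print(unhealthy_instances(sample))
```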
----
2018-10-09 20:50:58 UTC - Sijie Guo: @Shay Rybak: if it is in cluster mode, you need to point bootstrapServers to the kafka brokers address
----
2018-10-09 20:51:28 UTC - Shay Rybak: kafka is running in a cluster on the same nodes, so that broker address is fine for all nodes
----
2018-10-09 20:51:49 UTC - Sijie Guo: oh i see
----
2018-10-09 20:51:49 UTC - Matteo Merli: ```
13:50:34.861 [public/default/kafka-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public-kafka] Uncaught exception in Java Instance
java.lang.NullPointerException: null
	at org.apache.pulsar.io.kafka.KafkaAbstractSource.open(KafkaAbstractSource.java:59) ~[pulsar-io-kafka-2.2.0-SNAPSHOT.nar-unpacked/:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:570) ~[java-instance.jar:2.2.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:162) ~[java-instance.jar:2.2.0-SNAPSHOT]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:188) [java-instance.jar:2.2.0-SNAPSHOT]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
```
----
2018-10-09 20:52:34 UTC - Matteo Merli: Some parameters missing from the yml file are causing the NPE
----
2018-10-09 20:52:54 UTC - Matteo Merli: (though I think that should be fixed in the source itself)
----
2018-10-09 20:53:02 UTC - Sijie Guo: oh i see
----
2018-10-09 20:53:04 UTC - Shay Rybak: ```bin/pulsar-admin source available-sources
kafka
Kafka source and sink connector
----------------------------------------
rabbitmq
RabbitMQ source connector
----------------------------------------
twitter
Ingest data from Twitter firehose
----------------------------------------```
----
2018-10-09 20:54:26 UTC - Shay Rybak: so is this a configuration issue on my end or is the source broken?
----
2018-10-09 20:54:31 UTC - Sijie Guo: one second
----
2018-10-09 20:55:04 UTC - Matteo Merli: @Shay Rybak try with:

```
configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar
  fetchMinBytes: 1024
  autoCommitIntervalMs: 1000
  sessionTimeoutMs: 30000
```

In the Yaml file
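
A pre-flight check along these lines could catch the missing parameters before the source is created. It is a sketch only: the key list is taken from the config in this thread rather than from the connector's documentation, so which keys are truly required is an assumption, and the config is shown as a dict instead of YAML to stay within the standard library.

```python
# Keys used in the working config from this thread (assumed to be required).
REQUIRED_KEYS = (
    "bootstrapServers", "topic", "groupId",
    "fetchMinBytes", "autoCommitIntervalMs", "sessionTimeoutMs",
)


def missing_source_keys(source_config):
    """Return the keys from REQUIRED_KEYS absent under `configs`."""
    configs = source_config.get("configs") or {}
    return [key for key in REQUIRED_KEYS if key not in configs]


if __name__ == "__main__":
    # The config from the first attempt, as a dict instead of YAML.
    first_attempt = {
        "configs": {
            "bootstrapServers": "localhost:9092",
            "topic": "pulsar",
            "groupId": "pulsar",
        }
    }
    print(missing_source_keys(first_attempt))
```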
----
2018-10-09 20:56:28 UTC - Nathanial Murphy: Doesn't that mean only a single consumer would still be dealing with a single batch? It's a better problem to have, admittedly. 
----
2018-10-09 20:57:04 UTC - Sijie Guo: @Matteo Merli you are fast
----
2018-10-09 20:57:18 UTC - Shay Rybak: ```bin/pulsar-admin functions getstatus  --tenant public --namespace default --name kafka
{
  "functionStatusList": [
    {
      "running": true,
      "instanceId": "0",
      "metrics": {
        "metrics": {
          "__total_processed__": {},
          "__total_successfully_processed__": {},
          "__total_system_exceptions__": {},
          "__total_user_exceptions__": {},
          "__total_serialization_exceptions__": {},
          "__avg_latency_ms__": {}
        }
      },
      "workerId": "<>"
    }
  ]
}```
----
2018-10-09 20:57:32 UTC - Shay Rybak: looks better but still can’t fetch messages
----
2018-10-09 20:57:43 UTC - Matteo Merli: Yes. Batching is still a way to have higher throughput at lower cost. If you have high throughput, it is still better to handle the dispatching in batches rather than for every message
----
2018-10-09 21:00:51 UTC - Matteo Merli: Seeing exception:

```
Exception in thread "Kafka Source Thread" java.lang.ClassCastException: java.lang.String cannot be cast to [B
	at org.apache.pulsar.io.kafka.KafkaStringSource.extractValue(KafkaStringSource.java:31)
	at org.apache.pulsar.io.kafka.KafkaStringSource.extractValue(KafkaStringSource.java:28)
	at org.apache.pulsar.io.kafka.KafkaAbstractSource.lambda$start$0(KafkaAbstractSource.java:109)
	at java.lang.Thread.run(Thread.java:745)
```
----
2018-10-09 21:01:34 UTC - Sijie Guo: which version of this?
----
2018-10-09 21:01:51 UTC - Sijie Guo: I think this is in between some changes.
----
2018-10-09 21:02:11 UTC - Sijie Guo: latest master?
----
2018-10-09 21:02:25 UTC - Matteo Merli: yes, it's probably just on my machine
----
2018-10-09 21:02:35 UTC - Shay Rybak: I’m using 2.1.1 binary
----
2018-10-09 21:04:32 UTC - Sijie Guo: one second
----
2018-10-09 21:05:21 UTC - Nathanial Murphy: I'm guessing there's no facility to both ack and publish new messages as part of the same "transaction" either? I imagine that would alleviate some of the pains of two consumers publishing the same response to the same batched message. 
----
2018-10-09 21:09:45 UTC - Matteo Merli: Not yet, though there are plans to work on transactions
----
2018-10-09 21:12:14 UTC - Nathanial Murphy: Sweet. So it sounds like it could be feasible. I guess I have a proposal to attempt. :stuck_out_tongue:
----
2018-10-09 21:12:42 UTC - Sijie Guo: @Matteo Merli: I think yours sounds like a different issue (which might be related to the schema changes in master).
----
2018-10-09 21:14:05 UTC - Matteo Merli: Definitely feasible, just a matter of time :slightly_smiling_face:
----
2018-10-09 21:14:08 UTC - Sijie Guo: @Shay Rybak: on your pulsar cluster, if you go to the node that ‘workerId’ points to, you can see the connector logs under ${PULSAR_HOME}/logs/public/default/kafka
----
2018-10-09 21:16:25 UTC - Shay Rybak: ```15:55:47.177 [Kafka Source Thread] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group pulsar with generation 1
15:55:47.178 [Kafka Source Thread] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [pulsar-0] for group pulsar```
----
2018-10-09 21:16:39 UTC - Shay Rybak: these are the last messages in that log, no errors that I can see
----
2018-10-09 21:18:04 UTC - Sijie Guo: are you still publishing data to the kafka topic?
----
2018-10-09 21:21:08 UTC - Shay Rybak: yes, I publish every few minutes to make sure
----
2018-10-09 21:21:30 UTC - Shay Rybak: I deleted and recreated the source, cleaning the log in between
----
2018-10-09 21:21:45 UTC - Shay Rybak: ```16:19:50,810 INFO [public/default/kafka] [instance-0] JavaInstanceRunnable - Starting Java Instance kafka
16:19:50,874 INFO [public/default/kafka] [instance-0] JavaInstanceRunnable - Initialize function class loader for function kafka at function cache manager```
----
2018-10-09 21:21:56 UTC - Shay Rybak: that is all that is in the log
----
2018-10-09 21:22:26 UTC - Shay Rybak: `logs/functions/public/default/kafka/kafka-0.log`
----
2018-10-09 21:22:31 UTC - Shay Rybak: that is the location of the log file
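
For finding the same file for another connector, a tiny sketch that rebuilds the path. The layout `logs/functions/<tenant>/<namespace>/<name>/<name>-<instance>.log` is inferred from the single path above and may differ between versions; the function name is hypothetical.

```python
from pathlib import PurePosixPath


def connector_log_path(tenant, namespace, name, instance_id=0,
                       logs_dir="logs/functions"):
    """Guess the log file path for one connector instance (inferred layout)."""
    return (PurePosixPath(logs_dir) / tenant / namespace / name
            / f"{name}-{instance_id}.log")


if __name__ == "__main__":
    print(connector_log_path("public", "default", "kafka"))
```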
----
2018-10-09 21:23:26 UTC - Sijie Guo: okay give me a few minutes
----
2018-10-09 21:40:57 UTC - Sijie Guo: ```
configs:
  bootstrapServers: localhost:9092
  topic: pulsar
  groupId: pulsar
  fetchMinBytes: 1024
  autoCommitIntervalMs: 1000
  sessionTimeoutMs: 30000
  valueDeserializationClass: org.apache.kafka.common.serialization.ByteArrayDeserializer
```
----
2018-10-09 21:41:09 UTC - Sijie Guo: @Shay Rybak can you try this one?
----
2018-10-09 21:41:29 UTC - Sijie Guo: set the valueDeserializationClass for kafka connector to ByteArrayDeserializer
----
2018-10-09 21:41:51 UTC - Shay Rybak: trying
----
2018-10-09 21:42:23 UTC - Shay Rybak: works
----
2018-10-09 21:42:29 UTC - Sijie Guo: great
----
2018-10-09 21:42:40 UTC - Sijie Guo: although I think the kafka source connector needs to be improved.
----
2018-10-09 21:43:11 UTC - Sijie Guo: I will handle those improvements :slightly_smiling_face:
----
2018-10-09 21:43:35 UTC - Shay Rybak: Thank you for your assistance
----
2018-10-09 21:43:37 UTC - Sijie Guo: I will also improve the documentation
----
2018-10-09 21:43:49 UTC - Shay Rybak: maybe paste that config as an example
----
2018-10-09 21:43:53 UTC - Sijie Guo: yes
----
2018-10-09 21:44:07 UTC - Shay Rybak: feel free to use this as well `bin/pulsar-admin source create --tenant public --namespace default --name kafka --sourceConfigFile examples/kafka-source.yaml --source-type kafka --destinationTopicName kafka`
----
2018-10-09 21:44:11 UTC - Sijie Guo: I will add example configs
----
2018-10-09 21:44:24 UTC - Sijie Guo: yes :+1:
----
2018-10-09 21:44:33 UTC - Sijie Guo: I will use your command :slightly_smiling_face:
----
2018-10-09 21:45:49 UTC - Shay Rybak: :pray:
----
2018-10-10 07:13:52 UTC - Nicolas Ha: Still haven’t got to using Daemonset in the helm kubernetes - hopefully I get to play today :smile:
----