Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/04/27 09:11:05 UTC

Slack digest for #general - 2020-04-27

2020-04-26 09:48:19 UTC - Gilles Barbier: Hi - I’m wondering if there is a practical or hard limit on the size of a function’s state? Does anyone know?
----
2020-04-26 10:49:42 UTC - Patrik Kleindl: Maybe the disk space available to BookKeeper, since RocksDB writes to disk, and whether compression is enabled in RocksDB.
Maybe also the maximum size of a single message, if that is relevant to Pulsar.
----
2020-04-26 12:11:31 UTC - Hi Im Fyodor: @Hi Im Fyodor has joined the channel
----
2020-04-26 12:13:04 UTC - Hi Im Fyodor: Hi everyone
wave : Frans Guelinckx, David Kjerrumgaard
----
2020-04-26 13:26:59 UTC - Vladimir Shchur: Hello! I'm testing schema auto-update but can't get it right. After I add a new field to the schema on the producer side, the producer refuses to connect, even though the auto-update strategy is set to FULL on the namespace. I expected adding a new field to produce a compatible schema (2.5.0 server). Here is the error:
```org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
  "type" : "record",
  "name" : "SimpleRecord",
  "namespace" : "Pulsar.Client.IntegrationTests",
  "fields" : [ {
    "name" : "Name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "Age",
    "type" : [ "int", "null" ]
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "SimpleRecord",
  "namespace" : "Pulsar.Client.IntegrationTests",
  "fields" : [ {
    "name" : "Name",
    "type" : [ "string", "null" ]
  }, {
    "name" : "Age",
    "type" : [ "int", "null" ]
  }, {
    "name" : "Surname",
    "type" : [ "string", "null" ]
  } ]
}```
----
2020-04-26 15:59:41 UTC - Ebere Abanonu: @Sijie Guo does a Pulsar function register a schema for its input topics? I'm asking because I'm facing an issue with
```org.apache.pulsar.broker.service.schema.AvroSchemaBasedCompatibilityCheck```
----
2020-04-26 16:00:24 UTC - Ebere Abanonu: When I retrieve the schema I see it is of type JSON
----
2020-04-26 16:24:00 UTC - David Kjerrumgaard: The function state is stored in the BookKeeper layer, which is a highly scalable storage layer. However, the real limit may appear when you read the state out of the BK layer and deserialize it back into your Function. At that point the object size is limited by the amount of memory allocated to the Function.
----
2020-04-26 16:25:32 UTC - David Kjerrumgaard: For example, suppose you were storing an ArrayList inside the function state and slowly adding entries. Eventually, when you read that list into memory, you might get an OOM exception.
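Here is a JDK-only sketch that makes that failure mode concrete. `StateStore` is a hypothetical in-memory stand-in for a function's key/value state, not the Pulsar Functions `Context` API. If you append to a single list-valued key, every append has to load the whole list into memory. If you store one entry per key, each value stays small.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a function's key/value state (NOT the Pulsar API).
class StateStore {
    private final Map<String, byte[]> values = new HashMap<>();

    byte[] get(String key) {
        return values.getOrDefault(key, new byte[0]);
    }

    void put(String key, byte[] value) {
        values.put(key, value);
    }
}

class StateGrowthDemo {
    // Anti-pattern: one key holds the whole list, so every append loads and rewrites all of it.
    static void appendToSingleKey(StateStore store, String entry) {
        byte[] current = store.get("all-entries"); // entire list pulled into memory
        byte[] extra = (entry + "\n").getBytes(StandardCharsets.UTF_8);
        byte[] updated = Arrays.copyOf(current, current.length + extra.length);
        System.arraycopy(extra, 0, updated, current.length, extra.length);
        store.put("all-entries", updated);
    }

    // Alternative: one key per entry, so a write or read only touches that entry.
    static void putPerKey(StateStore store, long index, String entry) {
        store.put("entry-" + index, entry.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        StateStore single = new StateStore();
        StateStore perKey = new StateStore();
        for (long i = 0; i < 1_000; i++) {
            appendToSingleKey(single, "entry-" + i);
            putPerKey(perKey, i, "entry-" + i);
        }
        System.out.println("single-key value size: " + single.get("all-entries").length + " bytes");
        System.out.println("one per-key value size: " + perKey.get("entry-999").length + " bytes");
    }
}
```

The single value grows with every entry, so it is the one that eventually hits the memory allocated to the function instance.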
----
2020-04-26 16:26:20 UTC - Gilles Barbier: Thanks. Are there settings for the memory allocated to the function, or is it hardware-based?
----
2020-04-26 16:29:24 UTC - David Kjerrumgaard: 1. That is a good question; I would have to look at the code to know for sure. 2. When you enable TLS on the broker, all incoming traffic is encrypted automatically. This is simply how TLS works and is independent of Pulsar. If you can see plain text on that connection, there must be a problem with the configuration itself.
----
2020-04-26 16:33:01 UTC - David Kjerrumgaard: @Vladimir Shchur Can you post the producer and consumer code snippets?  Are you specifying the Schema in the producer code?
----
2020-04-26 16:35:30 UTC - David Kjerrumgaard: You can specify these when creating the function; see <https://pulsar.apache.org/docs/en/functions-deploy/#function-instance-resources>.
+1 : Gilles Barbier
----
2020-04-26 17:09:46 UTC - David Kjerrumgaard, yes, I'm specifying the schema in the producer code. The first time I specified two fields and it worked. Then I added one more field and expected a schema version change, not an incompatible Avro schema exception. In the exception, the old schema is at the top and the new one at the bottom.
----
2020-04-26 17:32:56 UTC - David Kjerrumgaard: Is this inside a Function or just a normal client?
----
2020-04-26 17:34:41 UTC - David Kjerrumgaard: How are you specifying the schema type when you publish the message? e.g. `ctx.newOutputMessage(geoEncoderTopic, AvroSchema.of(Address.class))`
----
2020-04-26 17:49:34 UTC - Sijie Guo: Yes. You need to configure these two settings so that broker-to-broker communication works.
----
2020-04-26 17:50:00 UTC - Sijie Guo: Some admin calls require broker-to-broker communication.
----
2020-04-26 17:50:16 UTC - Sijie Guo: It is needed if you need to configure geo replication.
----
2020-04-26 17:52:30 UTC - Vladimir Shchur: It is a normal client, not a function. The error happens on producer registration, not on message sending. The source code won't help you, since I'm adding schema support to the .NET client.
----
2020-04-26 17:54:32 UTC - Sijie Guo: I talked about authn and authz in one of the TGIP episodes: <https://youtu.be/sTISVpyq73o> Hope that is useful for you.
----
2020-04-26 18:05:14 UTC - Sijie Guo: Yes. A function is essentially a consumer. So it would use the type inferred or specified when submitting a function to consume the messages from the input topics.

The AvroSchemaBasedCompatibilityCheck is also what performs the JSON schema compatibility check. It just uses Avro for the schema specification.
----
2020-04-26 18:06:07 UTC - Sijie Guo: you don’t need to set all fields. You just need to set one of the fields.
----
2020-04-26 18:13:07 UTC - Sijie Guo: @Subash Kunjupillai -

> I couldn’t see any step to encrypt the traffic between Broker and Bookie.
To encrypt the traffic between Broker and Bookie, you need to configure the brokers to enable TLS for the BookKeeper clients they use.

A few references:

1. All settings you need to configure in broker.conf are prefixed with `bookkeeperTLS`. <https://github.com/apache/pulsar/blob/master/conf/broker.conf#L551>
2. Bookkeeper TLS documentation: <https://bookkeeper.apache.org/docs/latest/security/tls/>
3. Pulsar Helm Chart provides TLS support for all components including bookies and zk. If you need a fully TLS encrypted cluster, you can use it as a reference: <https://github.com/apache/pulsar-helm-chart>
4. I recorded a TGIPulsar episode about TLS encryption. Starting at around 30:00, I use a diagram to show how to configure a fully TLS-encrypted cluster.
<https://www.youtube.com/watch?v=aP31A-ntHLA>
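All of the relevant broker settings share the `bookkeeperTLS` prefix, so one quick way to review them is to filter a copy of broker.conf by that prefix. This is a JDK-only sketch. The inline sample only assumes key names resembling those in broker.conf, and its values are placeholders, so treat the linked broker.conf as the authoritative list.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;

class BookkeeperTlsSettings {
    static final String PREFIX = "bookkeeperTLS";

    // Returns every property whose key starts with the bookkeeperTLS prefix, sorted by key.
    static SortedMap<String, String> tlsSettings(Properties conf) {
        SortedMap<String, String> result = new TreeMap<>();
        for (String key : conf.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                result.put(key, conf.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder excerpt (assumed keys); in practice load conf/broker.conf via a FileReader.
        String sample = String.join("\n",
                "brokerServicePort=6650",
                "bookkeeperTLSClientAuthentication=false",
                "bookkeeperTLSTrustCertsFilePath=");
        Properties conf = new Properties();
        conf.load(new StringReader(sample));
        tlsSettings(conf).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

`Properties` understands the `key=value` and `#` comment syntax that broker.conf uses, which is why no custom parser is needed here.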
----
2020-04-26 18:19:52 UTC - Sijie Guo: Not sure how you generated the schema. Adding a new field is not FULL compatible; only adding an optional field (one that declares a default value) is FULL compatible.

Try changing the third field to:

```{
   "name": "Surname",
   "type": [ "null", "string"],
   "default": null
}```
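The rule above can be sketched as a small checker. This is a simplified, hypothetical model written for illustration, not Pulsar's or Avro's actual validator. It only looks at fields that are added or removed and ignores type changes. It also encodes the Avro rule that a union field's default must match the first branch of the union, which is why the suggested field lists `"null"` first.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AvroFullCompatSketch {
    // Simplified Avro record field: union branches plus an optional default value.
    static final class Field {
        final String name;
        final List<String> unionTypes; // e.g. ["null", "string"]
        final boolean hasDefault;
        final Object defaultValue;     // Java null stands for a JSON null default

        Field(String name, List<String> unionTypes, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.unionTypes = unionTypes;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }
    }

    static Field field(String name, List<String> types) {
        return new Field(name, types, false, null);
    }

    static Field withDefault(String name, List<String> types, Object defaultValue) {
        return new Field(name, types, true, defaultValue);
    }

    // Avro requires a union field's default to match the FIRST branch of the union.
    // Only null, string and numeric defaults are modelled here.
    static boolean defaultMatchesFirstBranch(Field f) {
        if (!f.hasDefault) return true;
        String first = f.unionTypes.get(0);
        if (f.defaultValue == null) return first.equals("null");
        if (f.defaultValue instanceof String) return first.equals("string");
        if (f.defaultValue instanceof Number) {
            return List.of("int", "long", "float", "double").contains(first);
        }
        return false;
    }

    private static Map<String, Field> byName(List<Field> fields) {
        Map<String, Field> map = new LinkedHashMap<>();
        for (Field f : fields) map.put(f.name, f);
        return map;
    }

    // FULL = BACKWARD + FORWARD. Only added/removed fields are checked (not type changes):
    // a field that exists in just one of the two schemas must declare a default.
    static boolean isFullCompatible(List<Field> oldFields, List<Field> newFields) {
        Map<String, Field> oldByName = byName(oldFields);
        Map<String, Field> newByName = byName(newFields);
        for (Field f : newFields) {
            // BACKWARD: a new reader fills added fields from their defaults when reading old data.
            if (!oldByName.containsKey(f.name) && !f.hasDefault) return false;
            if (!defaultMatchesFirstBranch(f)) return false;
        }
        for (Field f : oldFields) {
            // FORWARD: an old reader needs defaults for fields the new writer no longer sends.
            if (!newByName.containsKey(f.name) && !f.hasDefault) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Field> v1 = List.of(
                field("Name", List.of("string", "null")),
                field("Age", List.of("int", "null")));
        List<Field> noDefault = List.of(v1.get(0), v1.get(1),
                field("Surname", List.of("string", "null")));
        List<Field> nullFirst = List.of(v1.get(0), v1.get(1),
                withDefault("Surname", List.of("null", "string"), null));
        System.out.println("Surname without default: " + isFullCompatible(v1, noDefault));
        System.out.println("Surname [null, string], default null: " + isFullCompatible(v1, nullFirst));
    }
}
```

Applied to the two versions of `SimpleRecord` above, the original third field is rejected and the suggested one passes. For the real check, Avro's own schema validation (`org.apache.avro.SchemaValidatorBuilder`, which produces the "Unable to read schema" message) remains the source of truth.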
----
2020-04-26 18:27:29 UTC - Vladimir Shchur: Didn't know that, will try, thank you!
----
2020-04-26 18:37:14 UTC - David Kjerrumgaard: Is there a PIP to add the dead letter queue capability to Pulsar Functions?
----
2020-04-26 23:40:02 UTC - Ebere Abanonu: If the function consumer registers the schema, it sets the schema type to JSON, and the compatibility check then fails when I create a producer with an Avro schema. The reverse also happens: if a schema of type Avro is created first, the function crashes. How do I make the function consumer use Avro?
----
2020-04-27 00:33:27 UTC - Ebere Abanonu: ```00:31:01.166 [public/default/Covid19-function-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Source open produced uncaught exception: 
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
  "type" : "record",
  "name" : "Covid19Mobile",
  "namespace" : "Samples",
  "fields" : [ {
    "name" : "DeviceId",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Latitude",
    "type" : [ "null", "double" ],
    "default" : 0
  }, {
    "name" : "Longitude",
    "type" : [ "null", "double" ],
    "default" : 0
  }, {
    "name" : "Time",
    "type" : [ "null", "long" ],
    "default" : 0
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "Covid19Mobile",
  "fields" : [ {
    "name" : "DeviceId",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Latitude",
    "type" : "double"
  }, {
    "name" : "Longitude",
    "type" : "double"
  }, {
    "name" : "Time",
    "type" : "long"
  } ]
}
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_242]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_242]
	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:714) ~[?:1.8.0_242]
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_242]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_242]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_242]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:609) ~[org.apache.pulsar-pulsar-client-original-2.5.1.jar:2.5.1]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:171) ~[org.apache.pulsar-pulsar-common-2.5.1.jar:2.5.1]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.netty-netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: org.apache.avro.SchemaValidationException: Unable to read schema:
{
  "type" : "record",
  "name" : "Covid19Mobile",
  "namespace" : "Samples",
  "fields" : [ {
    "name" : "DeviceId",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Latitude",
    "type" : [ "null", "double" ],
    "default" : 0
  }, {
    "name" : "Longitude",
    "type" : [ "null", "double" ],
    "default" : 0
  }, {
    "name" : "Time",
    "type" : [ "null", "long" ],
    "default" : 0
  } ]
}
using schema:
{
  "type" : "record",
  "name" : "Covid19Mobile",
  "fields" : [ {
    "name" : "DeviceId",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "Latitude",
    "type" : "double"
  }, {
    "name" : "Longitude",
    "type" : "double"
  }, {
    "name" : "Time",
    "type" : "long"
  } ]
}
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1000) ~[org.apache.pulsar-pulsar-client-original-2.5.1.jar:2.5.1]
	... 21 more```
----
2020-04-27 01:00:00 UTC - Ebere Abanonu: how does custom-schema-inputs work?
----
2020-04-27 03:37:05 UTC - wuYin: When a group of new brokers joins the cluster, does the broker LoadManager leader automatically transfer topic-partition ownership away from the high-load brokers?
----
2020-04-27 03:38:59 UTC - wuYin: Or do I need to manually expand partitions to trigger an ownership change?
----
2020-04-27 03:47:29 UTC - wuYin: Sorry for the question, I found the docs:
<https://pulsar.apache.org/docs/en/administration-load-distribution/>
----
2020-04-27 05:08:39 UTC - Shivji Kumar Jha: @Vladimir Shchur can you try adding a default to the newly added field (Surname)?
----
2020-04-27 05:12:35 UTC - Rattanjot Singh: How can I test the Pulsar Functions processing guarantees, effectively-once or at-least-once? Is there a wiki page?
----
2020-04-27 05:13:48 UTC - Alexandre DUVAL: Hi, is there a way to make configuration files use environment variables? For example, in broker.conf, tokenPublicKey=$SYSTEM_ENV_VAR_X?
----
2020-04-27 05:57:33 UTC - Adi Surya Nathanael: @Adi Surya Nathanael has joined the channel
----
2020-04-27 06:16:51 UTC - Gilles Barbier: Thanks
----
2020-04-27 06:36:33 UTC - Sijie Guo: This is not currently supported.

You can use `bin/apply-config-from-env.py`, which takes environment variables and applies them to the configuration files.
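Here is a Java sketch of the idea behind that script. A `key=value` line is overridden when an environment variable named exactly like the key is set, so `tokenPublicKey` would come from a variable called `tokenPublicKey`. This is a simplified, hypothetical illustration of the approach, not a port of `apply-config-from-env.py`. For example, it ignores any prefixed variables the script may additionally support.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ApplyEnvToConfig {
    // Returns a copy of the config lines in which every "key=value" line whose key is
    // present in env gets that variable's value. Comments and blank lines are kept as-is.
    static List<String> apply(List<String> lines, Map<String, String> env) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            int eq = trimmed.indexOf('=');
            if (trimmed.isEmpty() || trimmed.startsWith("#") || eq < 0) {
                out.add(line);
                continue;
            }
            String key = trimmed.substring(0, eq).trim();
            out.add(env.containsKey(key) ? key + "=" + env.get(key) : line);
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ApplyEnvToConfig <path/to/broker.conf>");
            return;
        }
        Path conf = Paths.get(args[0]);
        List<String> updated = apply(Files.readAllLines(conf, StandardCharsets.UTF_8), System.getenv());
        Files.write(conf, updated, StandardCharsets.UTF_8);
    }
}
```

The file is rewritten before the broker starts, which sidesteps the lack of `$VAR` expansion inside broker.conf itself.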
----
2020-04-27 06:37:11 UTC - Sijie Guo: I don’t think so.
----
2020-04-27 06:37:27 UTC - Sijie Guo: yes it happens automatically
----
2020-04-27 06:38:28 UTC - Sijie Guo: <http://pulsar.apache.org/docs/en/functions-overview/#processing-guarantees>
----
2020-04-27 06:39:16 UTC - Sijie Guo: You can kill the instances or inject failures in functions to test the processing guarantees.
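One way to rehearse that kind of test before touching a cluster is a small simulation. The JDK-only code below is a hypothetical model, not Pulsar code. An instance processes a message and then acknowledges it, and an injected failure between the two steps leaves the message unacknowledged, so it is redelivered. Under at-least-once, the property to check is that every input was processed at least once, with duplicates allowed. Effectively-once additionally requires those duplicates to have no visible effect, which typically relies on deduplicating the outputs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class AtLeastOnceFailureInjection {
    // Hypothetical model of a function instance: process a message, then acknowledge it.
    // A failure injected between the two steps leaves the message unacknowledged, so it
    // is delivered again. Returns how many times each message id was processed.
    static Map<Integer, Integer> run(int messageCount, double failureProbability, long seed) {
        if (failureProbability < 0.0 || failureProbability >= 1.0) {
            throw new IllegalArgumentException("failureProbability must be in [0, 1)");
        }
        Random random = new Random(seed);
        Deque<Integer> unacked = new ArrayDeque<>();
        for (int id = 0; id < messageCount; id++) {
            unacked.addLast(id);
        }
        Map<Integer, Integer> processed = new HashMap<>();
        while (!unacked.isEmpty()) {
            int id = unacked.peekFirst();
            processed.merge(id, 1, Integer::sum); // the side effect has happened
            if (random.nextDouble() < failureProbability) {
                continue; // injected failure before the ack: the message is redelivered
            }
            unacked.pollFirst(); // ack
        }
        return processed;
    }

    // At-least-once holds if every message was processed one or more times.
    static boolean atLeastOnce(Map<Integer, Integer> processed, int messageCount) {
        for (int id = 0; id < messageCount; id++) {
            if (processed.getOrDefault(id, 0) < 1) {
                return false;
            }
        }
        return true;
    }

    // Extra processings: what deduplication would have to absorb for effectively-once.
    static int duplicates(Map<Integer, Integer> processed) {
        int extra = 0;
        for (int count : processed.values()) {
            extra += count - 1;
        }
        return extra;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> processed = run(1_000, 0.2, 42L);
        System.out.println("at-least-once holds: " + atLeastOnce(processed, 1_000));
        System.out.println("duplicate processings: " + duplicates(processed));
    }
}
```

Against a real deployment, you can apply the same two checks to the output topic after killing function instances mid-stream: confirm that nothing is missing, and count the duplicates.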
----
2020-04-27 06:51:56 UTC - Erik Jansen: @Erik Jansen has joined the channel
----
2020-04-27 07:47:02 UTC - Mateusz Kieblesz: @Mateusz Kieblesz has joined the channel
----
2020-04-27 07:54:12 UTC - Alexandre DUVAL: Is there a related issue? Shall I create one?
----
2020-04-27 07:54:20 UTC - Alexandre DUVAL: (Didn't find it)
----