Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/05/04 09:11:04 UTC

Slack digest for #general - 2019-05-04

2019-05-03 10:46:52 UTC - Alexandre DUVAL: ```10:44:33.704 [pulsar-external-listener-17-1] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - clevercloud/functions/RouteApplicationsAddonsLogs-0 RuntimeSpawner starting function
10:44:33.704 [pulsar-external-listener-17-1] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Creating function log directory /pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs
10:44:33.704 [pulsar-external-listener-17-1] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Created or found function log directory /pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs
10:44:33.704 [pulsar-external-listener-17-1] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /pulsar/instances/java-instance.jar:/pulsar/instances/deps/* -Dpulsar.functions.java.instance.jar=/pulsar/instances/java-instance.jar -Dpulsar.functions.extra.dependencies.dir=/pulsar/instances/deps -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.function.log.dir=/pulsar/logs/functions/clevercloud/functions/RouteApplicationsAddonsLogs -Dpulsar.function.log.file=RouteApplicationsAddonsLogs-0 -Xmx1073741824 org.apache.pulsar.functions.runtime.JavaInstanceMain --jar /tmp/pulsar_functions/clevercloud/functions/RouteApplicationsAddonsLogs/0/pulsar-functions-0.1.0-SNAPSHOT.jar --instance_id 0 --function_id 70d6d305-7f08-41fe-9f6b-f1ce1a09bf76 --function_version c0baae5b-4631-44cb-83bb-7c0da28a626a --function_details '{"tenant":"clevercloud","namespace":"functions","name":"RouteApplicationsAddonsLogs","className":"com.clevercloud.pulsar.function.RouteApplicationsAddonsLogs","logTopic":"log_topic","autoAck":true,"parallelism":1,"source":{"typeClassName":"java.lang.String","inputSpecs":{"clevercloud/logs/full":{}},"cleanupSubscription":true},"sink":{"typeClassName":"java.lang.Void"},"resources":{"cpu":1.0,"ram":"1073741824","disk":"10737418240"}}' --pulsar_serviceurl pulsar://c1-pulsar-clevercloud-customers.services.clever-cloud.com:2001 --client_auth_plugin org.apache.pulsar.client.impl.auth.AuthenticationToken --client_auth_params token:TOKEN --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --tls_trust_cert_path /etc/ssl/private/c1-pulsar-clevercloud-customers_services_clever-cloud_com/c1-pulsar-clevercloud-customers_services_clever-cloud_com.crt --max_buffered_tuples 1024 --port 35331 --metrics_port 35175 --expected_healthcheck_interval 30 --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider --cluster_name c1
10:44:33.706 [pulsar-external-listener-17-1] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
Exception in thread "main" org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.ParameterException: Unknown option: --metrics_port
        at org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parseValues(JCommander.java:742)
        at org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parse(JCommander.java:282)
        at org.apache.pulsar.functions.runtime.shaded.com.beust.jcommander.JCommander.parse(JCommander.java:265)
        at org.apache.pulsar.functions.runtime.JavaInstanceMain.main(JavaInstanceMain.java:191)
```
----
2019-05-03 10:47:01 UTC - Alexandre DUVAL: `ParameterException: Unknown option: --metrics_port`
----
2019-05-03 10:47:53 UTC - Alexandre DUVAL: I'm running everything on v2.4.0 (from 30th of April) except BK and ZK (for now), and I don't understand why I've got this issue.
----
2019-05-03 10:48:39 UTC - Alexandre DUVAL: The function instance arg "--metrics_port" is injected.
----
2019-05-03 10:50:52 UTC - Alexandre DUVAL: From RuntimeUtils.java
----
2019-05-03 10:53:22 UTC - Alexandre DUVAL: But JavaInstanceMain rejects it :confused:.
----
2019-05-03 11:19:50 UTC - Alexandre DUVAL: `--metrics_port` is a JavaInstanceMain parameter in both versions, yet the error about metrics_port is reported at JavaInstanceMain.java:191, which in v2.3.1 and v2.4.0 is in this block:
        ```server = ServerBuilder.forPort(port)
                .addService(new InstanceControlImpl(runtimeSpawner))
                .build()
                .start();```

:confused: I'm a bit lost now.
----
2019-05-03 12:29:37 UTC - Laurent Chriqui: I made a fresh install of python3 and it cleared the problem.
----
2019-05-03 12:51:14 UTC - Sanjeev Kulkarni: @Alexandre DUVAL you are using the process container factory, right?
----
2019-05-03 12:51:36 UTC - Alexandre DUVAL: I'm only using functions create from pulsar-admin.
----
2019-05-03 12:51:41 UTC - Sanjeev Kulkarni: Are you sure you have the right version of all the classes?
----
2019-05-03 12:52:22 UTC - Alexandre DUVAL: I just downloaded the latest snapshot from master.
----
2019-05-03 12:52:32 UTC - Alexandre DUVAL: pulsar-server-distribution-2.4.0-20190502.134020-76-bin.tar.gz
----
2019-05-03 12:52:40 UTC - Sanjeev Kulkarni: `create` itself should say created successfully, but this error looks like it happens when actually trying to start the process
----
2019-05-03 12:52:43 UTC - Alexandre DUVAL: I'm only running this functions worker and this pulsar-admin.
----
2019-05-03 12:52:56 UTC - Alexandre DUVAL: No, I got the "created successfully"
----
2019-05-03 12:53:01 UTC - Alexandre DUVAL: from functions create
----
2019-05-03 12:53:13 UTC - Alexandre DUVAL: it's the running step from the functions_worker which is outputting this error
----
2019-05-03 12:56:30 UTC - Sanjeev Kulkarni: That’s very weird. Can you check the JavaInstanceMain class and see what version it is?
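One way to check which JavaInstanceMain the worker really loads is to ask the JVM where the class came from. The sketch below uses only standard reflection (`getProtectionDomain().getCodeSource()` plus the package's manifest `Implementation-Version`). The `ClassOrigin` helper and the classpath in its comment are hypothetical, so adapt them to the worker's actual layout.

```java
import java.net.URL;
import java.security.CodeSource;

// Reports where a class was loaded from and the Implementation-Version
// recorded in its jar manifest (null when the manifest has none).
class ClassOrigin {

    static String whereLoadedFrom(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null) {
            // Bootstrap classes such as java.lang.String have no code source.
            return "<bootstrap>";
        }
        URL location = source.getLocation();
        return location == null ? "<unknown location>" : location.toString();
    }

    static String implementationVersion(Class<?> clazz) {
        Package pkg = clazz.getPackage();
        return pkg == null ? null : pkg.getImplementationVersion();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Hypothetical usage with the worker's instance jar on the classpath:
        //   java -cp /pulsar/instances/java-instance.jar:. ClassOrigin \
        //       org.apache.pulsar.functions.runtime.JavaInstanceMain
        String name = args.length > 0 ? args[0] : "java.lang.String";
        Class<?> clazz = Class.forName(name);
        System.out.println(name + " loaded from " + whereLoadedFrom(clazz)
                + " (Implementation-Version: " + implementationVersion(clazz) + ")");
    }
}
```

If the reported location is not the `java-instance.jar` from the 2.4.0 snapshot, a stale copy earlier on the classpath could explain the `Unknown option: --metrics_port` error.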
----
2019-05-03 13:56:57 UTC - Chris DiGiovanni: I'm looking over the Pulsar TLS configuration for transport and authentication. In my situation we have an internal CA, which is fine for TLS transport, but the internal group at my organization responsible for the CA will not generate the type of certs I need for authentication. So I have two CA cert files I need Pulsar to trust: my internal CA, and the one I create for TLS authentication. Looking through the source, it seems that Pulsar only expects one path via tlsTrustCertsFilePath. What I cannot figure out is whether Pulsar will accept multiple certs in one PEM file for tlsTrustCertsFilePath. If it does not, is my only option to give the JVM the argument `-Djavax.net.ssl.trustStore` and pass my own trustStore containing my CA certs? Any guidance would be appreciated.
----
2019-05-03 14:04:17 UTC - Chris Bartholomew: @Chris DiGiovanni I set that to the default OS cert bundle PEM  (since I use Let's Encrypt certs) which contains 150 certs: ```tlsTrustCertsFilePath=/etc/ssl/certs/ca-certificates.crt
``` Works for me, no problem.
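Going by Chris's experience, a single PEM containing many certificates works for `tlsTrustCertsFilePath`, so one option for the two-CA case is to concatenate both CA files into one bundle. A minimal sketch, assuming each input is a PEM file holding one or more `-----BEGIN CERTIFICATE-----` blocks; `PemBundle` and the file names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Concatenates several PEM CA files into one trust bundle, the same layout
// as an OS bundle such as /etc/ssl/certs/ca-certificates.crt.
class PemBundle {

    static void concatenate(List<Path> inputs, Path output) throws IOException {
        StringBuilder bundle = new StringBuilder();
        for (Path input : inputs) {
            String pem = new String(Files.readAllBytes(input), StandardCharsets.US_ASCII).trim();
            bundle.append(pem).append('\n'); // each file's blocks start on a fresh line
        }
        Files.write(output, bundle.toString().getBytes(StandardCharsets.US_ASCII));
    }

    // Counts certificate blocks, as a quick sanity check on the bundle.
    static int countCertificates(Path bundle) throws IOException {
        int count = 0;
        for (String line : Files.readAllLines(bundle, StandardCharsets.US_ASCII)) {
            if (line.trim().equals("-----BEGIN CERTIFICATE-----")) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical usage: java PemBundle ca-bundle.pem internal-ca.pem auth-ca.pem
        if (args.length < 2) {
            System.err.println("usage: PemBundle <output.pem> <input.pem>...");
            return;
        }
        List<Path> inputs = new ArrayList<>();
        for (int i = 1; i < args.length; i++) {
            inputs.add(Paths.get(args[i]));
        }
        Path output = Paths.get(args[0]);
        concatenate(inputs, output);
        System.out.println(countCertificates(output) + " certificate(s) written to " + output);
    }
}
```

`tlsTrustCertsFilePath` would then point at the generated bundle.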
----
2019-05-03 14:20:16 UTC - Chris DiGiovanni: Thanks @Chris Bartholomew
----
2019-05-03 15:28:47 UTC - Thor Sigurjonsson: When I am setting up `functions_worker.yml` to enable functions alongside the configs in `broker.conf`, there are fields that look the same in both files, such as `authenticationEnabled` in the section under the comment "security settings for worker service". Do these need to mirror the settings in `broker.conf`, or should they be left blank if `functionsWorkerEnabled=true` (in `broker.conf`), which runs the function worker inside the broker itself?
----
2019-05-03 15:29:19 UTC - Thor Sigurjonsson: I'm specifically working to get the functions worker running with token auth enabled in the cluster.
----
2019-05-03 16:03:14 UTC - Alexandre DUVAL: How do I create a PulsarAdmin client in a Pulsar function? ```Caused by: java.lang.NumberFormatException: null
        at java.lang.Integer.parseInt(Integer.java:542) ~[?:1.8.0_192]
        at java.lang.Integer.parseInt(Integer.java:615) ~[?:1.8.0_192]
        at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigHelper$Config.getInt(AsyncHttpClientConfigHelper.java:85) ~[java-instance.jar:2.4.0-SNAPSHOT]
        at org.apache.pulsar.shade.org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultMaxRedirects(AsyncHttpClientConfigDefaults.java:134) ~[java-instance.jar:2.4.0-SNAPSHOT]
        at org.apache.pulsar.shade.org.asynchttpclient.DefaultAsyncHttpClientConfig$Builder.<init>(DefaultAsyncHttpClientConfig.java:670) ~[java-instance.jar:2.4.0-SNAPSHOT]
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.<init>(AsyncHttpConnector.java:74) ~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
        at org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider.getConnector(AsyncHttpConnectorProvider.java:43) ~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
        at org.apache.pulsar.client.admin.PulsarAdmin.<init>(PulsarAdmin.java:172) ~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
        at org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl.build(PulsarAdminBuilderImpl.java:42) ~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
        at com.yo.pulsar.function.RouteApplicationsAddonsLogs.<init>(RouteApplicationsAddonsLogs.java:49) ~[pulsar-functions-0.1.0-SNAPSHOT.jar-unpacked/:?]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_192]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_192]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_192]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_192]
        at org.apache.pulsar.functions.utils.Reflections.createInstance(Reflections.java:118) ~[java-instance.jar:2.4.0-SNAPSHOT]
```
----
2019-05-03 16:03:24 UTC - Alexandre DUVAL: Does maxRedirects have no default value? How do I set it?
----
2019-05-03 16:08:39 UTC - Brian Doran: @Brian Doran has joined the channel
----
2019-05-03 16:13:07 UTC - Sanjeev Kulkarni: @Alexandre DUVAL how are you instantiating it?
----
2019-05-03 16:13:56 UTC - Alexandre DUVAL: ```public class RouteApplicationsAddonsLogs implements Function<String, Void> {
  String UNCLASSIFIED_TOPIC = "persistent://yo/logs/applications-addons-unclassified";
  PulsarAdmin pulsarAdmin;

  public RouteApplicationsAddonsLogs() {
    try {
      pulsarAdmin = PulsarAdmin.builder()
        .allowTlsInsecureConnection(false)
        .enableTlsHostnameVerification(false)
        //.tlsTrustCertsFilePath(tlsTrustCertsFilePath)
        .serviceHttpUrl("https://c1-pulsar-yo-customers.services.yo.com:2000")
        .authentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", "token:TOKEN")
        .build();

    } catch (PulsarClientException e) {
      e.printStackTrace();
    }
  }

  @Override
  public Void process(String input, Context context) {
  ...```
----
2019-05-03 16:13:59 UTC - Alexandre DUVAL: @Sanjeev Kulkarni
----
2019-05-03 16:18:32 UTC - Sanjeev Kulkarni: I have never done this before. Let me try it on my end and I will get back to you.
----
2019-05-03 16:19:42 UTC - Brian Doran: Hi all, just started using Pulsar as an alternative to my Kafka implementation. So for the schema registry with Kafka we currently use the Confluent Schema registry.

So we provide a `KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG` to the producer when creating our KafkaProducer.

We build our schemas dynamically, as they can change according to the fields in the record for any particular topic. I have an `org.apache.avro.Schema` and was wondering what's the best way to plug it into the `pulsarClient.newProducer(org.apache.pulsar.client.api.Schema)` method. Can this be done easily, perhaps with a util method for converting an avro.Schema to a pulsar.Schema?
----
2019-05-03 16:19:46 UTC - David Kjerrumgaard: @Alexandre DUVAL What do you need the PulsarAdmin for?
----
2019-05-03 16:19:49 UTC - Brian Doran: Thanks in advance for any pointers
----
2019-05-03 16:24:40 UTC - David Kjerrumgaard: @Brian Doran Pulsar supports Avro schemas. <http://pulsar.apache.org/docs/en/concepts-schema-registry/>
----
2019-05-03 16:25:01 UTC - David Kjerrumgaard: So if you already have an existing schema you can just upload it and start using it.
----
2019-05-03 16:25:10 UTC - David Kjerrumgaard: <http://pulsar.apache.org/docs/en/admin-api-schemas/>
----
2019-05-03 16:25:21 UTC - David Kjerrumgaard: HTH
----
2019-05-03 16:28:34 UTC - Brian Doran: Hi @David Kjerrumgaard thanks for your quick response. Sure, I understand that it supports Avro, which is great; however, we build up these schemas dynamically and they can change at runtime depending on various factors. Previously we handled this in our code with the Confluent SR, since each GenericRecord has its own schema associated with it.
----
2019-05-03 16:28:57 UTC - Brian Doran: Was hoping not to have to pre-upload the schemas.
----
2019-05-03 16:30:23 UTC - Alexandre DUVAL: create tenant and namespaces depending on message properties
----
2019-05-03 16:30:28 UTC - Alexandre DUVAL: if they don't exist
----
2019-05-03 16:30:29 UTC - Brian Doran: I was just wondering if this can be done with Pulsar too ... I was just going to recreate the producers with the 'new' schema.
----
2019-05-03 16:30:43 UTC - Alexandre DUVAL: thanks, let me know your progress :wink:
----
2019-05-03 16:30:51 UTC - David Kjerrumgaard: @Brian Doran Ok, thank you for the additional background.  Your goal is to be able to dynamically create and update schemas inside the Pulsar schema registry?
----
2019-05-03 16:31:58 UTC - Brian Doran: yes..
----
2019-05-03 16:32:14 UTC - Alexandre DUVAL: The goal is to create tenants and namespaces dynamically from the message properties if they don't exist. Maybe you have another way to do it?
----
2019-05-03 16:33:26 UTC - Sijie Guo: We have GenericSchemaBuilder and GenericRecordBuilder support in pulsar 2.4.0.
----
2019-05-03 16:33:35 UTC - Sanjeev Kulkarni: In general I would try to limit the use of PulsarAdmin. It is kind of heavy for this purpose; a better way would be to use a much simpler REST library to make the appropriate REST call.
----
2019-05-03 16:33:46 UTC - David Kjerrumgaard: That seems to complicate your function unnecessarily. Can't you impose a requirement that the tenant and namespace already exist?
----
2019-05-03 16:34:01 UTC - Brian Doran: Generally what we do when a new Schema is generated is just start creating GenericRecords with this new schema. However, with Pulsar I was going to create a new Producer for that topic with the new Schema after closing the old producers.
----
2019-05-03 16:34:48 UTC - Sijie Guo: Currently Schema is not well documented on the Pulsar website. I have been working on adding the documentation and will have a PR out in a few days.
+1 : David Kjerrumgaard
----
2019-05-03 16:34:57 UTC - Sanjeev Kulkarni: or at least use an alternate mechanism other than PulsarAdmin for creating the tenant/namespace. PulsarAdmin is kind of heavyweight for this purpose.
----
2019-05-03 16:35:38 UTC - David Kjerrumgaard: The Pulsar admin API I referenced above is also supported in the Java library, so you could in theory register new schemas as you "discover" and create them.
----
2019-05-03 16:37:53 UTC - Sijie Guo: @Brian Doran :

Schema versioning, GenericSchemaBuilder, GenericRecord and GenericRecordBuilder are going to be released in 2.4.0.

Documentation around that part will be out in a few days.
+1 : David Kjerrumgaard
----
2019-05-03 16:38:43 UTC - Alexandre DUVAL: Can't have this requirement :/.
----
2019-05-03 16:39:06 UTC - Alexandre DUVAL: So use the REST API with an HTTP client to create the tenant, then the namespace?
----
2019-05-03 16:39:06 UTC - David Kjerrumgaard: Thanks @Sijie Guo I was trying to find a link to the docs to share with Brian  :smiley:
----
2019-05-03 16:40:17 UTC - Alexandre DUVAL: In a Pulsar function, `context.publish` can't create namespaces and tenants, right? Only the topic, if it doesn't exist?
----
2019-05-03 16:40:31 UTC - Sanjeev Kulkarni: that is correct
----
2019-05-03 16:40:35 UTC - Alexandre DUVAL: Okay
----
2019-05-03 16:41:00 UTC - Sijie Guo: Schema is not well documented. Folks from zhaopin have been contributing a lot of schema features recently. I have been working with them on the documentation part. so stay tuned.
----
2019-05-03 16:41:07 UTC - Alexandre DUVAL: Okay, then I will do it with REST API calls.
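A minimal sketch of those REST calls using only the JDK's `HttpURLConnection`, so it also runs on the 1.8 runtime shown in the stack trace above. It assumes Pulsar's v2 admin endpoints `PUT /admin/v2/tenants/{tenant}` and `PUT /admin/v2/namespaces/{tenant}/{namespace}`, token auth sent as an `Authorization: Bearer` header, and an HTTP 409 meaning the resource already exists. The class name, the empty `adminRoles` list, the empty namespace body and the cluster name (`c1`, taken from the worker log) are assumptions to check against the admin REST documentation.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Creates a tenant and then a namespace through the admin REST API,
// treating 409 Conflict (already exists) as success.
class PulsarRestSetup {
    private final String serviceHttpUrl; // e.g. the https://host:port passed to serviceHttpUrl(...)
    private final String token;          // the JWT, without the "token:" prefix

    PulsarRestSetup(String serviceHttpUrl, String token) {
        this.serviceHttpUrl = serviceHttpUrl.replaceAll("/+$", "");
        this.token = token;
    }

    static String tenantPath(String tenant) {
        return "/admin/v2/tenants/" + tenant;
    }

    static String namespacePath(String tenant, String namespace) {
        return "/admin/v2/namespaces/" + tenant + "/" + namespace;
    }

    // Minimal tenant definition; leaving adminRoles empty is an assumption.
    static String tenantBody(String cluster) {
        return "{\"adminRoles\":[],\"allowedClusters\":[\"" + cluster + "\"]}";
    }

    static boolean isCreatedOrExists(int status) {
        return status == 200 || status == 204 || status == 409;
    }

    int put(String path, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(serviceHttpUrl + path).openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Bearer " + token);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        conn.disconnect();
        return status;
    }

    void ensureNamespace(String tenant, String namespace, String cluster) throws IOException {
        int status = put(tenantPath(tenant), tenantBody(cluster));
        if (!isCreatedOrExists(status)) {
            throw new IOException("creating tenant " + tenant + " failed: HTTP " + status);
        }
        status = put(namespacePath(tenant, namespace), "");
        if (!isCreatedOrExists(status)) {
            throw new IOException("creating namespace " + tenant + "/" + namespace + " failed: HTTP " + status);
        }
    }
}
```

Calling `ensureNamespace` once per unseen tenant/namespace (and remembering which ones were already handled) would avoid a REST round trip for every message before `context.publish`.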
----
2019-05-03 16:41:23 UTC - Sanjeev Kulkarni: that would be better. let us know how it goes
----
2019-05-03 16:43:48 UTC - Brian Doran: OK great, thanks for the help lads. I will wait for 2.4.0; when is that due to drop?
----
2019-05-03 16:45:33 UTC - David Kjerrumgaard: @Brian Doran 2.4.0 is the latest on the master branch, so you can clone it and build it locally if you like
----
2019-05-03 16:46:25 UTC - Brian Doran: @David Kjerrumgaard I've just done it. Thanks!
----
2019-05-03 16:46:51 UTC - Sijie Guo: @Brian Doran there were discussions on wrapping up 2.4.0. so I would expect it will come out soon.
+1 : Brian Doran
----
2019-05-03 16:51:26 UTC - Brian Doran: Just a note, I don't see GenericSchemaBuilder on master yet. Am I missing something?
----
2019-05-03 16:53:28 UTC - Sijie Guo: @Brian Doran <https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaBuilder.java>
+1 : Brian Doran
----
2019-05-03 17:14:28 UTC - Brian Doran: @Sijie Guo One last question: if I already have an avro.Schema, is there an easy way to convert it to a pulsar.Schema?
----
2019-05-03 17:51:13 UTC - Matteo Merli: @Brian Doran You can use:

```
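// avroJsonSchema: the schema's JSON definition, e.g. avroSchema.toString()
// for an existing org.apache.avro.Schema
Schema.AVRO(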
   SchemaDefinition.builder()
         .withJsonDef(avroJsonSchema)
         .build()
   );
```
----
2019-05-03 20:34:17 UTC - Patrick Lange: Does anyone know how to get the Python 3.7 pulsar-client working with Python from Anaconda on macOS? The 2.0.1 client under Python 3.6 worked, but `pulsar-client==2.3.1` segfaults on `import _pulsar` with Python 3.7 (from conda). When I use Homebrew Python 3.7 it works.

Homebrew python
> Python 3.7.3 (default, Mar 27 2019, 09:23:15)
> [Clang 10.0.1 (clang-1001.0.46.3)] on darwin

Anaconda python
> Python 3.7.3 (default, Mar 27 2019, 16:54:48)
> [Clang 4.0.1 (tags/RELEASE_401/final)] :: Anaconda, Inc. on darwin
----
2019-05-03 21:27:00 UTC - Jason Gu: @Jason Gu has joined the channel
----
2019-05-03 23:12:57 UTC - Poule: @Patrick Lange my python segfaults too
----
2019-05-03 23:13:19 UTC - Patrick Lange: it does work for me with the python3 installed from homebrew
----
2019-05-03 23:13:42 UTC - Vincent Ngan: On my way.
----
2019-05-03 23:33:41 UTC - Poule: @Patrick Lange thanks buddy it solved my problem
----
2019-05-04 00:42:57 UTC - Poule: how can I trigger a function from a python script?
----
2019-05-04 01:11:55 UTC - Jonathan Pearl: @Jonathan Pearl has joined the channel
----
2019-05-04 01:19:59 UTC - Jonathan Pearl: Hey all, newbie question coming from a Kafka background. When using partitioned topics, is there any notification on the consuming side which partitions you'll be receiving messages from?
I guess I'm looking for an analog to Kafka's rebalance callback to know when it is safe to offload a cache or when I should start fetching.
----
2019-05-04 01:21:00 UTC - Jonathan Pearl: Similarly, is there a way to control which consumers get which partitions? In Kafka you could set an "assigner" that the group leader would determine the partition spread. Out-of-the box options were round-robin and range assignment.
----
2019-05-04 01:24:07 UTC - Matteo Merli: > When using partitioned topics, is there any notification on the consuming side which partitions you’ll be receiving messages from?

Yes, you can attach a consumer event listener for that. <https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#consumerEventListener-org.apache.pulsar.client.api.ConsumerEventListener->
----
2019-05-04 01:25:09 UTC - Jonathan Pearl: Ah, would that also apply to a shared subscription? The description makes me think it's only for failover.
----
2019-05-04 01:25:36 UTC - Matteo Merli: yes, only for failover. in shared mode each consumer is getting messages from all the partitions
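For the failover case, the listener's `becameActive` / `becameInactive` callbacks (which receive the consumer and a partition id) are a natural place to warm or offload a per-partition cache, which is what Jonathan asked about. The sketch below runs without Pulsar by using a hypothetical stand-in interface, `PartitionEvents`, that drops the consumer argument; in real code the class would implement `org.apache.pulsar.client.api.ConsumerEventListener` and be registered with `consumerEventListener(...)`, and the exact signatures should be checked in the Javadoc.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the two ConsumerEventListener callbacks,
// minus the Consumer argument so the sketch runs without Pulsar.
interface PartitionEvents {
    void becameActive(int partitionId);
    void becameInactive(int partitionId);
}

// Keeps a cache only for partitions this consumer is currently active on.
class PartitionCache implements PartitionEvents {
    private final Map<Integer, Map<String, String>> cacheByPartition = new HashMap<>();

    @Override
    public synchronized void becameActive(int partitionId) {
        // A real implementation could start fetching state for this partition here.
        cacheByPartition.putIfAbsent(partitionId, new HashMap<>());
    }

    @Override
    public synchronized void becameInactive(int partitionId) {
        // Offload: another consumer is now active on this partition.
        cacheByPartition.remove(partitionId);
    }

    synchronized void put(int partitionId, String key, String value) {
        Map<String, String> cache = cacheByPartition.get(partitionId);
        if (cache == null) {
            throw new IllegalStateException("not active on partition " + partitionId);
        }
        cache.put(key, value);
    }

    synchronized Set<Integer> activePartitions() {
        return new TreeSet<>(cacheByPartition.keySet());
    }
}
```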
----
2019-05-04 01:25:56 UTC - Matteo Merli: > Similarly, is there a way to control which consumers get which partitions? In Kafka you could set an “assigner” that the group leader would determine the partition spread. Out-of-the box options were round-robin and range assignment.

If you want to manually assign the partitions to consumers, you can directly subscribe to the individual partitions, instead of the partitioned topic. eg: `TOPIC-partition-5` and so on
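A sketch of that manual approach: build the `<topic>-partition-<n>` names and give each consumer instance a contiguous range of them. How instances get numbered (`instance`, `numInstances`) is a hypothetical input from your own deployment, and in this sketch any reshuffling when instances come and go is up to you.

```java
import java.util.ArrayList;
import java.util.List;

// Builds per-partition topic names and splits them into contiguous ranges,
// one range per consumer instance (a hypothetical range assignment).
class ManualPartitionAssignment {

    static String partitionTopic(String topic, int partition) {
        return topic + "-partition-" + partition;
    }

    // Instance k of n gets partitions [k*p/n, (k+1)*p/n).
    static List<String> partitionsFor(String topic, int numPartitions, int instance, int numInstances) {
        if (numInstances <= 0 || instance < 0 || instance >= numInstances) {
            throw new IllegalArgumentException("instance out of range");
        }
        List<String> topics = new ArrayList<>();
        int start = instance * numPartitions / numInstances;
        int end = (instance + 1) * numPartitions / numInstances;
        for (int p = start; p < end; p++) {
            topics.add(partitionTopic(topic, p));
        }
        return topics;
    }
}
```

With 8 partitions and 3 instances this yields 2, 3 and 3 partitions respectively; each list could then be handed to `ConsumerBuilder.topics(...)`.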
----
2019-05-04 01:29:26 UTC - Jonathan Pearl: So if I wanted something like a dynamically scaled subscription with exclusive partitions per member to get to something similar to a Kafka consumer group, that would need to be built on top of Pulsar?
----
2019-05-04 01:30:41 UTC - Matteo Merli: > dynamically scaled subscription with exclusive partitions per member

What exactly do you mean by this?
----
2019-05-04 01:32:03 UTC - Matteo Merli: to rephrase, what kind of assignment behavior (consumer -> partitions) are you looking for?
----
2019-05-04 01:35:46 UTC - Jonathan Pearl: Basically a partition would only ever get served to one consumer at a time
----
2019-05-04 01:36:26 UTC - Jonathan Pearl: So if a consumer joined or left a subscription, its partitions would be coordinated with the others who would give up their own partitions (or would give them partitions when they left)
----
2019-05-04 01:37:53 UTC - Jonathan Pearl: Poor drawing ahead
----
2019-05-04 01:40:09 UTC - Jonathan Pearl: It's the main appeal of Kafka for me, as it allows things to cache nicely if you are aware of your own partition strategy, and it gives some notion of exclusivity, knowing that other consumers in your group/subscription wouldn't be operating on messages from the same partition
----
2019-05-04 01:42:29 UTC - Matteo Merli: So, in Pulsar with a failover subscription over a partitioned topic, you’ll get that semantic: “One consumer active on each partition”.  The only thing is that the assignment is always decided by broker.
----
2019-05-04 01:45:08 UTC - Jonathan Pearl: My assumption with failover is that all of the partitions would go to 1 consumer, and the other consumers in the subscription would get nothing.
From <http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/SubscriptionType.html#Failover>: `Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.`
----
2019-05-04 01:46:30 UTC - Matteo Merli: Yes, that’s the semantic within one partition. With a partitioned topic, the brokers will choose a different “active” consumer for each partition (in round-robin fashion)
----
2019-05-04 01:47:21 UTC - Jonathan Pearl: Oh! Gotcha, my bad. Thanks! Unfortunate that the assignment is fixed.
----
2019-05-04 01:47:57 UTC - Jonathan Pearl: assignment strategy*
----
2019-05-04 01:48:14 UTC - Matteo Merli: Going back to the question, how would you want to customize that?
----
2019-05-04 01:51:37 UTC - Jonathan Pearl: Ideally, I'd like to minimize the number of partitions changing hands whenever a subscription membership changes. With round-robin, you could see each consumer getting affected. This could get expensive depending on the cleverness going on as far as caching on partition de/activation.
----
2019-05-04 01:52:40 UTC - Jonathan Pearl: <https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html> tries to keep partitions to their previous owners, but if it was up to the client, it might have more context on which partitions are more "expensive" than others
----
2019-05-04 01:54:31 UTC - Matteo Merli: So, if that would help, the round-robin assignment in Pulsar is deterministic. It’s based on a simple scheme that doesn’t require any coordination.

eg: each consumer has a “consumerName”, which can be set and by default is a random string. The broker will sort all available consumers by name and will pick the 1st consumer for partition-0, the 2nd for partition-1 and so on.
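The assignment described above fits in a few lines, and modeling it also illustrates Jonathan's concern: whether a new consumer moves one partition or several depends on where its name sorts. A sketch, assuming the list simply wraps around (partition p goes to sorted consumer p mod n) when there are more partitions than consumers; `RoundRobinModel` is hypothetical and considers nothing beyond the name ordering described here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Models name-sorted round-robin: sorted consumer i is active on partition i,
// wrapping around (an assumption) when partitions outnumber consumers.
class RoundRobinModel {

    static List<String> assign(List<String> consumerNames, int numPartitions) {
        if (consumerNames.isEmpty()) {
            throw new IllegalArgumentException("no consumers");
        }
        List<String> sorted = new ArrayList<>(consumerNames);
        Collections.sort(sorted);
        List<String> owners = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            owners.add(sorted.get(p % sorted.size()));
        }
        return owners; // owners.get(p) is the active consumer for partition p
    }

    // Number of partitions whose active consumer differs between two assignments.
    static int moved(List<String> before, List<String> after) {
        int count = 0;
        for (int p = 0; p < before.size(); p++) {
            if (!before.get(p).equals(after.get(p))) {
                count++;
            }
        }
        return count;
    }
}
```

With four partitions and consumers a, b, c, adding d moves only partition-3, while adding aa moves three partitions because every name after it shifts by one. Setting names explicitly via `ConsumerBuilder.consumerName(...)` keeps the mapping predictable.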
----
2019-05-04 01:55:45 UTC - Matteo Merli: for the re-assignment there is currently a “grace-time” of 1sec by default (I think) within which the partition is not re-assigned.
----
2019-05-04 01:56:19 UTC - Matteo Merli: so, in case of a quick reconnect, the partition will not be handed over to another consumer
----
2019-05-04 01:57:19 UTC - Matteo Merli: In any case, it might be interesting to make that grace-time more configurable (right now it is per-broker) so that each consumer could specify its own value
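The grace-time behavior can be sketched as a toy model: a partition moves away from a disconnected consumer only once `graceMillis` has passed without a reconnect. This is not the broker's code. `GracePeriodModel` is hypothetical, takes timestamps explicitly so it is deterministic, and its constructor parameter stands in for the per-consumer setting floated above rather than anything Pulsar offers today.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the reconnect grace period: a consumer keeps its partition
// if it reconnects within graceMillis of disconnecting.
class GracePeriodModel {
    private final long graceMillis;
    private final Map<String, Long> disconnectedAt = new HashMap<>();

    GracePeriodModel(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    void onDisconnect(String consumerName, long nowMillis) {
        disconnectedAt.put(consumerName, nowMillis);
    }

    void onReconnect(String consumerName) {
        disconnectedAt.remove(consumerName);
    }

    // True if, at nowMillis, the partition should be handed to another consumer.
    boolean shouldReassign(String consumerName, long nowMillis) {
        Long since = disconnectedAt.get(consumerName);
        return since != null && nowMillis - since >= graceMillis;
    }
}
```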
----
2019-05-04 01:57:34 UTC - Jonathan Pearl: Ah, that's cool. Would it be possible for a consumer to assume another consumer's name and "inherit" its assignment if it started up fast enough?
----
2019-05-04 01:57:57 UTC - Jonathan Pearl: Or does it need to be the same client.
----
2019-05-04 01:57:58 UTC - Matteo Merli: Yes, you can specify the name in `ConsumerBuilder`
----
2019-05-04 01:58:09 UTC - Matteo Merli: or it will get a random one
----
2019-05-04 02:00:43 UTC - Jonathan Pearl: I think that might help a bit, but I can still see round-robin being an issue in legitimate up-scaling and down-scaling scenarios. But I suppose implementing a new assignment strategy in Pulsar wouldn't be impossible! :stuck_out_tongue:
----
2019-05-04 02:01:00 UTC - Matteo Merli: definitely not impossible :smile:
----
2019-05-04 02:01:02 UTC - Jonathan Pearl: Thanks for all the great tips!
+1 : Matteo Merli
----
2019-05-04 02:05:44 UTC - Jonathan Pearl: Actually, while I have you here, how heavy are reassignments? As in, how long do they take to resolve, and are messages stopped during this time for all partitions?
----
2019-05-04 02:08:39 UTC - Matteo Merli: there’s only the artificial delay of 1sec (configurable) to avoid flickering in reconnections. the delivery is only stopped in 1 partition
----
2019-05-04 02:09:09 UTC - Matteo Merli: since each broker decides on its own, there’s not much overhead
----
2019-05-04 02:09:34 UTC - Matteo Merli: (decides on its own using the same deterministic logic)
----
2019-05-04 02:10:07 UTC - Jonathan Pearl: Fantastic! As customizable as it was in Kafka, one of my biggest pain points there is what a showstopping sync point rebalancing is across a consumer group. So that's great news.
----
2019-05-04 02:12:19 UTC - Matteo Merli: but yes, we could definitely add a better way to guarantee stickiness
----
2019-05-04 02:37:33 UTC - Sanjeev Kulkarni: What do you mean trigger a function? 
----