Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/09 22:29:22 UTC

[GitHub] [pulsar-client-go] hunter2046 opened a new pull request #507: Expose NumPartitions() method on the producer

hunter2046 opened a new pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507


   ### Motivation
   
   Expose the method so that the client will be able to know the number of partitions, which could help the client decide custom routing strategies.
   
   ### Modifications
   
   Exposed `NumPartitions()` method on the `Producer` interface and updated implementations accordingly.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   - Added unit tests in producer_test.go
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] hunter2046 commented on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-857267988


   > > > I think the premise is wrong: can MessageRouter not meet our needs?
   > > > e.g:
   > > > ```
   > > > 	producer, err := client.CreateProducer(ProducerOptions{
   > > > 		Topic: "my-partitioned-topic",
   > > > 		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
   > > > 			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
   > > > 			return 2
   > > > 		},
   > > > 	})
   > > > ```
   > > 
   > > 
   > > Thanks for taking a look. MessageRouter is what we need. We'd like to set the MessageRouter to a function that works similar to the defaultRouter with a customized number of partitions, e.g.
   > > ```
   > > 	internalRouter := pulsarClient.NewDefaultRouter(
   > > 		getHashingFunction(producerOptions.HashingScheme),
   > > 		producerOptions.BatchingMaxMessages,
   > > 		producerOptions.BatchingMaxSize,
   > > 		producerOptions.BatchingMaxPublishDelay,
   > > 		producerOptions.DisableBatching)
   > > 	messageRouter := func(message *pulsarClient.ProducerMessage, metadata pulsarClient.TopicMetadata) int {
   > > 		// customNumberOfPartitions could be different from metadata.NumPartitions() here
   > > 		return internalRouter(message, customNumberOfPartitions)
   > > 	}
   > > ```
   > > 
   > > The problem is that `getHashingFunction` is not exposed so we can't use it to feed the `NewDefaultRouter` method.
   > 
   > Sorry for the late reply. @hunter2046 All the options that NewDefaultRouter takes are exposed as parameters. You can set them in producerOptions, and the default router is itself built from those option values.
   
   Thanks. I understand that those options, including MessageRouter, are in producerOptions. My use case (as mentioned in https://github.com/apache/pulsar-client-go/pull/507#issuecomment-822802546) is that I need to set the MessageRouter in a customized way that is close to what the internalRouter does, except that it uses a different number of partitions. This avoids causing producers to send messages to a different partition while the number of partitions is being increased.
   
   In order to do that, I need to initialize the internalRouter on my own by calling `NewDefaultRouter`. To call `NewDefaultRouter`, I could certainly supply my own hashing function. However, because I want to mimic what the internalRouter does, I would like to use the same hashing function. That is why I want to see if `getHashingFunction` can be exposed so that I can achieve this. I hope this makes sense.
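
The use case described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `ProducerMessage`, `TopicMetadata` and `fixedMetadata` are local stand-ins so that the snippet compiles without the Pulsar client, `newPinnedRouter` is a hypothetical helper, and `keyHash` is a placeholder FNV-1a hash that is not claimed to match the hashing function used inside the client.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ProducerMessage and TopicMetadata are local stand-ins for the client
// types of the same name, so that this sketch compiles on its own.
type ProducerMessage struct {
	Key string
}

type TopicMetadata interface {
	NumPartitions() uint32
}

// fixedMetadata is a hypothetical TopicMetadata reporting a fixed count.
type fixedMetadata uint32

func (m fixedMetadata) NumPartitions() uint32 { return uint32(m) }

// keyHash is a placeholder hash (FNV-1a). It is an assumption and is not
// claimed to match the client's internal hashing function.
func keyHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newPinnedRouter returns a router that ignores the live partition count
// and always routes against pinnedPartitions, which must be greater than 0.
func newPinnedRouter(pinnedPartitions uint32) func(*ProducerMessage, TopicMetadata) int {
	return func(msg *ProducerMessage, _ TopicMetadata) int {
		return int(keyHash(msg.Key) % pinnedPartitions)
	}
}

func main() {
	router := newPinnedRouter(4)
	msg := &ProducerMessage{Key: "order-42"}
	before := router(msg, fixedMetadata(4))
	after := router(msg, fixedMetadata(8)) // the topic grew from 4 to 8 partitions
	fmt.Println(before == after)           // prints "true": the key stays put
}
```

Because the pinned count is captured in the closure, the result does not change when the topic metadata reports more partitions. Whether the partition chosen matches what the default router would have picked depends entirely on using the same hash, which is the gap this comment describes.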





[GitHub] [pulsar-client-go] merlimat commented on pull request #507: Expose NumPartitions() method on the producer

Posted by GitBox <gi...@apache.org>.
merlimat commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-817048549


   In the `MessageRouter`, you're getting a `TopicMetadata` object which will tell you the number of partitions: https://github.com/apache/pulsar-client-go/blob/f17deac7ac47919aa9e46d11d33638672a24d126/pulsar/producer.go#L57
   
   Wouldn't this already solve the problem?





[GitHub] [pulsar-client-go] hunter2046 commented on pull request #507: Expose NumPartitions() method on producer interface and GetHashingFunction() method

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-818072313


   > > IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.
   > 
   > I'm not convinced exposing `NumPartitions()` would help here. All the custom logic should be possible to implement within a custom message router.
   > 
   > In any case, there's already a way to get the partitions for a topic, by using `Client.TopicPartitions()`
   
   ACK, I didn't realize there is a method already. That should work. I removed changes related to `NumPartitions()`





[GitHub] [pulsar-client-go] wolfstudy commented on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-820869970


   I think the premise is wrong: can MessageRouter not meet our needs?
   
   e.g:
   
   ```
   	producer, err := client.CreateProducer(ProducerOptions{
   		Topic: "my-partitioned-topic",
   		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
   			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
   			return 2
   		},
   	})
   ```





[GitHub] [pulsar-client-go] wolfstudy commented on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-852675220


   > > I think the premise is wrong: can MessageRouter not meet our needs?
   > > e.g:
   > > ```
   > > 	producer, err := client.CreateProducer(ProducerOptions{
   > > 		Topic: "my-partitioned-topic",
   > > 		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
   > > 			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
   > > 			return 2
   > > 		},
   > > 	})
   > > ```
   > 
   > Thanks for taking a look. MessageRouter is what we need. We'd like to set the MessageRouter to a function that works similar to the defaultRouter with a customized number of partitions, e.g.
   > 
   > ```
   > 	internalRouter := pulsarClient.NewDefaultRouter(
   > 		getHashingFunction(producerOptions.HashingScheme),
   > 		producerOptions.BatchingMaxMessages,
   > 		producerOptions.BatchingMaxSize,
   > 		producerOptions.BatchingMaxPublishDelay,
   > 		producerOptions.DisableBatching)
   > 	messageRouter := func(message *pulsarClient.ProducerMessage, metadata pulsarClient.TopicMetadata) int {
   > 		// customNumberOfPartitions could be different from metadata.NumPartitions() here
   > 		return internalRouter(message, customNumberOfPartitions)
   > 	}
   > ```
   > 
   > The problem is that `getHashingFunction` is not exposed so we can't use it to feed the `NewDefaultRouter` method.
   
   Sorry for the late reply. @hunter2046 All the options that NewDefaultRouter takes are exposed as parameters. You can set them in producerOptions, and the default router is itself built from those option values.





[GitHub] [pulsar-client-go] merlimat commented on a change in pull request #507: Expose NumPartitions() method on producer interface and GetHashingFunction() method

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#discussion_r611882360



##########
File path: pulsar/producer.go
##########
@@ -199,3 +204,15 @@ type Producer interface {
 	// of errors, pending writes will not be retried.
 	Close()
 }
+
+// GetHashingFunction return the corresponding hashing function for the hashing scheme
+func GetHashingFunction(s HashingScheme) func(string) uint32 {

Review comment:
       Why do we need to expose this publicly? An application is free to use a custom message router function; at that point it can use any hashing function.
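
A minimal sketch of the point made in this review comment, assuming local stand-in types rather than the real client package: a custom router can be built around whichever hash function the application chooses. `newHashRouter`, `fnvHash`, `crcHash` and `fixedMetadata` are hypothetical names introduced only for this illustration.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"hash/fnv"
)

// Local stand-ins for the client types, so the sketch is self-contained.
type ProducerMessage struct {
	Key string
}

type TopicMetadata interface {
	NumPartitions() uint32
}

type fixedMetadata uint32

func (m fixedMetadata) NumPartitions() uint32 { return uint32(m) }

// fnvHash and crcHash are two interchangeable application-side hashes.
func fnvHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func crcHash(s string) uint32 {
	return crc32.ChecksumIEEE([]byte(s))
}

// newHashRouter builds a router from any hash function. It routes against
// the live partition count, which is assumed to be greater than 0.
func newHashRouter(hash func(string) uint32) func(*ProducerMessage, TopicMetadata) int {
	return func(msg *ProducerMessage, tm TopicMetadata) int {
		return int(hash(msg.Key) % tm.NumPartitions())
	}
}

func main() {
	msg := &ProducerMessage{Key: "order-42"}
	topic := fixedMetadata(6)
	fmt.Println("fnv:", newHashRouter(fnvHash)(msg, topic))
	fmt.Println("crc32:", newHashRouter(crcHash)(msg, topic))
}
```

The trade-off is that a router built this way will generally not place keys on the same partitions as the default router unless the hash is identical.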







[GitHub] [pulsar-client-go] hunter2046 commented on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-822802546


   > I think the premise is wrong: can MessageRouter not meet our needs?
   > 
   > e.g:
   > 
   > ```
   > 	producer, err := client.CreateProducer(ProducerOptions{
   > 		Topic: "my-partitioned-topic",
   > 		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
   > 			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
   > 			return 2
   > 		},
   > 	})
   > ```
   
   Thanks for taking a look. MessageRouter is what we need. We'd like to set the MessageRouter to a function that works similar to the defaultRouter with a customized number of partitions, e.g.
   
   ```
   	internalRouter := pulsarClient.NewDefaultRouter(
   		getHashingFunction(producerOptions.HashingScheme),
   		producerOptions.BatchingMaxMessages,
   		producerOptions.BatchingMaxSize,
   		producerOptions.BatchingMaxPublishDelay,
   		producerOptions.DisableBatching)
   	messageRouter := func(message *pulsarClient.ProducerMessage, metadata pulsarClient.TopicMetadata) int {
   		// customNumberOfPartitions could be different from metadata.NumPartitions() here
   		return internalRouter(message, customNumberOfPartitions)
   	}
   ```
   
   The problem is that `getHashingFunction` is not exposed so we can't use it to feed the `NewDefaultRouter` method.





[GitHub] [pulsar-client-go] hunter2046 edited a comment on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
hunter2046 edited a comment on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-818072313


   > > IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.
   > 
   > I'm not convinced exposing `NumPartitions()` would help here. All the custom logic should be possible to implement within a custom message router.
   > 
   > In any case, there's already a way to get the partitions for a topic, by using `Client.TopicPartitions()`
   
   ~~ACK, I didn't realize there is a method already. That should work. I removed changes related to `NumPartitions()`~~
   
   As Denis commented in [this MR](https://cd.splunkdev.com/data-availability/go-ingest-lib/-/merge_requests/182#note_5331041), there could be concerns if we hit brokers often. What do you think?
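
One way to limit how often brokers are queried is to cache the partition count for a while. The sketch below is an assumption-laden illustration, not part of the pull request: `partitionLister` is a hypothetical interface mirroring the shape of `Client.TopicPartitions()` (topic name in, partition names out), and `partitionCache` and `fakeClient` exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// partitionLister is the only part of the client this sketch relies on.
// Its shape is assumed to mirror Client.TopicPartitions().
type partitionLister interface {
	TopicPartitions(topic string) ([]string, error)
}

type cacheEntry struct {
	count   int
	fetched time.Time
}

// partitionCache remembers partition counts for ttl before asking again.
type partitionCache struct {
	mu      sync.Mutex
	client  partitionLister
	ttl     time.Duration
	entries map[string]cacheEntry
}

func newPartitionCache(c partitionLister, ttl time.Duration) *partitionCache {
	return &partitionCache{client: c, ttl: ttl, entries: make(map[string]cacheEntry)}
}

// Count returns the cached partition count, refreshing it once it is
// older than ttl. The lock is held during the lookup for simplicity.
func (p *partitionCache) Count(topic string) (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if e, ok := p.entries[topic]; ok && time.Since(e.fetched) < p.ttl {
		return e.count, nil
	}
	parts, err := p.client.TopicPartitions(topic)
	if err != nil {
		return 0, err
	}
	p.entries[topic] = cacheEntry{count: len(parts), fetched: time.Now()}
	return len(parts), nil
}

// fakeClient stands in for a real client and counts lookups.
type fakeClient struct{ calls int }

func (f *fakeClient) TopicPartitions(topic string) ([]string, error) {
	f.calls++
	return []string{topic + "-partition-0", topic + "-partition-1"}, nil
}

func main() {
	fc := &fakeClient{}
	cache := newPartitionCache(fc, time.Minute)
	for i := 0; i < 3; i++ {
		n, _ := cache.Count("my-partitioned-topic")
		fmt.Println(n) // prints 2 each time
	}
	fmt.Println("broker lookups:", fc.calls) // prints "broker lookups: 1"
}
```

A longer TTL means fewer lookups but a slower reaction to partition increases, so the value would need to be chosen against how quickly the application must notice a change.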





[GitHub] [pulsar-client-go] hunter2046 commented on a change in pull request #507: Expose NumPartitions() method on producer interface and GetHashingFunction() method

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on a change in pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#discussion_r611894151



##########
File path: pulsar/producer.go
##########
@@ -199,3 +204,15 @@ type Producer interface {
 	// of errors, pending writes will not be retried.
 	Close()
 }
+
+// GetHashingFunction return the corresponding hashing function for the hashing scheme
+func GetHashingFunction(s HashingScheme) func(string) uint32 {

Review comment:
       We would like to mimic what the default router does, including the hash function. The only difference would be to use a snapshotted numPartitions instead of the latest one from `TopicMetadata`: https://github.com/apache/pulsar-client-go/blob/6ab17dcca28db389ab91a4a2ea6e5ae990d613da/pulsar/producer_impl.go#L103







[GitHub] [pulsar-client-go] hunter2046 commented on pull request #507: Expose NumPartitions() method on the producer

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-817052498


   > In the `MessageRouter`, you're getting a `TopicMetadata` object which will tell you the number of partitions:
   > 
   > https://github.com/apache/pulsar-client-go/blob/f17deac7ac47919aa9e46d11d33638672a24d126/pulsar/producer.go#L57
   > 
   > Wouldn't this already solve the problem?
   
   IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.
   
   I probably shouldn't paste the link here but DSP-28866 is the jira ticket that this change is made for.
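
The idea of recording the number of partitions when a partition key is first seen could look roughly like the sketch below. Everything here is an assumption for illustration: the types are local stand-ins for the client's, `snapshotRouter` is a hypothetical helper, and `keyHash` is a placeholder FNV-1a hash rather than the client's own.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// Local stand-ins for the client types, so the sketch is self-contained.
type ProducerMessage struct {
	Key string
}

type TopicMetadata interface {
	NumPartitions() uint32
}

type fixedMetadata uint32

func (m fixedMetadata) NumPartitions() uint32 { return uint32(m) }

// keyHash is a placeholder hash, not the client's internal one.
func keyHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// snapshotRouter records, per key, the partition count that was in effect
// when the key was first routed, and keeps routing against that count.
type snapshotRouter struct {
	mu       sync.Mutex
	snapshot map[string]uint32
}

func newSnapshotRouter() *snapshotRouter {
	return &snapshotRouter{snapshot: make(map[string]uint32)}
}

// Route assumes tm.NumPartitions() is greater than 0.
func (r *snapshotRouter) Route(msg *ProducerMessage, tm TopicMetadata) int {
	r.mu.Lock()
	n, seen := r.snapshot[msg.Key]
	if !seen {
		n = tm.NumPartitions()
		r.snapshot[msg.Key] = n
	}
	r.mu.Unlock()
	return int(keyHash(msg.Key) % n)
}

func main() {
	r := newSnapshotRouter()
	msg := &ProducerMessage{Key: "tenant-a"}
	first := r.Route(msg, fixedMetadata(4))
	later := r.Route(msg, fixedMetadata(8)) // the topic has grown since
	fmt.Println(first == later)             // prints "true"
}
```

The map in this sketch grows with the number of distinct keys and is lost on restart, so a real implementation would need some bound or persistence strategy.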





[GitHub] [pulsar-client-go] hunter2046 edited a comment on pull request #507: Expose GetHashingFunction() method on producer

Posted by GitBox <gi...@apache.org>.
hunter2046 edited a comment on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-818072313


   > > IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.
   > 
   > I'm not convinced exposing `NumPartitions()` would help here. All the custom logic should be possible to implement within a custom message router.
   > 
   > In any case, there's already a way to get the partitions for a topic, by using `Client.TopicPartitions()`
   
   ACK, I didn't realize there is a method already. That should work. I removed changes related to `NumPartitions()`
   





[GitHub] [pulsar-client-go] merlimat commented on pull request #507: Expose NumPartitions() method on producer interface and GetHashingFunction() method

Posted by GitBox <gi...@apache.org>.
merlimat commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-818062726


   > IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.
   
   I'm not convinced exposing `NumPartitions()` would help here. All the custom logic should be possible to implement within a custom message router. 
   
   In any case, there's already a way to get the partitions for a topic, by using `Client.TopicPartitions()` 





[GitHub] [pulsar-client-go] hunter2046 commented on pull request #507: Expose NumPartitions() method on producer interface and GetHashingFunction() method

Posted by GitBox <gi...@apache.org>.
hunter2046 commented on pull request #507:
URL: https://github.com/apache/pulsar-client-go/pull/507#issuecomment-818044735


   I have updated the pull request to also expose the `GetHashingFunction()` method to make it possible for the user to call `NewDefaultRouter`.
   
   The idea about how we plan to make use of the changes can be found at https://cd.splunkdev.com/data-availability/s2s-svc/-/merge_requests/937/diffs

