Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/23 16:53:50 UTC

[GitHub] [pulsar] flowchartsman commented on a change in pull request #8327: [Go Functions] Add newOuputMessage interface for Go Function

flowchartsman commented on a change in pull request #8327:
URL: https://github.com/apache/pulsar/pull/8327#discussion_r511014789



##########
File path: pulsar-function-go/pf/context.go
##########
@@ -119,6 +120,12 @@ func (c *FunctionContext) GetUserConfMap() map[string]interface{} {
 	return c.userConfigs
 }
 
+// NewOutputMessage send message to the topic
+// @param topicName: The name of the topic for output message
+func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer {

Review comment:
       I can understand the desire for naming parity, but I wonder whether it would be better to call this something different, given how different the concepts are. In Go, when I see `New<Foo>`, I expect to receive a pointer to a new struct of that type, or at least an interface of some kind with that name. In this case, I would expect to get a new message, not a `pulsar.Producer`.
   
   I would recommend something like the following:
   
   ```go
   func (c *FunctionContext) SendTopic(topicName string, message *pulsar.ProducerMessage)
   ```
   
   or even
   ```go
   func (c *FunctionContext) SendTopicMessage(topicName string, message *pulsar.ProducerMessage)
   func (c *FunctionContext) SendTopic(topicName string, payload []byte)
   ```
   
   which, given the way producers are currently packaged, might benefit from a registry of producers per topic:
   
   ```go
   // producerRegistry caches one producer per topic so repeated sends reuse it.
   // A plain map is not safe for concurrent use; guard it with a sync.Mutex if
   // getProducer can be called from more than one goroutine.
   var producerRegistry = map[string]pulsar.Producer{}

   func (gi *goInstance) getProducer(topicName string) (pulsar.Producer, error) {
   	// Reuse the cached producer for this topic if one exists.
   	if producer, ok := producerRegistry[topicName]; ok {
   		return producer, nil
   	}

   	properties := getProperties(getDefaultSubscriptionName(
   		gi.context.instanceConf.funcDetails.Tenant,
   		gi.context.instanceConf.funcDetails.Namespace,
   		gi.context.instanceConf.funcDetails.Name), gi.context.instanceConf.instanceID)

   	producer, err := gi.client.CreateProducer(pulsar.ProducerOptions{
   		Topic:                   topicName,
   		Properties:              properties,
   		CompressionType:         pulsar.LZ4,
   		BatchingMaxPublishDelay: time.Millisecond * 10,
   		// Set send timeout to be infinity to prevent potential deadlock with consumer
   		// that might happen when consumer is blocked due to unacked messages
   	})
   	if err != nil {
   		gi.stats.incrTotalSysExceptions(err)
   		log.Errorf("create producer error:%s", err.Error())
   		return nil, err
   	}

   	// Cache the new producer for later calls on the same topic.
   	producerRegistry[topicName] = producer
   	return producer, nil
   }
   ```
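   
   To make the registry idea a bit more concrete, here is a minimal, self-contained sketch of how a `SendTopic`-style call could sit on top of a per-topic cache. Everything in it is an assumption for illustration: the `producer` interface and `fakeProducer` are hypothetical stand-ins so the example runs without a broker, and `producerRegistry`, `newProducerRegistry`, `get`, and `sendTopic` are made-up names, not the pulsar-client-go API or the actual `FunctionContext` implementation. It also adds a mutex, on the assumption that sends may come from more than one goroutine.
   
   ```go
   package main

   import (
   	"fmt"
   	"sync"
   )

   // producer is a hypothetical stand-in for pulsar.Producer, reduced to the
   // one method this sketch needs.
   type producer interface {
   	Send(payload []byte) error
   }

   // fakeProducer records payloads in memory instead of publishing them.
   type fakeProducer struct {
   	topic string
   	sent  [][]byte
   }

   func (p *fakeProducer) Send(payload []byte) error {
   	p.sent = append(p.sent, payload)
   	return nil
   }

   // producerRegistry caches one producer per topic. The mutex guards the map.
   type producerRegistry struct {
   	mu        sync.Mutex
   	producers map[string]producer
   	create    func(topic string) (producer, error)
   }

   // newProducerRegistry takes the function used to build a producer on a miss.
   func newProducerRegistry(create func(topic string) (producer, error)) *producerRegistry {
   	return &producerRegistry{
   		producers: make(map[string]producer),
   		create:    create,
   	}
   }

   // get returns the cached producer for topic, creating it on first use.
   func (r *producerRegistry) get(topic string) (producer, error) {
   	r.mu.Lock()
   	defer r.mu.Unlock()

   	if p, ok := r.producers[topic]; ok {
   		return p, nil
   	}
   	p, err := r.create(topic)
   	if err != nil {
   		return nil, err
   	}
   	r.producers[topic] = p
   	return p, nil
   }

   // sendTopic mirrors the shape of the suggested SendTopic(topicName, payload).
   func (r *producerRegistry) sendTopic(topic string, payload []byte) error {
   	p, err := r.get(topic)
   	if err != nil {
   		return err
   	}
   	return p.Send(payload)
   }

   func main() {
   	created := 0
   	reg := newProducerRegistry(func(topic string) (producer, error) {
   		created++
   		return &fakeProducer{topic: topic}, nil
   	})

   	for _, msg := range []string{"a", "b", "c"} {
   		if err := reg.sendTopic("topic-1", []byte(msg)); err != nil {
   			fmt.Println("send failed:", err)
   			return
   		}
   	}
   	fmt.Println("producers created:", created) // prints "producers created: 1"
   }
   ```
   
   With this shape the caller never handles a producer directly, which is the main reason I would prefer a `Send...` name over `New...`.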



