You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/05/09 01:43:02 UTC

[GitHub] [pulsar-client-go] sijie commented on a change in pull request #1: Initial client library submission

sijie commented on a change in pull request #1: Initial client library submission
URL: https://github.com/apache/pulsar-client-go/pull/1#discussion_r282313298
 
 

 ##########
 File path: pulsar/producer.go
 ##########
 @@ -0,0 +1,167 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"time"
+)
+
+type HashingScheme int
+
+const (
+	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
+	Murmur3_32Hash                      // Use Murmur3 hashing function
+)
+
+type CompressionType int
+
+const (
+	NoCompression CompressionType = iota
+	LZ4
+	ZLib
+	ZSTD
+)
+
+type TopicMetadata interface {
+	// Get the number of partitions for the specific topic
+	NumPartitions() uint32
+}
+
+type ProducerOptions struct {
+	// Specify the topic this producer will be publishing on.
+	// This argument is required when constructing the producer.
+	Topic string
+
+	// Specify a name for the producer
+	// If not assigned, the system will generate a globally unique name which can be access with
+	// Producer.ProducerName().
+	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
+	// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
+	// a topic.
+	Name string
+
+	// Attach a set of application defined properties to the producer
+	// This properties will be visible in the topic stats
+	Properties map[string]string
+
+	// Set the send timeout (default: 30 seconds)
+	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
+	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
+	// deduplication feature.
+	SendTimeout time.Duration
+
+	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+	// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
+	// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
+	MaxPendingMessages int
+
+	// Set the number of max pending messages across all the partitions
+	// This setting will be used to lower the max pending messages for each partition
+	// `MaxPendingMessages(int)`, if the total exceeds the configured value.
+	MaxPendingMessagesAcrossPartitions int
+
+	// Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
+	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
+	// `ProducerQueueIsFullError` when there is no space left in pending queue.
+	BlockIfQueueFull bool
+
+	// Change the `HashingScheme` used to chose the partition on where to publish a particular message.
+	// Standard hashing functions available are:
+	//
+	//  - `JavaStringHash` : Java String.hashCode() equivalent
+	//  - `Murmur3_32Hash` : Use Murmur3 hashing function.
+	// 		https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
+	//
+	// Default is `JavaStringHash`.
+	HashingScheme
+
+	// Set the compression type for the producer.
+	// By default, message payloads are not compressed. Supported compression types are:
+	//  - LZ4
+	//  - ZLIB
+	//  - ZSTD
+	//
+	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
+	// release in order to be able to receive messages compressed with ZSTD.
+	CompressionType
+
+	// Set a custom message routing policy by passing an implementation of MessageRouter
+	// The router is a function that given a particular message and the topic metadata, returns the
+	// partition index where the message should be routed to
+	MessageRouter func(Message, TopicMetadata) int
+
+	// Control whether automatic batching of messages is enabled for the producer. By default batching
+	// is enabled.
+	//
+	// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
+	// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
+	// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
+	// contents.
+	//
+	// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
+	// Setting `DisableBatching: true` will make the producer to send messages individually
+	DisableBatching bool
+
+	// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
+	// enabled. If set to a non zero value, messages will be queued until this time interval or until
+	BatchingMaxPublishDelay time.Duration
+
+	// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
+	// messages will be queued until this threshold is reached or batch interval has elapsed
+	BatchingMaxMessages uint
+}
+
+// The producer is used to publish messages on a topic
+type Producer interface {
+	// return the topic to which producer is publishing to
+	Topic() string
+
+	// return the producer name which could have been assigned by the system or specified by the client
+	Name() string
+
+	// Send a message
+	// This call will be blocking until is successfully acknowledged by the Pulsar broker.
+	// Example:
+	// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
+	Send(context.Context, *ProducerMessage) error
+
+	// Send a message in asynchronous mode
+	// The callback will report back the message being published and
+	// the eventual error in publishing
+	SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error))
 
 Review comment:
   IMO it is better to design an API that is more native to go developers.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services