You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/20 17:19:58 UTC

[GitHub] [pulsar] wolfstudy commented on a change in pull request #6023: [pulsar-client-go]Add DeliverAfter option for go client

wolfstudy commented on a change in pull request #6023:
URL: https://github.com/apache/pulsar/pull/6023#discussion_r428176058



##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		fmt.Printf("send message %d\n", i)
+		assert.Nil(t, err)
+	}
+
+	// Failover consumer will receive the messages immediately while
+    // the shared consumer will get them after the delay
+	for i := 0; i < 10; i++ {
+		msg, err := failoverConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())

Review comment:
       Please replace` fmt.Printf` with `t.Logf()`.

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"

Review comment:
       This topic name is the same as the topic name in another test case, in order to avoid test conflicts, please replace it

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		fmt.Printf("send message %d\n", i)
+		assert.Nil(t, err)
+	}
+
+	// Failover consumer will receive the messages immediately while
+    // the shared consumer will get them after the delay
+	for i := 0; i < 10; i++ {
+		msg, err := failoverConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())
+		err = failoverConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		fmt.Printf("after %v\n", time.Now())

Review comment:
       Please replace` fmt.Printf` with `t.Logf()`.

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		fmt.Printf("send message %d\n", i)
+		assert.Nil(t, err)
+	}
+
+	// Failover consumer will receive the messages immediately while
+    // the shared consumer will get them after the delay
+	for i := 0; i < 10; i++ {
+		msg, err := failoverConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())
+		err = failoverConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		fmt.Printf("after %v\n", time.Now())
+		assert.True(t, time.Since(begin) < delay)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := sharedConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())

Review comment:
       Please replace` fmt.Printf` with `t.Logf()`.

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		fmt.Printf("send message %d\n", i)

Review comment:
       Can we replace `fmt.Printf` with `t.Logf()`?

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)
+	for i := 0; i < 10; i++ {
+		err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			DeliverAfter: delay,
+		})
+		fmt.Printf("send message %d\n", i)
+		assert.Nil(t, err)
+	}
+
+	// Failover consumer will receive the messages immediately while
+    // the shared consumer will get them after the delay
+	for i := 0; i < 10; i++ {
+		msg, err := failoverConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())
+		err = failoverConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		fmt.Printf("after %v\n", time.Now())
+		assert.True(t, time.Since(begin) < delay)
+	}
+
+	for i := 0; i < 10; i++ {
+		msg, err := sharedConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		fmt.Printf("message: %s\n",msg.Payload())
+		err = sharedConsumer.Ack(msg)
+		assert.Nil(t, err)
+
+		fmt.Printf("after %v\n", time.Now())

Review comment:
       Please replace` fmt.Printf` with `t.Logf()`.

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topicName,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	failoverConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-failover",
+	})
+	assert.Nil(t, err)
+	defer failoverConsumer.Close()
+
+	sharedConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topicName,
+		SubscriptionName: "sub-delay-message-shared",
+		Type:             Shared,
+	})
+	assert.Nil(t, err)
+	defer sharedConsumer.Close()
+
+	ctx := context.Background()
+
+	delay := time.Second*5
+	begin := time.Now()
+	fmt.Printf("begin %v\n", begin)

Review comment:
       Please replace` fmt.Printf` with `t.Logf()`.

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"

Review comment:
       ```
   func newTopicName() string {
   	return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
   }
   ```

##########
File path: pulsar-client-go/pulsar/producer_test.go
##########
@@ -416,3 +416,71 @@ func TestProducer_SendAndGetMsgID(t *testing.T) {
 		assert.NotNil(t, IsNil(msgID))
 	}
 }
+
+func TestProducer_DelayMessage(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topicName := "test-send-with-message-id"

Review comment:
       Can we rename the topic name?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org