You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:20 UTC

[incubator-pulsar] 02/04: Support compaction options in Go client (#2449)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 74862956221c7353df551dbe17defe39860d2c61
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 28 06:42:42 2018 -0700

    Support compaction options in Go client (#2449)
    
    Added the option for ReadCompacted in Go based consumer/reader configuration.
---
 pulsar-client-go/pulsar/c_consumer.go    |   2 +
 pulsar-client-go/pulsar/c_reader.go      |   2 +
 pulsar-client-go/pulsar/consumer.go      |  12 ++-
 pulsar-client-go/pulsar/consumer_test.go | 141 ++++++++++++++++++++++++++++++-
 pulsar-client-go/pulsar/producer_test.go |   1 -
 pulsar-client-go/pulsar/reader.go        |   9 ++
 pulsar-client-go/pulsar/reader_test.go   |  91 ++++++++++++++++++++
 7 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 093dd9d..7f613b2 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,8 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
+	C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
+
 	subName := C.CString(options.SubscriptionName)
 	defer C.free(unsafe.Pointer(subName))
 
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 12c1103..04bb5cf 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -99,6 +99,8 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		C.pulsar_reader_configuration_set_subscription_role_prefix(conf, prefix)
 	}
 
+	C.pulsar_reader_configuration_set_read_compacted(conf, cBool(options.ReadCompacted))
+
 	if options.Name != "" {
 		name := C.CString(options.Name)
 		defer C.free(unsafe.Pointer(name))
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index ed56d9e..b9f2616 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -20,8 +20,8 @@
 package pulsar
 
 import (
-	"time"
 	"context"
+	"time"
 )
 
 // Pair of a Consumer and Message
@@ -92,6 +92,16 @@ type ConsumerOptions struct {
 
 	// Set the consumer name.
 	Name string
+
+	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+	//  failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+	//  shared subscription, will lead to the subscription call throwing a PulsarClientException.
+	ReadCompacted bool
 }
 
 // An interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 75a454b..f81ce56 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -22,6 +22,9 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
 	"testing"
 	"time"
 )
@@ -99,6 +102,111 @@ func TestConsumer(t *testing.T) {
 	consumer.Unsubscribe()
 }
 
+func TestConsumerCompaction(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assertNil(t, err)
+	defer producer.Close()
+
+	// Pre-create both subscriptions to retain published messages
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-1",
+	})
+
+	assertNil(t, err)
+	consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-2",
+		ReadCompacted:    true,
+	})
+
+	assertNil(t, err)
+	consumer2.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "Same-Key",
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Compact topic and wait for operation to complete
+	url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+	makeHttpPutCall(t, url)
+	for {
+		res := makeHttpGetCall(t, url)
+		if strings.Contains(res, "RUNNING") {
+			fmt.Println("Compaction still running")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		} else {
+			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			fmt.Println("Compaction is done")
+			break
+		}
+	}
+
+	// Restart the consumers
+
+	consumer1, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-1",
+	})
+
+	assertNil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-2",
+		ReadCompacted:    true,
+	})
+
+	assertNil(t, err)
+	defer consumer2.Close()
+
+	// Consumer-1 will receive all messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer1.Receive(context.Background())
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+	}
+
+	// Consumer-2 will only receive the last message
+	msg, err := consumer2.Receive(context.Background())
+	assertNil(t, err)
+	assertNotNil(t, msg)
+	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+	// No more messages on consumer-2
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+
+	msg, err = consumer2.Receive(ctx)
+	assertNil(t, msg)
+	assertNotNil(t, err)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
@@ -125,13 +233,44 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 		SubscriptionName: "my-subscription",
 	})
 
-	// Expect error in creating cosnumer
+	// Expect error in creating consumer
 	assertNil(t, consumer)
 	assertNotNil(t, err)
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
+func makeHttpPutCall(t *testing.T, url string) string {
+	return makeHttpCall(t, http.MethodPut, url)
+}
+
+func makeHttpGetCall(t *testing.T, url string) string {
+	return makeHttpCall(t, http.MethodGet, url)
+}
+
+func makeHttpCall(t *testing.T, method string, url string) string {
+	client := http.Client{}
+
+	req, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return string(body)
+}
 
 func TestConsumerMultiTopics(t *testing.T) {
 	client, err := NewClient(ClientOptions{
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index d7748f7..940be85 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -62,7 +62,6 @@ func TestProducer(t *testing.T) {
 		OperationTimeoutSeconds:  30,
 		ConcurrentLookupRequests: 1000,
 		MessageListenerThreads:   5,
-		EnableTLS:                false,
 	})
 
 	assertNil(t, err)
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index 7015c9c..5592630 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -57,6 +57,15 @@ type ReaderOptions struct {
 
 	// Set the subscription role prefix. The default prefix is "reader".
 	SubscriptionRolePrefix string
+
+	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
+	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
+	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
+	// topics will lead to the reader create call throwing a PulsarClientException.
+	ReadCompacted bool
 }
 
 // A Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 3b075e1..a0a63ae 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -22,7 +22,9 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
+	"time"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -128,3 +130,92 @@ func TestReaderWithInvalidConf(t *testing.T) {
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestReaderCompaction(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix())
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assertNil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "Same-Key",
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Compact topic and wait for operation to complete
+	url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+	makeHttpPutCall(t, url)
+	for {
+		res := makeHttpGetCall(t, url)
+		if strings.Contains(res, "RUNNING") {
+			fmt.Println("Compaction still running")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		} else {
+			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			fmt.Println("Compaction is done")
+			break
+		}
+	}
+
+	// Restart the consumers
+
+	reader1, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessage,
+	})
+
+	assertNil(t, err)
+	defer reader1.Close()
+
+	reader2, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessage,
+		ReadCompacted:  true,
+	})
+
+	assertNil(t, err)
+	defer reader2.Close()
+
+	// Reader-1 will receive all messages
+	for i := 0; i < 10; i++ {
+		msg, err := reader1.Next(context.Background())
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+	}
+
+	// Reader-2 will only receive the last message
+	msg, err := reader2.Next(context.Background())
+	assertNil(t, err)
+	assertNotNil(t, msg)
+	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+	// No more messages on consumer-2
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+
+	msg, err = reader2.Next(ctx)
+	assertNil(t, msg)
+	assertNotNil(t, err)
+}
+