You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:20 UTC
[incubator-pulsar] 02/04: Support compaction options in Go client
(#2449)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit 74862956221c7353df551dbe17defe39860d2c61
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 28 06:42:42 2018 -0700
Support compaction options in Go client (#2449)
Added the option for ReadCompacted in Go based consumer/reader configuration.
---
pulsar-client-go/pulsar/c_consumer.go | 2 +
pulsar-client-go/pulsar/c_reader.go | 2 +
pulsar-client-go/pulsar/consumer.go | 12 ++-
pulsar-client-go/pulsar/consumer_test.go | 141 ++++++++++++++++++++++++++++++-
pulsar-client-go/pulsar/producer_test.go | 1 -
pulsar-client-go/pulsar/reader.go | 9 ++
pulsar-client-go/pulsar/reader_test.go | 91 ++++++++++++++++++++
7 files changed, 255 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 093dd9d..7f613b2 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,8 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_consumer_name(conf, name)
}
+ C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
+
subName := C.CString(options.SubscriptionName)
defer C.free(unsafe.Pointer(subName))
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 12c1103..04bb5cf 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -99,6 +99,8 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
C.pulsar_reader_configuration_set_subscription_role_prefix(conf, prefix)
}
+ C.pulsar_reader_configuration_set_read_compacted(conf, cBool(options.ReadCompacted))
+
if options.Name != "" {
name := C.CString(options.Name)
defer C.free(unsafe.Pointer(name))
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index ed56d9e..b9f2616 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -20,8 +20,8 @@
package pulsar
import (
- "time"
"context"
+ "time"
)
// Pair of a Consumer and Message
@@ -92,6 +92,16 @@ type ConsumerOptions struct {
// Set the consumer name.
Name string
+
+ // If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+ // of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+ // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ // point, the messages will be sent as normal.
+ //
+ // ReadCompacted can only be enabled on subscriptions to persistent topics, which have a single active consumer (i.e.
+ // failover or exclusive subscriptions). Attempting to enable it on a subscription to a non-persistent topic or on a
+ // shared subscription will cause the Subscribe call to return an error.
+ ReadCompacted bool
}
// An interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 75a454b..f81ce56 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -22,6 +22,9 @@ package pulsar
import (
"context"
"fmt"
+ "io/ioutil"
+ "net/http"
+ "strings"
"testing"
"time"
)
@@ -99,6 +102,111 @@ func TestConsumer(t *testing.T) {
consumer.Unsubscribe()
}
+func TestConsumerCompaction(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assertNil(t, err)
+ defer producer.Close()
+
+ // Pre-create both subscriptions to retain published messages
+ consumer1, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-1",
+ })
+
+ assertNil(t, err)
+ consumer1.Close()
+
+ consumer2, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-2",
+ ReadCompacted: true,
+ })
+
+ assertNil(t, err)
+ consumer2.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "Same-Key",
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Compact topic and wait for operation to complete
+ url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+ makeHttpPutCall(t, url)
+ for {
+ res := makeHttpGetCall(t, url)
+ if strings.Contains(res, "RUNNING") {
+ fmt.Println("Compaction still running")
+ time.Sleep(100 * time.Millisecond)
+ continue
+ } else {
+ assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+ fmt.Println("Compaction is done")
+ break
+ }
+ }
+
+ // Restart the consumers
+
+ consumer1, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-1",
+ })
+
+ assertNil(t, err)
+ defer consumer1.Close()
+
+ consumer2, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub-2",
+ ReadCompacted: true,
+ })
+
+ assertNil(t, err)
+ defer consumer2.Close()
+
+ // Consumer-1 will receive all messages
+ for i := 0; i < 10; i++ {
+ msg, err := consumer1.Receive(context.Background())
+ assertNil(t, err)
+ assertNotNil(t, msg)
+
+ assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+ }
+
+ // Consumer-2 will only receive the last message
+ msg, err := consumer2.Receive(context.Background())
+ assertNil(t, err)
+ assertNotNil(t, msg)
+ assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+ // No more messages on consumer-2
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+
+ msg, err = consumer2.Receive(ctx)
+ assertNil(t, msg)
+ assertNotNil(t, err)
+}
+
func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
@@ -125,13 +233,44 @@ func TestConsumerWithInvalidConf(t *testing.T) {
SubscriptionName: "my-subscription",
})
- // Expect error in creating cosnumer
+ // Expect error in creating consumer
assertNil(t, consumer)
assertNotNil(t, err)
assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
}
+func makeHttpPutCall(t *testing.T, url string) string {
+ return makeHttpCall(t, http.MethodPut, url)
+}
+
+func makeHttpGetCall(t *testing.T, url string) string {
+ return makeHttpCall(t, http.MethodGet, url)
+}
+
+func makeHttpCall(t *testing.T, method string, url string) string {
+ client := http.Client{}
+
+ req, err := http.NewRequest(method, url, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+
+ res, err := client.Do(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ body, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return string(body)
+}
func TestConsumerMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index d7748f7..940be85 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -62,7 +62,6 @@ func TestProducer(t *testing.T) {
OperationTimeoutSeconds: 30,
ConcurrentLookupRequests: 1000,
MessageListenerThreads: 5,
- EnableTLS: false,
})
assertNil(t, err)
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index 7015c9c..5592630 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -57,6 +57,15 @@ type ReaderOptions struct {
// Set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
+
+ // If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
+ // of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
+ // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ // point, the messages will be sent as normal.
+ //
+ // ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
+ // topics will cause the CreateReader call to return an error.
+ ReadCompacted bool
}
// A Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 3b075e1..a0a63ae 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -22,7 +22,9 @@ package pulsar
import (
"context"
"fmt"
+ "strings"
"testing"
+ "time"
)
func TestReaderConnectError(t *testing.T) {
@@ -128,3 +130,92 @@ func TestReaderWithInvalidConf(t *testing.T) {
assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
}
+
+
+func TestReaderCompaction(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix())
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assertNil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ Key: "Same-Key",
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // Compact topic and wait for operation to complete
+ url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+ makeHttpPutCall(t, url)
+ for {
+ res := makeHttpGetCall(t, url)
+ if strings.Contains(res, "RUNNING") {
+ fmt.Println("Compaction still running")
+ time.Sleep(100 * time.Millisecond)
+ continue
+ } else {
+ assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+ fmt.Println("Compaction is done")
+ break
+ }
+ }
+
+ // Create the readers
+
+ reader1, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessage,
+ })
+
+ assertNil(t, err)
+ defer reader1.Close()
+
+ reader2, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessage,
+ ReadCompacted: true,
+ })
+
+ assertNil(t, err)
+ defer reader2.Close()
+
+ // Reader-1 will receive all messages
+ for i := 0; i < 10; i++ {
+ msg, err := reader1.Next(context.Background())
+ assertNil(t, err)
+ assertNotNil(t, msg)
+
+ assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+ }
+
+ // Reader-2 will only receive the last message
+ msg, err := reader2.Next(context.Background())
+ assertNil(t, err)
+ assertNotNil(t, msg)
+ assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+ // No more messages on reader-2
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+
+ msg, err = reader2.Next(ctx)
+ assertNil(t, msg)
+ assertNotNil(t, err)
+}
+