You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2024/04/18 13:08:38 UTC

(pulsar-client-go) branch master updated: [improve] Use physical address information in connection pool key (#1206)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e80855 [improve] Use physical address information in connection pool key (#1206)
26e80855 is described below

commit 26e80855bb9848d41781840368e53d74f7df1102
Author: Dragos Misca <dr...@users.noreply.github.com>
AuthorDate: Thu Apr 18 06:08:33 2024 -0700

    [improve] Use physical address information in connection pool key (#1206)
    
    ### Motivation
    
    Migrate https://github.com/apache/pulsar/pull/22085/ and (parts of) https://github.com/apache/pulsar-client-cpp/pull/411/ over to the Go client. Context for this idea [here](https://github.com/apache/pulsar/pull/22085/files#r1497008116).
    
    Golang client support for blue-green migration needs the connection pool to differentiate between connections with the same logical address, but different physical addresses. Otherwise, the wrong connection might be used by the client, in effect pointing to the old cluster, instead of the new one.
    
    ### Modifications
    
    The connection pool maintains a map of connections, keyed by their logical address and a random connection id. This PR proposes including the physical address in the key also, therefore allowing the upper layer to differentiate between connections with identical logical addresses, but different physical addresses.
    
    In addition to this change, the test setup had to be fixed to address breakages in `TestRetryWithMultipleHosts` and `TestReaderWithMultiHosts`. All tests in the repository are using a local standalone setup currently. This unusual configuration has broker lookup operations reply with flag `proxyThroughServiceUrl=true` ([ref](https://github.com/apache/pulsar/blob/e7c2a75473b545134a3b292ae0e87a79d65cb756/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L3 [...]
    
    | Logical Address | Physical Address | Notes |
    | --------------- | ---------------- | ----- |
    | reachable-broker | reachable-broker | Valid |
    | unreachable-broker | unreachable-broker | Valid, but currently unusable |
    | reachable-broker | unreachable-broker | *Invalid entry* |
    
    To address the issue:
    - Switch the test setup to a more common cluster configuration. File `integration-tests/clustered/docker-compose.yml` instructs how this setup should look like.
    - Migrate the tests to separate files and test suites. New test files `pulsar/client_impl_clustered_test.go` and `pulsar/reader_clustered_test.go` contain Go tag `clustered`, allowing them to be ignored during the standalone test runs by virtue of the Go build process.
    - Add script `run-ci-clustered.sh`, specifying the "clustered" tests to run.
    - Changes in the `Makefile` add targets `make test_clustered` `make test_standalone` to run the respective test suites independently, while allowing `make test` to run all the tests, as before.
    - `Dockerfile` and `run-ci.sh` are modified to run the Go build process in the container build, such that it does not need to be run again in the new `run-ci-clustered.sh` script. The image is locally consumed by the tests only and is not published, so there is no risk of contaminating users.
---
 Dockerfile                                     |  13 ++
 Makefile                                       |  10 +-
 integration-tests/clustered/docker-compose.yml | 167 +++++++++++++++++++++++++
 pulsar/client_impl_clustered_test.go           |  89 +++++++++++++
 pulsar/client_impl_test.go                     |  55 --------
 pulsar/internal/connection_pool.go             |   7 +-
 pulsar/reader_clustered_test.go                |  88 +++++++++++++
 pulsar/reader_test.go                          |  52 --------
 scripts/{run-ci.sh => run-ci-clustered.sh}     |  17 +--
 scripts/run-ci.sh                              |  10 --
 10 files changed, 371 insertions(+), 137 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 818a106a..51e35f0a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \
 COPY . /pulsar/pulsar-client-go
 
 ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd"
+
+WORKDIR /pulsar/pulsar-client-go
+
+ENV GOPATH=/pulsar/go
+ENV GOCACHE=/tmp/go-cache
+
+# Install dependencies
+RUN go mod download
+
+# Basic compilation
+RUN go build ./pulsar
+RUN go build ./pulsaradmin
+RUN go build -o bin/pulsar-perf ./perf
diff --git a/Makefile b/Makefile
index 62e44166..df4d539d 100644
--- a/Makefile
+++ b/Makefile
@@ -44,9 +44,17 @@ container:
 	  --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \
 	  --build-arg ARCH="${CONTAINER_ARCH}" .
 
-test: container
+test: container test_standalone test_clustered
+
+test_standalone: container
 	docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"
 
+test_clustered: container
+	PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true
+	until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
+	docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh"
+	PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down
+
 clean:
 	docker rmi --force $(IMAGE_NAME) || true
 	rm bin/*
diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml
new file mode 100644
index 00000000..cce8eddd
--- /dev/null
+++ b/integration-tests/clustered/docker-compose.yml
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3'
+networks:
+  pulsar:
+    driver: bridge
+services:
+  # Start ZooKeeper
+  zookeeper:
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    container_name: zookeeper
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
+             bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
+             exec bin/pulsar zookeeper"
+    healthcheck:
+      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+
+  # Initialize cluster metadata
+  pulsar-init:
+    container_name: pulsar-init
+    hostname: pulsar-init
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    networks:
+      - pulsar
+    environment:
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    command: >
+      bin/pulsar initialize-cluster-metadata \
+               --cluster cluster-a \
+               --zookeeper zookeeper:2181 \
+               --configuration-store zookeeper:2181 \
+               --web-service-url http://broker-1:8080 \
+               --broker-service-url pulsar://broker-1:6650
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+
+  # Start bookie
+  bookie:
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    container_name: bookie
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - clusterName=cluster-a
+      - zkServers=zookeeper:2181
+      - metadataServiceUri=metadata-store:zk:zookeeper:2181
+      - advertisedAddress=bookie
+      - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      pulsar-init:
+        condition: service_completed_successfully
+    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"
+
+  proxy:
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    container_name: proxy
+    hostname: proxy
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
+    ports:
+      - "8080:8080"
+      - "6650:6650"
+    depends_on:
+      broker-1:
+        condition: service_healthy
+      broker-2:
+        condition: service_healthy
+    command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"
+
+  # Start broker 1
+  broker-1:
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    container_name: broker-1
+    hostname: broker-1
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=broker-1
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://broker-1:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
+
+  # Start broker 2
+  broker-2:
+    image: apachepulsar/pulsar:${PULSAR_VERSION}
+    container_name: broker-2
+    hostname: broker-2
+    restart: on-failure
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:zookeeper:2181
+      - zookeeperServers=zookeeper:2181
+      - clusterName=cluster-a
+      - managedLedgerDefaultEnsembleSize=1
+      - managedLedgerDefaultWriteQuorum=1
+      - managedLedgerDefaultAckQuorum=1
+      - advertisedAddress=broker-2
+      - internalListenerName=internal
+      - advertisedListeners=internal:pulsar://broker-2:6650
+      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
+      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
+    depends_on:
+      zookeeper:
+        condition: service_healthy
+      bookie:
+        condition: service_started
+    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 30
diff --git a/pulsar/client_impl_clustered_test.go b/pulsar/client_impl_clustered_test.go
new file mode 100644
index 00000000..e572c774
--- /dev/null
+++ b/pulsar/client_impl_clustered_test.go
@@ -0,0 +1,89 @@
+//go:build clustered
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type clientClusteredTestSuite struct {
+	suite.Suite
+}
+
+func TestClientClusteredTestSuite(t *testing.T) {
+	suite.Run(t, new(clientClusteredTestSuite))
+}
+
+func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() {
+	req := suite.Require()
+	// Multi hosts included an unreached port and the actual port for verify retry logic
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://broker-1:6600,broker-1:6650",
+	})
+	req.NoError(err)
+	defer client.Close()
+
+	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+	req.NoError(err)
+	defer producer.Close()
+
+	ctx := context.Background()
+	var msgIDs [][]byte
+
+	for i := 0; i < 10; i++ {
+		if msgID, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			req.NoError(err)
+		} else {
+			req.NotNil(msgID)
+			msgIDs = append(msgIDs, msgID.Serialize())
+		}
+	}
+
+	req.Equal(10, len(msgIDs))
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            "retry-multi-hosts-sub",
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	req.NoError(err)
+	defer consumer.Close()
+
+	for i := 0; i < 10; i++ {
+		msg, err := consumer.Receive(context.Background())
+		req.NoError(err)
+		req.Contains(msgIDs, msg.ID().Serialize())
+		consumer.Ack(msg)
+	}
+
+	err = consumer.Unsubscribe()
+	req.NoError(err)
+}
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 78dc1cae..5b6b8f1f 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -570,61 +570,6 @@ func anonymousNamespacePolicy() map[string]interface{} {
 	}
 }
 
-func TestRetryWithMultipleHosts(t *testing.T) {
-	// Multi hosts included an unreached port and the actual port for verify retry logic
-	client, err := NewClient(ClientOptions{
-		URL: "pulsar://localhost:6600,localhost:6650",
-	})
-
-	assert.Nil(t, err)
-	defer client.Close()
-
-	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
-
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: topic,
-	})
-
-	assert.Nil(t, err)
-	defer producer.Close()
-
-	ctx := context.Background()
-	var msgIDs [][]byte
-
-	for i := 0; i < 10; i++ {
-		if msgID, err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("hello-%d", i)),
-		}); err != nil {
-			assert.Nil(t, err)
-		} else {
-			assert.NotNil(t, msgID)
-			msgIDs = append(msgIDs, msgID.Serialize())
-		}
-	}
-
-	assert.Equal(t, 10, len(msgIDs))
-
-	consumer, err := client.Subscribe(ConsumerOptions{
-		Topic:                       topic,
-		SubscriptionName:            "retry-multi-hosts-sub",
-		Type:                        Shared,
-		SubscriptionInitialPosition: SubscriptionPositionEarliest,
-	})
-	assert.Nil(t, err)
-	defer consumer.Close()
-
-	for i := 0; i < 10; i++ {
-		msg, err := consumer.Receive(context.Background())
-		assert.Nil(t, err)
-		assert.Contains(t, msgIDs, msg.ID().Serialize())
-		consumer.Ack(msg)
-	}
-
-	err = consumer.Unsubscribe()
-	assert.Nil(t, err)
-
-}
-
 func TestHTTPSConnectionCAError(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL:              webServiceURLTLS,
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 6ff79919..3d718b75 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -79,7 +79,8 @@ func NewConnectionPool(
 }
 
 func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
-	key := p.getMapKey(logicalAddr)
+	p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection")
+	key := p.getMapKey(logicalAddr, physicalAddr)
 
 	p.Lock()
 	conn, ok := p.connections[key]
@@ -133,13 +134,13 @@ func (p *connectionPool) Close() {
 	p.Unlock()
 }
 
-func (p *connectionPool) getMapKey(addr *url.URL) string {
+func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
 	cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
 	if cnt < 0 {
 		cnt = -cnt
 	}
 	idx := cnt % p.maxConnectionsPerHost
-	return fmt.Sprint(addr.Host, '-', idx)
+	return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
 }
 
 func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
diff --git a/pulsar/reader_clustered_test.go b/pulsar/reader_clustered_test.go
new file mode 100644
index 00000000..db2dc6f1
--- /dev/null
+++ b/pulsar/reader_clustered_test.go
@@ -0,0 +1,88 @@
+//go:build clustered
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+)
+
+type ReaderClusteredTestSuite struct {
+	suite.Suite
+}
+
+func TestReaderClusteredTestSuite(t *testing.T) {
+	suite.Run(t, new(ReaderClusteredTestSuite))
+}
+
+func (suite *ReaderClusteredTestSuite) TestReaderWithMultipleHosts() {
+	req := suite.Require()
+
+	// Multi hosts included an unreached port and the actual port for verify retry logic
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://broker-1:6600,broker-1:6650",
+	})
+	req.NoError(err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	req.NoError(err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		msgID, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		req.NoError(err)
+		req.NotNil(msgID)
+	}
+
+	// create reader on 5th message (not included)
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+	})
+	req.NoError(err)
+	defer reader.Close()
+
+	i := 0
+	for reader.HasNext() {
+		msg, err := reader.Next(context.Background())
+		req.NoError(err)
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		req.Equal([]byte(expectMsg), msg.Payload())
+
+		i++
+	}
+
+	req.Equal(10, i)
+}
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 78c222da..93787d10 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -724,58 +724,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
 	assert.False(t, reader.HasNext())
 }
 
-func TestReaderWithMultiHosts(t *testing.T) {
-	// Multi hosts included an unreached port and the actual port for verify retry logic
-	client, err := NewClient(ClientOptions{
-		URL: "pulsar://localhost:6600,localhost:6650",
-	})
-
-	assert.Nil(t, err)
-	defer client.Close()
-
-	topic := newTopicName()
-	ctx := context.Background()
-
-	// create producer
-	producer, err := client.CreateProducer(ProducerOptions{
-		Topic:           topic,
-		DisableBatching: true,
-	})
-	assert.Nil(t, err)
-	defer producer.Close()
-
-	// send 10 messages
-	for i := 0; i < 10; i++ {
-		msgID, err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("hello-%d", i)),
-		})
-		assert.NoError(t, err)
-		assert.NotNil(t, msgID)
-	}
-
-	// create reader on 5th message (not included)
-	reader, err := client.CreateReader(ReaderOptions{
-		Topic:          topic,
-		StartMessageID: EarliestMessageID(),
-	})
-
-	assert.Nil(t, err)
-	defer reader.Close()
-
-	i := 0
-	for reader.HasNext() {
-		msg, err := reader.Next(context.Background())
-		assert.NoError(t, err)
-
-		expectMsg := fmt.Sprintf("hello-%d", i)
-		assert.Equal(t, []byte(expectMsg), msg.Payload())
-
-		i++
-	}
-
-	assert.Equal(t, 10, i)
-}
-
 func TestProducerReaderRSAEncryption(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
diff --git a/scripts/run-ci.sh b/scripts/run-ci-clustered.sh
similarity index 73%
copy from scripts/run-ci.sh
copy to scripts/run-ci-clustered.sh
index 83246a39..5e4f36fa 100755
--- a/scripts/run-ci.sh
+++ b/scripts/run-ci-clustered.sh
@@ -16,23 +16,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
 set -e -x
 
-export GOPATH=/pulsar/go
-export GOCACHE=/tmp/go-cache
-
-# Install dependencies
-go mod download
-
-# Basic compilation
-go build ./pulsar
-go build -o bin/pulsar-perf ./perf
-
-scripts/pulsar-test-service-start.sh
-
-go test -race -coverprofile=/tmp/coverage -timeout=20m -v ./...
+go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -v -run 'Test.*ClusteredTestSuite' -v ./pulsar
 go tool cover -html=/tmp/coverage -o coverage.html
 
-scripts/pulsar-test-service-stop.sh
-
diff --git a/scripts/run-ci.sh b/scripts/run-ci.sh
index 83246a39..e7a6d79e 100755
--- a/scripts/run-ci.sh
+++ b/scripts/run-ci.sh
@@ -19,16 +19,6 @@
 
 set -e -x
 
-export GOPATH=/pulsar/go
-export GOCACHE=/tmp/go-cache
-
-# Install dependencies
-go mod download
-
-# Basic compilation
-go build ./pulsar
-go build -o bin/pulsar-perf ./perf
-
 scripts/pulsar-test-service-start.sh
 
 go test -race -coverprofile=/tmp/coverage -timeout=20m -v ./...