You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/11/28 12:10:57 UTC

(inlong) branch master updated: [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d8c83804b1 [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)
d8c83804b1 is described below

commit d8c83804b1078089a233779b042f24067378dbc4
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Nov 28 20:10:51 2023 +0800

    [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)
---
 .../inlong/manager/common/util/HttpUtils.java      |  13 +
 .../queue/pulsar/PulsarBrokerEntryMetadata.java    |  33 +
 .../pojo/queue/pulsar/PulsarLookupTopicInfo.java   |  35 +
 .../pojo/queue/pulsar/PulsarMessageInfo.java       |  40 +
 .../pojo/queue/pulsar/PulsarMessageMetadata.java   |  63 ++
 .../pojo/queue/pulsar/PulsarNamespacePolicies.java |  34 +
 .../queue/pulsar/PulsarPersistencePolicies.java    |  35 +
 .../pojo/queue/pulsar/PulsarRetentionPolicies.java |  33 +
 .../pojo/queue/pulsar/PulsarTenantInfo.java        |  36 +
 .../pojo/queue/pulsar/PulsarTopicMetadata.java     |  38 +
 inlong-manager/manager-service/pom.xml             |  85 +-
 .../service/cluster/PulsarClusterOperator.java     |  10 +-
 .../apply/ApproveConsumeProcessListener.java       |  10 +-
 .../node/pulsar/PulsarDataNodeOperator.java        |  10 +-
 .../resource/queue/pulsar/PulsarOperator.java      | 212 +++--
 .../queue/pulsar/PulsarQueueResourceOperator.java  | 163 ++--
 .../service/resource/queue/pulsar/PulsarUtils.java | 958 ++++++++++++++++++++-
 .../sink/pulsar/PulsarResourceOperator.java        |  14 +-
 .../manager/service/queue/PulsarUtilsTest.java     | 581 ++++++++++++-
 19 files changed, 2072 insertions(+), 331 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index ace3d513f1..1e7efe68ae 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -112,6 +112,19 @@ public class HttpUtils {
         return response.getBody();
     }
 
+    /**
+     * Send an HTTP request without returning the response body; only the status code is checked
+     */
+    public static void request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
+            HttpHeaders header) {
+        log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
+        HttpEntity<Object> requestEntity = new HttpEntity<>(requestBody, header);
+        ResponseEntity<String> response = restTemplate.exchange(url, httpMethod, requestEntity, String.class);
+
+        log.debug("success request to {}, status code {}", url, response.getStatusCode());
+        Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), "Request failed");
+    }
+
     /**
      * Send GET request to the specified URL.
      */
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java
new file mode 100644
index 0000000000..de02d96ad6
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarBrokerEntryMetadata {
+
+    private long brokerTimestamp;
+    private long index;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java
new file mode 100644
index 0000000000..12596e2ab3
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarLookupTopicInfo {
+
+    private String brokerUrl;
+    private String httpUrl;
+    private String nativeUrl;
+    private String brokerUrlSsl;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java
new file mode 100644
index 0000000000..c84b8cbf9f
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarMessageInfo {
+
+    private String messageId;
+    private String topic;
+    private byte[] body;
+    private transient Map<String, String> properties;
+    private boolean poolMessage;
+    private PulsarBrokerEntryMetadata pulsarBrokerEntryMetadata;
+    private PulsarMessageMetadata pulsarMessageMetadata;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java
new file mode 100644
index 0000000000..3cfbae2fb4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarMessageMetadata {
+
+    private int payloadSize;
+    private String partitionKey;
+    private boolean compactedOut;
+    private long eventTime;
+    private boolean partitionKeyB64Encoded;
+    private byte[] orderingKey;
+    private long sequenceId;
+    private boolean nullValue;
+    private boolean nullPartitionKey;
+    private Map<String, String> properties;
+    private long publishTime;
+    private long deliverAtTime;
+    private int markerType;
+    private long txnidLeastBits;
+    private long txnidMostBits;
+    private long highestSequenceId;
+    private String uuid;
+    private int numChunksFromMsg;
+    private int totalChunkMsgSize;
+    private int chunkId;
+    private String producerName;
+    private String replicatedFrom;
+    private int uncompressedSize;
+    private int numMessagesInBatch;
+    private String encryptionAlgo;
+    private String compression;
+    private byte[] encryptionParam;
+    private byte[] schemaVersion;
+    private List<String> replicateTos;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java
new file mode 100644
index 0000000000..94dbd47bf7
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarNamespacePolicies {
+
+    private int messageTtlInSeconds;
+    private PulsarRetentionPolicies retentionPolicies;
+    private PulsarPersistencePolicies persistence;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java
new file mode 100644
index 0000000000..ada4f914fe
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarPersistencePolicies {
+
+    private int bookkeeperEnsemble;
+    private int bookkeeperWriteQuorum;
+    private int bookkeeperAckQuorum;
+    private double managedLedgerMaxMarkDeleteRate;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java
new file mode 100644
index 0000000000..155675291e
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarRetentionPolicies {
+
+    private int retentionTimeInMinutes;
+    private long retentionSizeInMB;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java
new file mode 100644
index 0000000000..2e093c1a57
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Set;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarTenantInfo {
+
+    Set<String> adminRoles;
+
+    Set<String> allowedClusters;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java
new file mode 100644
index 0000000000..c7de1aacf2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarTopicMetadata {
+
+    private int partitions;
+
+    private boolean deleted;
+
+    private Map<String, String> properties;
+}
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index f3bd26f7f8..5981d30332 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -122,36 +122,6 @@
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.pulsar</groupId>
-            <artifactId>pulsar-client</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.validation</groupId>
-                    <artifactId>validation-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.pulsar</groupId>
-                    <artifactId>bouncy-castle-bc</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcpkix-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcutil-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-ext-jdk15on</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>com.tencentcloudapi</groupId>
             <artifactId>tencentcloud-sdk-java-cls</artifactId>
@@ -288,48 +258,6 @@
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.pulsar</groupId>
-            <artifactId>pulsar-client-admin</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-compress</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.checkerframework</groupId>
-                    <artifactId>checker-qual</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.validation</groupId>
-                    <artifactId>validation-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jul-to-slf4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.pulsar</groupId>
-                    <artifactId>bouncy-castle-bc</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcpkix-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcutil-jdk15on</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.bouncycastle</groupId>
-                    <artifactId>bcprov-ext-jdk15on</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.dataformat</groupId>
             <artifactId>jackson-dataformat-yaml</artifactId>
@@ -572,6 +500,19 @@
             <artifactId>sdk-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index b2dea0bb7b..d4b34e83f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -33,11 +33,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -60,6 +60,9 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
     @Autowired
     private ObjectMapper objectMapper;
 
+    @Autowired
+    private RestTemplate restTemplate;
+
     @Override
     public Boolean accept(String clusterType) {
         return getClusterType().equals(clusterType);
@@ -127,9 +130,10 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
      * @return
      */
     private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
+
+        try {
             // test connect for pulsar adminUrl
-            pulsarAdmin.tenants().getTenants();
+            PulsarUtils.getTenants(restTemplate, pulsarInfo);
             return true;
         } catch (Exception e) {
             String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarInfo.getAdminUrl());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 1e6114bffa..5b39e14bf2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -38,7 +38,6 @@ import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
-import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -46,7 +45,6 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -131,7 +129,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
         String clusterTag = groupEntity.getInlongClusterTag();
         ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+        try {
             InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
             String tenant = pulsarDTO.getPulsarTenant();
             if (StringUtils.isBlank(tenant)) {
@@ -142,7 +140,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
             topicMessage.setNamespace(mqResource);
 
             List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
-            this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics);
+            this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics);
         } catch (Exception e) {
             log.error("create pulsar topic failed", e);
             throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
@@ -150,10 +148,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
         }
     }
 
-    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
+    private void createPulsarSubscription(PulsarClusterInfo clusterInfo, String subscription, PulsarTopicInfo topicInfo,
             List<String> topics) {
         try {
-            pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics);
+            pulsarOperator.createSubscriptions(clusterInfo, subscription, topicInfo, topics);
         } catch (Exception e) {
             log.error("create pulsar consumer group failed", e);
             throw new WorkflowListenerException("failed to create pulsar consumer group");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
index 819df3df34..6b9025741b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -34,11 +34,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 
 /**
  * Pulsar data node operator
@@ -51,6 +51,9 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
     @Autowired
     private ObjectMapper objectMapper;
 
+    @Autowired
+    private RestTemplate restTemplate;
+
     @Override
     public Boolean accept(String dataNodeType) {
         return getDataNodeType().equals(dataNodeType);
@@ -106,15 +109,14 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
 
         PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(adminUrl)
                 .token(token).build();
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+        try {
             // test connect for pulsar adminUrl
-            pulsarAdmin.tenants().getTenants();
+            PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
             return true;
         } catch (Exception e) {
             String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarClusterInfo.getAdminUrl());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
         }
-
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 77feca618c..7c55f65e43 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -23,9 +23,16 @@ import org.apache.inlong.manager.common.conversion.ConversionHandle;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.inlong.manager.service.message.DeserializeOperator;
@@ -33,19 +40,12 @@ import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -65,42 +65,44 @@ public class PulsarOperator {
     private static final int MAX_PARTITION = 1000;
     private static final int RETRY_TIMES = 3;
     private static final int DELAY_SECONDS = 5;
-    private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!";
     @Autowired
     public DeserializeOperatorFactory deserializeOperatorFactory;
     @Autowired
     private ConversionHandle conversionHandle;
+    @Autowired
+    private RestTemplate restTemplate;
 
     /**
      * Create Pulsar tenant
      */
-    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+    public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
         LOGGER.info("begin to create pulsar tenant={}", tenant);
         Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty");
 
         try {
-            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
-            boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
+            List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
+            boolean exists = this.tenantIsExists(pulsarClusterInfo, tenant);
             if (exists) {
                 LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
                 return;
             }
-            TenantInfoImpl tenantInfo = new TenantInfoImpl();
+            PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
             tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
             tenantInfo.setAdminRoles(Sets.newHashSet());
-            pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
+            PulsarUtils.createTenant(restTemplate, pulsarClusterInfo, tenant, tenantInfo);
             LOGGER.info("success to create pulsar tenant={}", tenant);
-        } catch (PulsarAdminException e) {
+        } catch (Exception e) {
             LOGGER.error("failed to create pulsar tenant=" + tenant, e);
             throw e;
         }
     }
 
     /**
-     * Create Pulsar namespace
+     * Create Pulsar namespace.
      */
-    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace)
-            throws PulsarAdminException {
+    public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInfo pulsarInfo, String tenant,
+            String namespace)
+            throws Exception {
         Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER,
                 "pulsar tenant cannot be empty during create namespace");
         Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER,
@@ -110,19 +112,17 @@ public class PulsarOperator {
         LOGGER.info("begin to create namespace={}", namespaceName);
         try {
             // Check whether the namespace exists, and create it if it does not exist
-            boolean isExists = this.namespaceExists(pulsarAdmin, tenant, namespace);
+            boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, namespace);
             if (isExists) {
                 LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
                 return;
             }
 
-            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
-            Namespaces namespaces = pulsarAdmin.namespaces();
-            namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters));
+            PulsarNamespacePolicies policies = new PulsarNamespacePolicies();
             // Configure message TTL
             Integer ttl = pulsarInfo.getTtl();
             if (ttl > 0) {
-                namespaces.setNamespaceMessageTTL(namespaceName, conversionHandle.handleConversion(ttl,
+                policies.setMessageTtlInSeconds(conversionHandle.handleConversion(ttl,
                         pulsarInfo.getTtlUnit().toLowerCase() + "_seconds"));
             }
 
@@ -139,15 +139,17 @@ public class PulsarOperator {
             }
 
             // Configure retention policies
-            RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime, retentionSize);
-            namespaces.setRetention(namespaceName, retentionPolicies);
+            PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies(retentionTime, retentionSize);
+            policies.setRetentionPolicies(retentionPolicies);
 
             // Configure persistence policies
-            PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
+            PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies(pulsarInfo.getEnsemble(),
                     pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate());
-            namespaces.setPersistence(namespaceName, persistencePolicies);
+            policies.setPersistence(persistencePolicies);
+
+            PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespaceName, policies);
             LOGGER.info("success to create namespace={}", namespaceName);
-        } catch (PulsarAdminException e) {
+        } catch (Exception e) {
             LOGGER.error("failed to create namespace=" + namespaceName, e);
             throw e;
         }
@@ -156,7 +158,7 @@ public class PulsarOperator {
     /**
      * Create Pulsar topic
      */
-    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+    public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
         Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
         String tenant = topicInfo.getPulsarTenant();
         String namespace = topicInfo.getNamespace();
@@ -164,33 +166,35 @@ public class PulsarOperator {
         String fullTopicName = tenant + "/" + namespace + "/" + topicName;
 
         // Return early if the topic already exists; otherwise create it below
-        if (topicExists(pulsarAdmin, tenant, namespace, topicName,
+        if (topicExists(pulsarClusterInfo, tenant, namespace, topicName,
                 InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
-            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
+            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarClusterInfo.getAdminUrl());
             return;
         }
 
         try {
             if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicInfo.getQueueModule())) {
-                pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
-                String res = pulsarAdmin.lookups().lookupTopic(fullTopicName);
+                PulsarUtils.createNonPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName);
+                String res = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, fullTopicName);
                 LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
             } else {
                 // Use the number of brokers as the default partition count when the configured value is out of range
-                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+                List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
                 Integer numPartitions = topicInfo.getNumPartitions();
                 if (numPartitions < 0 || numPartitions >= MAX_PARTITION) {
-                    List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+                    List<String> brokers = PulsarUtils.getBrokers(restTemplate, pulsarClusterInfo);
                     numPartitions = brokers.size();
                 }
-
-                pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions);
-                Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
+                PulsarUtils.createPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName,
+                        numPartitions);
+                Map<String, String> res = PulsarUtils.lookupPartitionedTopic(restTemplate,
+                        pulsarClusterInfo, fullTopicName);
                 // if lookup failed (res.size not equals the partition number)
-                if (res.keySet().size() != numPartitions) {
+                if (res.size() != numPartitions) {
                     // look up partition failed, retry to get partition numbers
-                    for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
-                        res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
+                    for (int i = 0; (i < RETRY_TIMES && res.size() != numPartitions); i++) {
+                        res = PulsarUtils.lookupPartitionedTopic(restTemplate, pulsarClusterInfo,
+                                fullTopicName);
                         try {
                             Thread.sleep(500);
                         } catch (InterruptedException e) {
@@ -198,12 +202,12 @@ public class PulsarOperator {
                         }
                     }
                 }
-                if (numPartitions != res.keySet().size()) {
-                    throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
+                if (numPartitions != res.size()) {
+                    throw new Exception("The number of partitions not equal to lookupPartitionedTopic");
                 }
                 LOGGER.info("success to create topic={}", fullTopicName);
             }
-        } catch (PulsarAdminException e) {
+        } catch (Exception e) {
             LOGGER.error("failed to create topic=" + fullTopicName, e);
             throw e;
         }
@@ -212,25 +216,25 @@ public class PulsarOperator {
     /**
      * Force delete Pulsar topic
      */
-    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+    public void forceDeleteTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
         Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
 
         String tenant = topicInfo.getPulsarTenant();
         String namespace = topicInfo.getNamespace();
         String topic = topicInfo.getTopicName();
         String fullTopicName = tenant + "/" + namespace + "/" + topic;
+        boolean isPartitioned = InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule());
 
         // Intended: return early if the topic does not exist. NOTE(review): the check below is not negated - confirm
-        if (topicExists(pulsarAdmin, tenant, namespace, topic,
-                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+        if (topicExists(pulsarClusterInfo, tenant, namespace, topic, isPartitioned)) {
             LOGGER.warn("pulsar topic={} already delete", fullTopicName);
             return;
         }
 
         try {
-            pulsarAdmin.topics().delete(fullTopicName, true);
+            PulsarUtils.forceDeleteTopic(restTemplate, pulsarClusterInfo, fullTopicName, isPartitioned);
             LOGGER.info("success to delete topic={}", fullTopicName);
-        } catch (PulsarAdminException e) {
+        } catch (Exception e) {
             LOGGER.error("failed to delete topic=" + fullTopicName, e);
             throw e;
         }
@@ -239,19 +243,20 @@ public class PulsarOperator {
     /**
      * Create a Pulsar subscription for the given topic
      */
-    public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule,
-            String subscription) throws PulsarAdminException {
+    public void createSubscription(PulsarClusterInfo pulsarClusterInfo, String fullTopicName, String queueModule,
+            String subscription) throws Exception {
         LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
         try {
-            boolean isExists = this.subscriptionExists(pulsarAdmin, fullTopicName, subscription,
+            boolean isExists = this.subscriptionExists(pulsarClusterInfo, fullTopicName, subscription,
                     InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(queueModule));
             if (isExists) {
                 LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
                 return;
             }
-            pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest);
+
+            PulsarUtils.createSubscription(restTemplate, pulsarClusterInfo, fullTopicName, subscription);
             LOGGER.info("success to create subscription={}", subscription);
-        } catch (PulsarAdminException e) {
+        } catch (Exception e) {
             LOGGER.error("failed to create pulsar subscription=" + subscription, e);
             throw e;
         }
@@ -260,31 +265,42 @@ public class PulsarOperator {
     /**
      * Create a Pulsar subscription for the specified topic list
      */
-    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
-            List<String> topicList) throws PulsarAdminException {
+    public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subscription, PulsarTopicInfo topicInfo,
+            List<String> topicList) throws Exception {
         for (String topic : topicList) {
             topicInfo.setTopicName(topic);
             String fullTopicName = topicInfo.getPulsarTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
-            this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription);
+            this.createSubscription(pulsarClusterInfo, fullTopicName, topicInfo.getQueueModule(), subscription);
         }
         LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
     }
 
     /**
      * Check if Pulsar tenant exists
+     *
+     * @param pulsarClusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @return true or false
+     * @throws Exception any exception if occurred
      */
-    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
-        List<String> tenantList = pulsarAdmin.tenants().getTenants();
-        return tenantList.contains(tenant);
+    private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
+        List<String> tenants = PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
+        return tenants.contains(tenant);
     }
 
     /**
-     * Check whether the Pulsar namespace exists under the specified tenant
+     * Check whether the Pulsar namespace exists under the specified tenant.
+     *
+     * @param pulsarClusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return true or false
+     * @throws Exception any exception if occurred
      */
-    private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
-            throws PulsarAdminException {
-        List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
-        return namespaceList.contains(tenant + "/" + namespace);
+    private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace)
+            throws Exception {
+        List<String> namespaces = PulsarUtils.getNamespaces(restTemplate, pulsarClusterInfo, tenant);
+        return namespaces.contains(namespace);
     }
 
     /**
@@ -293,22 +309,23 @@ public class PulsarOperator {
      * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
      *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
      */
-    public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
+    public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace, String topicName,
             boolean isPartitioned) {
         if (StringUtils.isBlank(topicName)) {
             return true;
         }
 
         // persistent://tenant/namespace/topic
-        List<String> topicList;
+        List<String> topics;
         boolean topicExists = false;
         try {
             if (isPartitioned) {
-                topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+                topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo, tenant,
+                        namespace);
             } else {
-                topicList = pulsarAdmin.topics().getList(tenant + "/" + namespace);
+                topics = PulsarUtils.getTopics(restTemplate, pulsarClusterInfo, tenant, namespace);
             }
-            for (String t : topicList) {
+            for (String t : topics) {
                 t = t.substring(t.lastIndexOf("/") + 1); // not contains /
                 if (!isPartitioned) {
                     int suffixIndex = t.lastIndexOf("-partition-");
@@ -321,7 +338,7 @@ public class PulsarOperator {
                     break;
                 }
             }
-        } catch (PulsarAdminException pe) {
+        } catch (Exception pe) {
             LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe);
             int count = 0;
             try {
@@ -329,8 +346,9 @@ public class PulsarOperator {
                     LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
                     Thread.sleep(DELAY_SECONDS);
 
-                    topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
-                    for (String t : topicList) {
+                    topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo,
+                            tenant, namespace);
+                    for (String t : topics) {
                         t = t.substring(t.lastIndexOf("/") + 1);
                         if (topicName.equals(t)) {
                             topicExists = true;
@@ -348,7 +366,7 @@ public class PulsarOperator {
     /**
      * Check whether the given subscription exists on the Pulsar topic.
      */
-    private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+    private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String topic, String subscription,
             boolean isPartitioned) {
         int count = 0;
         while (++count <= RETRY_TIMES) {
@@ -358,22 +376,24 @@ public class PulsarOperator {
 
                 // first lookup to load the topic, and then query whether the subscription exists
                 if (isPartitioned) {
-                    Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+                    Map<String, String> topicMap = PulsarUtils.lookupPartitionedTopic(restTemplate,
+                            pulsarClusterInfo, topic);
                     if (topicMap.isEmpty()) {
                         LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
                         continue;
                     }
                 } else {
-                    String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic);
+                    String lookupTopic = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, topic);
                     if (StringUtils.isBlank(lookupTopic)) {
                         LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
                         continue;
                     }
                 }
 
-                List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+                List<String> subscriptionList = PulsarUtils.getSubscriptions(restTemplate,
+                        pulsarClusterInfo, topic);
                 return subscriptionList.contains(subscription);
-            } catch (PulsarAdminException | InterruptedException e) {
+            } catch (Exception e) {
                 LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
                 if (count == RETRY_TIMES) {
                     LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
@@ -387,16 +407,17 @@ public class PulsarOperator {
     /**
      * Query topic message for the given pulsar cluster.
      */
-    public List<BriefMQMessage> queryLatestMessage(PulsarAdmin pulsarAdmin, String topicFullName, String subName,
+    public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName,
+            String subName,
             Integer messageCount, InlongStreamInfo streamInfo, boolean serial) {
         LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName);
         List<BriefMQMessage> messageList = new ArrayList<>();
-        int partitionCount = getPartitionCount(pulsarAdmin, topicFullName);
+        int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName);
         for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) {
             int currentPartitionNum = messageIndex % partitionCount;
             int messagePosition = messageIndex / partitionCount;
             String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
-            messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarAdmin, messageIndex,
+            messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex,
                     streamInfo, messagePosition));
         }
         LOGGER.info("success query message by subs={} for topic={}", subName, topicFullName);
@@ -404,36 +425,39 @@ public class PulsarOperator {
     }
 
     /**
-     * Use pulsar admin to get topic partition count
+     * Get topic partition count.
      */
-    private int getPartitionCount(PulsarAdmin pulsarAdmin, String topicFullName) {
-        PartitionedTopicMetadata partitionedTopicMetadata;
+    private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) {
+        PulsarTopicMetadata pulsarTopicMetadata;
         try {
-            partitionedTopicMetadata = pulsarAdmin.topics()
-                    .getPartitionedTopicMetadata(topicFullName);
+            pulsarTopicMetadata = PulsarUtils.getPartitionedTopicMetadata(restTemplate,
+                    pulsarClusterInfo, topicFullName);
         } catch (Exception e) {
             String errMsg = "get pulsar partition error ";
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg + e.getMessage());
         }
-        return partitionedTopicMetadata.partitions > 0 ? partitionedTopicMetadata.partitions : 1;
+        return pulsarTopicMetadata.getPartitions() > 0 ? pulsarTopicMetadata.getPartitions() : 1;
     }
 
     /**
-     * Use pulsar admin to query message
+     * Query pulsar message.
      */
-    private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarAdmin pulsarAdmin, int index,
+    private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo,
+            int index,
             InlongStreamInfo streamInfo, int messagePosition) {
         List<BriefMQMessage> briefMQMessages = new ArrayList<>();
         try {
-            Message<byte[]> pulsarMessage =
-                    pulsarAdmin.topics().examineMessage(topicPartition, "latest", messagePosition);
-            Map<String, String> headers = pulsarMessage.getProperties();
+            ResponseEntity<byte[]> httpResponse =
+                    PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPartition, "latest",
+                            messagePosition);
+            PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
+            Map<String, String> headers = messageInfo.getProperties();
             int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
                     Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
             DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(
                     MessageWrapType.valueOf(wrapTypeId));
-            briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, pulsarMessage.getData(),
+            briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(),
                     headers, index));
         } catch (Exception e) {
             LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 4d4e7051e2..c9d7b4a240 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -44,12 +44,9 @@ import com.google.common.base.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -101,7 +98,7 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
         List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
         for (ClusterInfo clusterInfo : clusterInfos) {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
-            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+            try {
                 // create pulsar tenant and namespace
                 if (StringUtils.isBlank(tenant)) {
                     tenant = pulsarCluster.getPulsarTenant();
@@ -109,9 +106,9 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
 
                 // if the group was not successful, need create tenant and namespace
                 if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
-                    pulsarOperator.createTenant(pulsarAdmin, tenant);
+                    pulsarOperator.createTenant(pulsarCluster, tenant);
                     String namespace = groupInfo.getMqResource();
-                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+                    pulsarOperator.createNamespace(pulsarCluster, pulsarInfo, tenant, namespace);
 
                     log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
                             groupId, tenant, namespace, pulsarCluster);
@@ -223,21 +220,19 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
      */
     private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarInfo.getPulsarTenant();
-            if (StringUtils.isBlank(tenant)) {
-                tenant = pulsarCluster.getPulsarTenant();
-            }
-            String namespace = pulsarInfo.getMqResource();
-            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
-                    .pulsarTenant(tenant)
-                    .namespace(namespace)
-                    .topicName(topicName)
-                    .queueModule(pulsarInfo.getQueueModule())
-                    .numPartitions(pulsarInfo.getPartitionNum())
-                    .build();
-            pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+        String tenant = pulsarInfo.getPulsarTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getPulsarTenant();
         }
+        String namespace = pulsarInfo.getMqResource();
+        PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+                .pulsarTenant(tenant)
+                .namespace(namespace)
+                .topicName(topicName)
+                .queueModule(pulsarInfo.getQueueModule())
+                .numPartitions(pulsarInfo.getPartitionNum())
+                .build();
+        pulsarOperator.createTopic(pulsarCluster, topicInfo);
     }
 
     /**
@@ -245,41 +240,39 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
      */
     private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
             String streamId) throws Exception {
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarInfo.getPulsarTenant();
-            if (StringUtils.isBlank(tenant)) {
-                tenant = pulsarCluster.getPulsarTenant();
-            }
-            String namespace = pulsarInfo.getMqResource();
-            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
-            boolean exist = pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName,
-                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
-            if (!exist) {
-                String serviceUrl = pulsarCluster.getAdminUrl();
-                log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
-                throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
-            }
+        String tenant = pulsarInfo.getPulsarTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getPulsarTenant();
+        }
+        String namespace = pulsarInfo.getMqResource();
+        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+        boolean exist = pulsarOperator.topicExists(pulsarCluster, tenant, namespace, topicName,
+                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
+        if (!exist) {
+            String serviceUrl = pulsarCluster.getAdminUrl();
+            log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
+            throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
+        }
 
-            // create subscription for all sinks
-            String groupId = pulsarInfo.getInlongGroupId();
-            List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
-            if (CollectionUtils.isEmpty(streamSinks)) {
-                log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
-                return;
-            }
+        // create subscription for all sinks
+        String groupId = pulsarInfo.getInlongGroupId();
+        List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
+        if (CollectionUtils.isEmpty(streamSinks)) {
+            log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
+            return;
+        }
 
-            // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
-            String clusterTag = pulsarInfo.getInlongClusterTag();
-            for (StreamSink sink : streamSinks) {
-                String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
-                pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
-                log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
-
-                // insert the consumer group info into the inlong_consume table
-                Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
-                log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
-                        id, subs, groupId, topicName);
-            }
+        // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
+        String clusterTag = pulsarInfo.getInlongClusterTag();
+        for (StreamSink sink : streamSinks) {
+            String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
+            pulsarOperator.createSubscription(pulsarCluster, fullTopicName, pulsarInfo.getQueueModule(), subs);
+            log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
+
+            // insert the consumer group info into the inlong_consume table
+            Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
+            log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+                    id, subs, groupId, topicName);
         }
     }
 
@@ -289,54 +282,46 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
      */
     private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarInfo.getPulsarTenant();
-            if (StringUtils.isBlank(tenant)) {
-                tenant = pulsarCluster.getPulsarTenant();
-            }
-            String namespace = pulsarInfo.getMqResource();
-            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
-                    .pulsarTenant(tenant)
-                    .namespace(namespace)
-                    .topicName(topicName)
-                    .build();
-            pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo);
+        String tenant = pulsarInfo.getPulsarTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getPulsarTenant();
         }
+        String namespace = pulsarInfo.getMqResource();
+        PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+                .pulsarTenant(tenant)
+                .namespace(namespace)
+                .topicName(topicName)
+                .build();
+        pulsarOperator.forceDeleteTopic(pulsarCluster, topicInfo);
     }
 
     /**
      * Query latest message from pulsar
      */
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
-            InlongStreamInfo streamInfo, Integer messageCount) throws PulsarClientException {
+            InlongStreamInfo streamInfo, Integer messageCount) throws Exception {
         String groupId = streamInfo.getInlongGroupId();
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(),
                 null, ClusterType.PULSAR);
-        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
-
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = inlongPulsarInfo.getPulsarTenant();
-            if (StringUtils.isBlank(tenant)) {
-                tenant = pulsarCluster.getPulsarTenant();
-            }
-
-            String namespace = groupInfo.getMqResource();
-            String topicName = streamInfo.getMqResource();
-            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
-            String clusterTag = inlongPulsarInfo.getInlongClusterTag();
-            String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
-            boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-            briefMQMessages =
-                    pulsarOperator.queryLatestMessage(pulsarAdmin, fullTopicName, subs, messageCount, streamInfo,
-                            serial);
-
-            // insert the consumer group info into the inlong_consume table
-            Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
-            log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
-                    id, subs, groupId, topicName);
+        String tenant = inlongPulsarInfo.getPulsarTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getPulsarTenant();
         }
+
+        String namespace = groupInfo.getMqResource();
+        String topicName = streamInfo.getMqResource();
+        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+        String clusterTag = inlongPulsarInfo.getInlongClusterTag();
+        String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
+        boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+        List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
+                messageCount, streamInfo, serial);
+
+        // insert the consumer group info into the inlong_consume table
+        Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+        log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+                id, subs, groupId, topicName);
         return briefMQMessages;
     }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 81c3de2b6a..9fd81aac03 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -17,18 +17,45 @@
 
 package org.apache.inlong.manager.service.resource.queue.pulsar;
 
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.HttpUtils;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageMetadata;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
 
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * Pulsar connection utils
@@ -39,54 +66,921 @@ public class PulsarUtils {
     private PulsarUtils() {
     }
 
+    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+            ZoneId.systemDefault());
+
+    public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+    public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+    public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+    public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+    public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+    public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    /**
+     * Get http headers by token.
+     *
+     * @param token pulsar token info
+     * @return add http headers for token info
+     */
+    private static HttpHeaders getHttpHeaders(String token) {
+        HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(token)) {
+            headers.add("Authorization", "Bearer " + token);
+        }
+        return headers;
+    }
+
+    /**
+     * Get pulsar cluster info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar cluster infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
     /**
-     * Get pulsar admin info
+     * Get the list of active brokers.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar broker infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+            throws Exception {
+        // brokers are queried cluster by cluster, so the cluster names are resolved first
+        List<String> clusterNames = getClusters(restTemplate, clusterInfo);
+        List<String> activeBrokers = new ArrayList<>();
+        for (String clusterName : clusterNames) {
+            String brokersUrl = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" + clusterName;
+            List<String> clusterBrokers = HttpUtils.request(restTemplate, brokersUrl, HttpMethod.GET, null,
+                    getHttpHeaders(clusterInfo.getToken()), ArrayList.class);
+            activeBrokers.addAll(clusterBrokers);
+        }
+        return activeBrokers;
+    }
+
+    /**
+     * Get pulsar tenant info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @return list of pulsar tenant infos
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) throws PulsarClientException {
-        Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), ErrorCodeEnum.INVALID_PARAMETER,
-                "Pulsar adminUrl cannot be empty");
-        PulsarAdmin pulsarAdmin;
-        if (StringUtils.isEmpty(pulsarCluster.getToken())) {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+    public static List<String> getTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+            throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get pulsar namespace info list.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @return list of pulsar namespace infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String tenant) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + tenant;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a new pulsar tenant.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param tenantInfo  pulsar tenant info
+     * @throws Exception any exception if occurred
+     */
+    public static void createTenant(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+            PulsarTenantInfo tenantInfo) throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + "/" + tenant;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(tenantInfo);
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Creates a new pulsar namespace with the specified policies.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespaceName pulsar namespace name
+     * @param policies pulsar namespace policies info
+     * @throws Exception any exception if occurred
+     */
+    public static void createNamespace(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+            String namespaceName, PulsarNamespacePolicies policies) throws Exception {
+        final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
+                + InlongConstants.SLASH + namespaceName;
+        HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+        MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+        String param = GSON.toJson(policies);
+        param = param.replaceAll("messageTtlInSeconds", "message_ttl_in_seconds")
+                .replaceAll("retentionPolicies", "retention_policies");
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+    }
+
+    /**
+     * Get the list of topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+            String namespace) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace;
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Get the list of partitioned topics under a namespace.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param tenant pulsar tenant name
+     * @param namespace pulsar namespace name
+     * @return list of pulsar partitioned topic infos
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String tenant, String namespace) throws Exception {
+        String url =
+                clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace + "/partitioned";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void createNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, null, getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Create a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void createPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath, Integer numPartitions) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.PUT, numPartitions.toString(),
+                getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Get the stats-internal for the partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar internal stat info of partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static JsonObject getInternalStatsPartitionedTopics(RestTemplate restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitioned-internalStats";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                JsonObject.class);
+    }
+
+    /**
+     * Get partitioned topic metadata.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar topic metadata info
+     * @throws Exception any exception if occurred
+     */
+    public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate restTemplate,
+            PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                PulsarTopicMetadata.class);
+    }
+
+    /**
+     * Delete a non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a non-partitioned topic by sending the delete request with {@code force=true}.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        // Pulsar expects "force" in the query string. The 4th argument of HttpUtils.request carries the request
+        // body in the other callers of this class, so the flag is put on the URL instead of into that argument.
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "?force=true";
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void deletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Force delete a partitioned topic by sending the delete request with {@code force=true}.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @throws Exception any exception if occurred
+     */
+    public static void forceDeletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        // Pulsar expects "force" in the query string. The 4th argument of HttpUtils.request carries the request
+        // body in the other callers of this class, so the flag is put on the URL instead of into that argument.
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions?force=true";
+        HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+    }
+
+    /**
+     * Delete a partitioned or non-partitioned topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
+     */
+    public static void deleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned) throws Exception {
+        if (isPartitioned) {
+            deletePartitionedTopic(restTemplate, clusterInfo, topicPath);
         } else {
-            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), pulsarCluster.getToken());
+            deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
         }
-        return pulsarAdmin;
     }
 
     /**
-     * Get the pulsar admin from the given service URL.
+     * Force delete a partitioned or non-partitioned topic.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @param isPartitioned pulsar is partitioned topic
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+    public static void forceDeleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+            boolean isPartitioned)
+            throws Exception {
+        if (isPartitioned) {
+            forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath);
+        } else {
+            forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
+        }
     }
 
     /**
-     * Get the pulsar admin from the given service URL and token.
-     * <p/>
-     * Currently only token is supported as an authentication type.
+     * lookup persistent topic info.
      *
-     * @apiNote It must be closed after use.
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return pulsar broker url
+     * @throws Exception any exception if occurred
      */
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String token) throws PulsarClientException {
-        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
-                .authentication(AuthenticationFactory.token(token)).build();
+    public static String lookupTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath)
+            throws Exception {
+        String url = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + topicPath;
+        PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, url, HttpMethod.GET, null,
+                getHttpHeaders(clusterInfo.getToken()), PulsarLookupTopicInfo.class);
+        return topicInfo.getBrokerUrl();
     }
 
     /**
-     * Get pulsar cluster info list.
+     * lookup persistent partitioned topic info.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return map of partitioned topic info
+     * @throws Exception any exception if occurred
+     */
+    public static Map<String, String> lookupPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        PulsarTopicMetadata metadata = getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
+        Map<String, String> map = new LinkedHashMap<>();
+        for (int i = 0; i < metadata.getPartitions(); i++) {
+            String partitionTopicName = topicPath + "-partition-" + i;
+            String partitionUrl = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopicName;
+            PulsarLookupTopicInfo topicInfo = HttpUtils.request(restTemplate, partitionUrl, HttpMethod.GET, null,
+                    getHttpHeaders(clusterInfo.getToken()), PulsarLookupTopicInfo.class);
+            map.put(partitionTopicName, topicInfo.getBrokerUrl());
+        }
+        return map;
+    }
+
+    /**
+     * Get the list of persistent subscriptions for a given topic.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPath pulsar topic path
+     * @return list of pulsar topic subscription info
+     * @throws Exception any exception if occurred
+     */
+    public static List<String> getSubscriptions(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPath) throws Exception {
+        String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscriptions";
+        return HttpUtils.request(restTemplate, url, HttpMethod.GET, null, getHttpHeaders(clusterInfo.getToken()),
+                ArrayList.class);
+    }
+
+    /**
+     * Create a subscription with the given name on a persistent topic.
+     * <p>
+     * Sends a PUT to {@code .../<topicPath>/subscription/<subscription>} whose JSON body carries
+     * {@code entryId} and {@code ledgerId} set to {@link Long#MAX_VALUE} and {@code partitionIndex} set to -1.
+     * NOTE(review): these values presumably denote Pulsar's "latest" message position - confirm against
+     * the admin REST API.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info, supplies the admin url and the token
+     * @param topicPath pulsar topic path
+     * @param subscription name of the subscription to create
+     * @throws Exception any exception if occurred
+     */
+    public static void createSubscription(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+            String subscription) throws Exception {
+        final String subscriptionUrl = new StringBuilder().append(clusterInfo.getAdminUrl())
+                .append(QUERY_PERSISTENT_PATH)
+                .append("/")
+                .append(topicPath)
+                .append("/subscription/")
+                .append(subscription)
+                .toString();
+
+        // request body: the position the new subscription starts from
+        JsonObject position = new JsonObject();
+        position.addProperty("entryId", Long.MAX_VALUE);
+        position.addProperty("ledgerId", Long.MAX_VALUE);
+        position.addProperty("partitionIndex", -1);
+
+        HttpHeaders requestHeaders = getHttpHeaders(clusterInfo.getToken());
+        requestHeaders.setContentType(MediaType.parseMediaType("application/json; charset=UTF-8"));
+        requestHeaders.add("Accept", MediaType.APPLICATION_JSON.toString());
+
+        HttpUtils.request(restTemplate, subscriptionUrl, HttpMethod.PUT, position.toString(), requestHeaders);
+    }
+
+    /**
+     * Examine a specific message on a topic by position relative to the earliest or the latest message.
+     *
+     * @param restTemplate spring framework RestTemplate
+     * @param clusterInfo pulsar cluster info
+     * @param topicPartition  pulsar topic partition info
+     * @param messageType pulsar message type info
+     * @param messagePosition pulsar message position info
+     * @return spring framework HttpEntity
+     * @throws Exception any exception if occurred
+     */
+    public static ResponseEntity<byte[]> examineMessage(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+            String topicPartition, String messageType, int messagePosition) throws Exception {
+        StringBuilder urlBuilder = new StringBuilder().append(clusterInfo.getAdminUrl())
+                .append(QUERY_PERSISTENT_PATH)
+                .append("/")
+                .append(topicPartition)
+                .append("/examinemessage")
+                .append("?initialPosition=")
+                .append(messageType)
+                .append("&messagePosition=")
+                .append(messagePosition);
+        ResponseEntity<byte[]> response = restTemplate.exchange(urlBuilder.toString(), HttpMethod.GET,
+                new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), byte[].class);
+        if (!response.getStatusCode().is2xxSuccessful()) {
+            log.error("request error for {}, status code {}, body {}", urlBuilder.toString(), response.getStatusCode(),
+                    response.getBody());
+        }
+        return response;
+    }
+
+    /**
+     * Parse an examine-message HTTP response and return only the first message it contains.
+     *
+     * @param response HTTP response carrying the message bytes and metadata headers
+     * @param topic topic the message was read from
+     * @return the first parsed message, or {@code null} when the response yields no message
+     * @throws Exception any exception if occurred
+     */
+    public static PulsarMessageInfo getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        List<PulsarMessageInfo> parsed = PulsarUtils.getMessagesFromHttpResponse(response, topic);
+        return parsed.size() > 0 ? parsed.get(0) : null;
+    }
+
+    /**
+     *  Copy from getMessagesFromHttpResponse method of org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     *
+     * @param response
+     * @param topic
+     * @return
+     * @throws Exception
      */
-    public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
-        return pulsarAdmin.clusters().getClusters();
+    public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+            throws Exception {
+        // The message metadata travels in "X-Pulsar-*" response headers, the payload in the response body.
+        HttpHeaders headers = response.getHeaders();
+        String msgId = headers.getFirst("X-Pulsar-Message-ID");
+
+        // Broker entry metadata is optional: it is only built when at least one of its headers is present.
+        String brokerEntryTimestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
+        String brokerEntryIndex = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
+        PulsarBrokerEntryMetadata brokerEntryMetadata = null;
+        if (brokerEntryTimestamp != null || brokerEntryIndex != null) {
+            brokerEntryMetadata = new PulsarBrokerEntryMetadata();
+            if (brokerEntryTimestamp != null) {
+                brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp));
+            }
+            if (brokerEntryIndex != null) {
+                brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
+            }
+        }
+
+        PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
+        Map<String, String> properties = Maps.newTreeMap();
+
+        String tmp = headers.getFirst("X-Pulsar-publish-time");
+        if (tmp != null) {
+            messageMetadata.setPublishTime(parse(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-event-time");
+        if (tmp != null) {
+            messageMetadata.setEventTime(parse(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-deliver-at-time");
+        if (tmp != null) {
+            messageMetadata.setDeliverAtTime(parse(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-value");
+        if (tmp != null) {
+            messageMetadata.setNullValue(Boolean.parseBoolean(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-producer-name");
+        if (tmp != null) {
+            messageMetadata.setProducerName(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-sequence-id");
+        if (tmp != null) {
+            messageMetadata.setSequenceId(Long.parseLong(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-replicated-from");
+        if (tmp != null) {
+            messageMetadata.setReplicatedFrom(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key");
+        if (tmp != null) {
+            messageMetadata.setPartitionKey(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-compression");
+        if (tmp != null) {
+            messageMetadata.setCompression(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uncompressed-size");
+        if (tmp != null) {
+            messageMetadata.setUncompressedSize(Integer.parseInt(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-encryption-algo");
+        if (tmp != null) {
+            messageMetadata.setEncryptionAlgo(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
+        if (tmp != null) {
+            messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-marker-type");
+        if (tmp != null) {
+            messageMetadata.setMarkerType(Integer.parseInt(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
+        if (tmp != null) {
+            messageMetadata.setTxnidMostBits(Long.parseLong(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
+        if (tmp != null) {
+            messageMetadata.setHighestSequenceId(Long.parseLong(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-uuid");
+        if (tmp != null) {
+            messageMetadata.setUuid(tmp);
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
+        if (tmp != null) {
+            messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
+        if (tmp != null) {
+            messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-chunk-id");
+        if (tmp != null) {
+            messageMetadata.setChunkId(Integer.parseInt(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-null-partition-key");
+        if (tmp != null) {
+            messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp));
+        }
+
+        // The encryption-param header was previously decoded twice; once is enough.
+        tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
+        if (tmp != null) {
+            messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
+        if (tmp != null) {
+            messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp));
+        }
+
+        tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
+        if (tmp != null) {
+            messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp));
+        }
+
+        List<String> replicatedTo = headers.get("X-Pulsar-replicated-to");
+        if (ObjectUtils.isNotEmpty(replicatedTo)) {
+            if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
+                messageMetadata.setReplicateTos(Lists.newArrayList(replicatedTo));
+            } else {
+                messageMetadata.getReplicateTos().addAll(replicatedTo);
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-batch-size");
+        if (tmp != null) {
+            properties.put("X-Pulsar-batch-size", tmp);
+        }
+
+        // User properties arrive as "X-Pulsar-PROPERTY-<name>" headers. HTTP header names are
+        // case-insensitive, so match the prefix ignoring case and only at the start of the name.
+        final String propertyPrefix = "X-Pulsar-PROPERTY-";
+        for (Entry<String, List<String>> entry : headers.entrySet()) {
+            String headerName = entry.getKey();
+            List<String> headerValues = entry.getValue();
+            if (headerName != null
+                    && headerName.regionMatches(true, 0, propertyPrefix, 0, propertyPrefix.length())
+                    && headerValues != null && !headerValues.isEmpty()) {
+                properties.put(headerName.substring(propertyPrefix.length()), headerValues.get(0));
+            }
+        }
+
+        tmp = headers.getFirst("X-Pulsar-num-batch-message");
+        if (tmp != null) {
+            properties.put("X-Pulsar-num-batch-message", tmp);
+        }
+
+        boolean isEncrypted = false;
+        tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+        if (tmp != null) {
+            isEncrypted = Boolean.parseBoolean(tmp);
+        }
+
+        // An unencrypted batch is unpacked into its individual messages.
+        if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) {
+            return getIndividualMsgsFromBatch(topic, msgId, response.getBody(), properties, messageMetadata,
+                    brokerEntryMetadata);
+        }
+
+        // Attach the properties collected from the headers; previously they were built but never
+        // handed to the returned message.
+        messageMetadata.setProperties(properties);
+
+        PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+        messageInfo.setTopic(topic);
+        messageInfo.setMessageId(msgId);
+        messageInfo.setProperties(messageMetadata.getProperties());
+        messageInfo.setBody(response.getBody());
+        messageInfo.setPulsarMessageMetadata(messageMetadata);
+        if (brokerEntryMetadata != null) {
+            messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
+        }
+        return Collections.singletonList(messageInfo);
+    }
+
+    /**
+     * Convert a date-time string, read with {@code DATE_FORMAT}, into epoch milliseconds.
+     *
+     * @param datetime textual date-time value
+     * @return the instant as milliseconds since the epoch
+     * @throws DateTimeParseException if the text cannot be parsed
+     */
+    private static long parse(String datetime) throws DateTimeParseException {
+        return Instant.from(DATE_FORMAT.parse(datetime)).toEpochMilli();
+    }
+
+    /**
+     * Copy from getIndividualMsgsFromBatch method of org.apache.pulsar.client.admin.internal.TopicsImpl class.
+     * <p>
+     * Splits a batch payload into its single messages. The number of messages is read from the
+     * {@code X-Pulsar-num-batch-message} entry of {@code properties}; each message id is
+     * {@code msgId + ":" + index}. Every returned message gets its own copy of the header properties,
+     * extended with the properties decoded from its own single-message metadata, so that properties of
+     * one message do not leak into the others.
+     *
+     * @param topic topic the batch was read from
+     * @param msgId id of the batch entry
+     * @param data raw batch payload; {@code null} yields an empty list
+     * @param properties properties collected from the response headers (not modified)
+     * @param metadata message metadata shared by all messages of the batch
+     * @param brokerMetadata broker entry metadata, may be {@code null}
+     * @return the messages contained in the batch, in payload order
+     */
+    private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
+            Map<String, String> properties, PulsarMessageMetadata metadata, PulsarBrokerEntryMetadata brokerMetadata) {
+        List<PulsarMessageInfo> ret = new ArrayList<>();
+        if (data == null) {
+            // nothing to unpack; ByteBuffer.wrap(null) would throw
+            return ret;
+        }
+        int batchSize = Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        for (int i = 0; i < batchSize; ++i) {
+            String batchMsgId = msgId + ":" + i;
+            // Per-message copy: sharing one mutable map made every message report the union of all properties.
+            Map<String, String> ownProperties = Maps.newTreeMap();
+            ownProperties.putAll(properties);
+            PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata();
+            singleMetadata.setProperties(ownProperties);
+            ByteBuffer singleMessagePayload = deSerializeSingleMessageInBatch(buffer, singleMetadata, i, batchSize);
+            PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+            messageInfo.setTopic(topic);
+            messageInfo.setMessageId(batchMsgId);
+            messageInfo.setProperties(singleMetadata.getProperties());
+            messageInfo.setPulsarMessageMetadata(metadata);
+            messageInfo.setBody(singleMessagePayload.array());
+            if (brokerMetadata != null) {
+                messageInfo.setPulsarBrokerEntryMetadata(brokerMetadata);
+            }
+            ret.add(messageInfo);
+        }
+        return ret;
+    }
+
+    /**
+     * Copy from deSerializeSingleMessageInBatch method of org.apache.pulsar.common.protocol.Commands class.
+     *
+     * @param uncompressedPayload
+     * @param metadata
+     * @param index
+     * @param batchSize
+     * @return
+     */
+    private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer uncompressedPayload,
+            PulsarMessageMetadata metadata, int index, int batchSize) {
+        int singleMetaSize = (int) uncompressedPayload.getInt();
+        metaDataParseFrom(metadata, uncompressedPayload, singleMetaSize);
+        int singleMessagePayloadSize = metadata.getPayloadSize();
+        int readerIndex = uncompressedPayload.position();
+        byte[] singleMessagePayload = new byte[singleMessagePayloadSize];
+        uncompressedPayload.get(singleMessagePayload);
+        if (index < batchSize) {
+            uncompressedPayload.position(readerIndex + singleMessagePayloadSize);
+        }
+        return ByteBuffer.wrap(singleMessagePayload);
+    }
+
+    /**
+     * Copy from parseFrom method of org.apache.pulsar.common.api.proto.SingleMessageMetadata class.
+     * <p>
+     * Decodes {@code size} bytes of tag/value pairs from {@code buffer}, starting at its current
+     * position, and stores every recognised field on {@code metadata}. Unrecognised tags are skipped.
+     *
+     * @param metadata target that receives the decoded fields
+     * @param buffer source buffer; its position is advanced past the consumed bytes
+     * @param size number of bytes that make up the encoded metadata
+     */
+    private static void metaDataParseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
+        int endIdx = size + buffer.position();
+        while (buffer.position() < endIdx) {
+            int tag = readVarInt(buffer);
+            switch (tag) {
+                case 10:
+                    // one key/value property, length-prefixed
+                    int _propertiesSize = readVarInt(buffer);
+                    parseFrom(metadata, buffer, _propertiesSize);
+                    break;
+                case 18:
+                    int _partitionKeyBufferLen = readVarInt(buffer);
+                    byte[] partitionKeyArray = new byte[_partitionKeyBufferLen];
+                    buffer.get(partitionKeyArray);
+                    // decode explicitly as UTF-8 instead of relying on the platform default charset
+                    metadata.setPartitionKey(
+                            new String(partitionKeyArray, java.nio.charset.StandardCharsets.UTF_8));
+                    break;
+                case 24:
+                    int payloadSize = readVarInt(buffer);
+                    metadata.setPayloadSize(payloadSize);
+                    break;
+                case 32:
+                    boolean compactedOut = readVarInt(buffer) == 1;
+                    metadata.setCompactedOut(compactedOut);
+                    break;
+                case 40:
+                    long eventTime = readVarInt64(buffer);
+                    metadata.setEventTime(eventTime);
+                    break;
+                case 48:
+                    boolean partitionKeyB64Encoded = readVarInt(buffer) == 1;
+                    metadata.setPartitionKeyB64Encoded(partitionKeyB64Encoded);
+                    break;
+                case 58:
+                    int _orderingKeyLen = readVarInt(buffer);
+                    byte[] orderingKeyArray = new byte[_orderingKeyLen];
+                    // Consume the key bytes; without this read the key stays all zeros and the
+                    // unread bytes are misinterpreted as the following tags.
+                    buffer.get(orderingKeyArray);
+                    metadata.setOrderingKey(orderingKeyArray);
+                    break;
+                case 64:
+                    long sequenceId = readVarInt64(buffer);
+                    metadata.setSequenceId(sequenceId);
+                    break;
+                case 72:
+                    boolean nullValue = readVarInt(buffer) == 1;
+                    metadata.setNullValue(nullValue);
+                    break;
+                case 80:
+                    boolean nullPartitionKey = readVarInt(buffer) == 1;
+                    metadata.setNullPartitionKey(nullPartitionKey);
+                    break;
+                default:
+                    skipUnknownField(tag, buffer);
+            }
+        }
     }
 
     /**
-     * Get pulsar cluster service url.
+     * Copy from readVarInt method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     *
+     * @param buf
+     * @return
      */
-    public static String getServiceUrl(PulsarAdmin pulsarAdmin, String pulsarCluster) throws PulsarAdminException {
-        return pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+    private static int readVarInt(ByteBuffer buf) {
+        // Each byte contributes its low 7 bits; a negative byte (high bit set) means more bytes follow.
+        byte tmp = buf.get();
+        if (tmp >= 0) {
+            // single-byte value
+            return tmp;
+        } else {
+            int result = tmp & 127;
+            if ((tmp = buf.get()) >= 0) {
+                result |= tmp << 7;
+            } else {
+                result |= (tmp & 127) << 7;
+                if ((tmp = buf.get()) >= 0) {
+                    result |= tmp << 14;
+                } else {
+                    result |= (tmp & 127) << 14;
+                    if ((tmp = buf.get()) >= 0) {
+                        result |= tmp << 21;
+                    } else {
+                        result |= (tmp & 127) << 21;
+                        // fifth byte fills the remaining high bits of the int
+                        result |= (tmp = buf.get()) << 28;
+                        if (tmp < 0) {
+                            // Still continuing after five bytes: consume up to five more bytes and
+                            // discard their bits; fail if none of them terminates the varint.
+                            for (int i = 0; i < 5; ++i) {
+                                if (buf.get() >= 0) {
+                                    return result;
+                                }
+                            }
+                            throw new IllegalArgumentException("Encountered a malformed varint.");
+                        }
+                    }
+                }
+            }
+            return result;
+        }
     }
 
+    /**
+     * Copy from readVarInt64 method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     * <p>
+     * Decodes a variable-length 64-bit integer: every byte supplies 7 value bits, least significant
+     * group first, and a cleared high bit marks the last byte.
+     *
+     * @param buf buffer to read from; its position is advanced past the varint
+     * @return the decoded value
+     * @throws IllegalArgumentException if no terminating byte is found within 64 bits
+     */
+    private static long readVarInt64(ByteBuffer buf) {
+        long value = 0L;
+        int shift = 0;
+        while (shift < 64) {
+            byte current = buf.get();
+            value |= (long) (current & 127) << shift;
+            if ((current & 128) == 0) {
+                return value;
+            }
+            shift += 7;
+        }
+        throw new IllegalArgumentException("Encountered a malformed varint.");
+    }
+
+    /**
+     * Copy from getTagType method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     *
+     * @param tag
+     * @return
+     */
+    private static int getTagType(int tag) {
+        return tag & 7;
+    }
+
+    /**
+     * Copy from skipUnknownField method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+     *
+     * @param tag
+     * @param buffer
+     */
+    private static void skipUnknownField(int tag, ByteBuffer buffer) {
+        int tagType = getTagType(tag);
+        switch (tagType) {
+            case 0:
+                readVarInt(buffer);
+                break;
+            case 1:
+                buffer.get(new byte[8]);
+                break;
+            case 2:
+                int len = readVarInt(buffer);
+                buffer.get(new byte[len]);
+                break;
+            case 3:
+            case 4:
+            default:
+                throw new IllegalArgumentException("Invalid unknonwn tag type: " + tagType);
+            case 5:
+                buffer.get(new byte[4]);
+        }
+    }
+
+    /**
+     * Copy from parseFrom method of org.apache.pulsar.common.api.proto.KeyValue class.
+     * <p>
+     * Decodes {@code size} bytes of key (tag 10) and value (tag 18) fields from {@code buffer} and puts
+     * each completed pair into the properties map of {@code metadata}. A pair is only stored when both
+     * key and value are non-empty. If the metadata has no (or an empty) properties map, a new
+     * {@link HashMap} is installed first.
+     *
+     * @param metadata metadata whose properties map receives the decoded pair(s)
+     * @param buffer source buffer; its position is advanced past the consumed bytes
+     * @param size number of bytes that make up the encoded key/value data
+     */
+    private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
+        if (ObjectUtils.isEmpty(metadata.getProperties())) {
+            metadata.setProperties(new HashMap<>());
+        }
+        Map<String, String> properties = metadata.getProperties();
+        int endIdx = buffer.position() + size;
+        String key = null;
+        String value = null;
+        while (buffer.position() < endIdx) {
+            int tag = readVarInt(buffer);
+            // flush a completed pair before starting on the next field
+            if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
+                properties.put(key, value);
+                key = null;
+                value = null;
+            }
+            switch (tag) {
+                case 10:
+                    int keyBufferLen = readVarInt(buffer);
+                    byte[] keyArray = new byte[keyBufferLen];
+                    buffer.get(keyArray);
+                    // decode explicitly as UTF-8 instead of relying on the platform default charset
+                    key = new String(keyArray, java.nio.charset.StandardCharsets.UTF_8);
+                    break;
+                case 18:
+                    int valueBufferLen = readVarInt(buffer);
+                    byte[] valueArray = new byte[valueBufferLen];
+                    buffer.get(valueArray);
+                    value = new String(valueArray, java.nio.charset.StandardCharsets.UTF_8);
+                    break;
+                default:
+                    skipUnknownField(tag, buffer);
+            }
+        }
+        // store the pair that was still pending when the input ended
+        if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
+            properties.put(key, value);
+        }
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index a38b391131..f790604fc8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -33,13 +33,9 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
 import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
-import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -86,12 +82,11 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat
 
             PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(pulsarDataNodeInfo.getAdminUrl())
                     .token(pulsarDataNodeInfo.getToken()).build();
-            PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
             // create pulsar tenant
-            pulsarOperator.createTenant(pulsarAdmin, pulsarSinkDTO.getPulsarTenant());
+            pulsarOperator.createTenant(pulsarClusterInfo, pulsarSinkDTO.getPulsarTenant());
             // use default config to create namespace
             InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
-            pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
+            pulsarOperator.createNamespace(pulsarClusterInfo, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
                     pulsarSinkDTO.getNamespace());
             String queueModel = pulsarSinkDTO.getPartitionNum() > 0 ? InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL
                     : InlongConstants.PULSAR_QUEUE_TYPE_SERIAL;
@@ -102,15 +97,14 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat
                     .queueModule(queueModel)
                     .build();
             // create topic
-            pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+            pulsarOperator.createTopic(pulsarClusterInfo, topicInfo);
             final String info = "success to create Pulsar resource";
             sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
             LOG.info(info + " for sinkInfo={}", sinkInfo);
-        } catch (PulsarClientException | PulsarAdminException e) {
+        } catch (Exception e) {
             LOG.error("init pulsar admin error", e);
             throw new BusinessException();
         }
-
     }
 
     private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
index d248286820..0676b944f8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
@@ -17,39 +17,578 @@
 
 package org.apache.inlong.manager.service.queue;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
 
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.junit.jupiter.api.Assertions;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Ignore;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.springframework.util.ReflectionUtils;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
 
-import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Test class for Pulsar utils.
  */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
 public class PulsarUtilsTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarUtilsTest.class);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    private static final String INTER_CONTAINER_PULSAR_ALIAS = "pulsar";
+
+    private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+    private static final String DEFAULT_SERVICE_URL = "http://127.0.0.1:8080";
+
+    private static final String DEFAULT_TENANT = "public";
+
+    private static final String DEFAULT_CREATE_TENANT = "test_tenant";
+
+    private static final String DEFAULT_NAMESPACE = "default";
+
+    private static final int DEFAULT_PARTITIONS_NUM = 3;
+
+    private static final String PERSISTENT_TOPIC_HEAD = "persistent://";
+
+    @Mock
+    private RestTemplate restTemplate;
+
+    @Mock
+    private ResponseEntity<byte[]> byteExchange;
+
+    @Mock
+    private PulsarClusterInfo pulsarClusterInfo;
+
+    private static PulsarNamespacePolicies policies;
+
+    private static final PulsarContainer PULSAT_CONTAINER = new PulsarContainer(
+            DockerImageName.parse("apachepulsar/pulsar:2.8.2")
+                    .asCompatibleSubstituteFor("apachepulsar/pulsar"))
+                            .withNetwork(NETWORK)
+                            .withAccessToHost(true)
+                            .withNetworkAliases(INTER_CONTAINER_PULSAR_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    private static final RestTemplate client = new RestTemplate();
+
+    private static final PulsarClusterInfo pulsarCluster = new PulsarClusterInfo();
+
+    /**
+     * Build the namespace policies shared by the tests, start the Pulsar container with fixed port
+     * bindings (6650 and 8080) and point the shared cluster info at its admin endpoint.
+     */
+    @BeforeAll
+    public static void beforeAll() {
+        policies = new PulsarNamespacePolicies();
+        policies.setMessageTtlInSeconds(10);
+        PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies();
+        persistencePolicies.setBookkeeperEnsemble(3);
+        persistencePolicies.setBookkeeperAckQuorum(3);
+        persistencePolicies.setBookkeeperWriteQuorum(3);
+        persistencePolicies.setManagedLedgerMaxMarkDeleteRate(4.0);
+        policies.setPersistence(persistencePolicies);
+        PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies();
+        // upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1
+        retentionPolicies.setRetentionSizeInMB(2048L);
+        retentionPolicies.setRetentionTimeInMinutes(500);
+        policies.setRetentionPolicies(retentionPolicies);
+
+        PULSAT_CONTAINER.setPortBindings(Arrays.asList("6650:6650", "8080:8080"));
+        Startables.deepStart(Stream.of(PULSAT_CONTAINER)).join();
+        LOG.info("Containers are started.");
+        // reuse the shared constant instead of repeating the URL literal
+        pulsarCluster.setAdminUrl(DEFAULT_SERVICE_URL);
+    }
+
+    /**
+     * Stop the Pulsar container once all tests have run.
+     */
+    @AfterAll
+    public static void teardown() {
+        if (PULSAT_CONTAINER == null) {
+            return;
+        }
+        PULSAT_CONTAINER.stop();
+    }
+
+    /**
+     * Stub the mocked cluster info with the default admin url and a test token before each test.
+     */
+    @BeforeEach
+    public void before() {
+        when(pulsarClusterInfo.getAdminUrl()).thenReturn(DEFAULT_SERVICE_URL);
+        when(pulsarClusterInfo.getToken()).thenReturn("testtoken");
+    }
+    /**
+     * Test cases for {@link PulsarUtils#getClusters(RestTemplate, PulsarClusterInfo)}.
+     *
+     * @throws Exception
+     */
     @Test
-    public void testGetPulsarAdmin() {
-        final String defaultServiceUrl = "http://127.0.0.1:10080";
-        try {
-            PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
-            Assertions.assertEquals(defaultServiceUrl, admin.getServiceUrl());
-            Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
-            assert auth != null;
-            auth.setAccessible(true);
-            Authentication authentication = (Authentication) auth.get(admin);
-            Assertions.assertNotNull(authentication);
-            Assertions.assertTrue(authentication instanceof AuthenticationDisabled);
-        } catch (PulsarClientException | IllegalAccessException e) {
-            Assertions.fail();
+    public void testGetClusters() throws Exception {
+        // the cluster list the admin endpoint is expected to report
+        final String expectedJson = "[\"standalone\"]";
+        List<String> expectedClusters = GSON.fromJson(expectedJson, ArrayList.class);
+
+        List<String> actualClusters = PulsarUtils.getClusters(client, pulsarCluster);
+
+        assertEquals(expectedClusters.size(), actualClusters.size());
+        assertEquals(expectedClusters, actualClusters);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getBrokers(RestTemplate, PulsarClusterInfo)}.
+     *
+     * <p>Expects the broker list to consist of the single entry {@code localhost:8080}.
+     *
+     * @throws Exception propagated from the admin call
+     */
+    @Test
+    public void testGetBrokers() throws Exception {
+        // A typed list avoids the unchecked conversion that a raw-typed JSON parse would need.
+        final List<String> expected = Arrays.asList("localhost:8080");
+        final List<String> brokers = PulsarUtils.getBrokers(client, pulsarCluster);
+        // List equality also covers the size, so no separate size assertion is required.
+        assertEquals(expected, brokers);
+    }
+
+    /**
+     * Checks that {@link PulsarUtils#getTenants(RestTemplate, PulsarClusterInfo)} returns a non-null list.
+     *
+     * @throws Exception propagated from the admin call
+     */
+    @Test
+    public void testGetTenants() throws Exception {
+        assertNotNull(PulsarUtils.getTenants(client, pulsarCluster));
+    }
+
+    /**
+     * Checks that {@link PulsarUtils#getNamespaces(RestTemplate, PulsarClusterInfo, String)} returns a
+     * non-null list for {@code DEFAULT_TENANT}.
+     *
+     * @throws Exception propagated from the admin call
+     */
+    @Test
+    public void testGetNamespaces() throws Exception {
+        assertNotNull(PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT));
+    }
+
+    /**
+     * Creates a tenant through {@link PulsarUtils#createTenant} and verifies that it is subsequently
+     * returned by {@link PulsarUtils#getTenants(RestTemplate, PulsarClusterInfo)}.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testCreateTenant() throws Exception {
+        PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet("standalone"));
+        tenantInfo.setAdminRoles(Sets.newHashSet());
+
+        PulsarUtils.createTenant(client, pulsarCluster, DEFAULT_CREATE_TENANT, tenantInfo);
+        Thread.sleep(500);
+        boolean created = PulsarUtils.getTenants(client, pulsarCluster).contains(DEFAULT_CREATE_TENANT);
+        assertTrue(created);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#createNamespace(RestTemplate, PulsarClusterInfo, String, String, PulsarNamespacePolicies)}.
+     *
+     * <p>Creates a namespace under {@code DEFAULT_TENANT} using the shared {@code policies} and verifies
+     * that it is then listed by {@link PulsarUtils#getNamespaces(RestTemplate, PulsarClusterInfo, String)}.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testCreateNamespace() throws Exception {
+        final String namespaceName = "testCreateNamespace";
+        // getNamespaces is expected to report the namespace as "<tenant>/<namespace>"
+        final String namespaceInfo = DEFAULT_TENANT + InlongConstants.SLASH + namespaceName;
+        // NOTE(review): param and headers below are built but never passed to any call in this
+        // method; they look like leftovers from a mocked variant of the test - confirm and remove.
+        String param = GSON.toJson(policies);
+        param = param.replaceAll("messageTtlInSeconds", "message_ttl_in_seconds")
+                .replaceAll("retentionPolicies", "retention_policies");
+        final HttpHeaders headers = new HttpHeaders();
+        if (StringUtils.isNotEmpty(pulsarClusterInfo.getToken())) {
+            headers.add("Authorization", "Bearer " + pulsarClusterInfo.getToken());
         }
+        final MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+        headers.setContentType(type);
+        headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+
+        PulsarUtils.createNamespace(client, pulsarCluster, DEFAULT_TENANT, namespaceName, policies);
+        // presumably gives the broker time to apply the change before it is queried - TODO confirm
+        Thread.sleep(500);
+        List<String> namespaces = PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT);
+        assertTrue(namespaces.contains(namespaceInfo));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getTopics(RestTemplate, PulsarClusterInfo, String, String)}.
+     *
+     * <p>Only checks that a list is returned for the default tenant and namespace; its content is
+     * not inspected.
+     *
+     * @throws Exception propagated from the admin call
+     */
+    @Test
+    public void testGetTopics() throws Exception {
+        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        // size() >= 0 holds for every list, so assert on what can actually go wrong: a null result.
+        assertNotNull(topics);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getPartitionedTopics(RestTemplate, PulsarClusterInfo, String, String)}.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getList">PersistentTopics_getList</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/partitioned
+     * method is: GET
+     * request body: none
+     * response : ["string"]
+     *
+     * <p>Only checks that a list is returned for the default tenant and namespace; its content is
+     * not inspected.
+     *
+     * @throws Exception propagated from the admin call
+     */
+    @Test
+    public void testGetPartitionedTopics() throws Exception {
+        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        // size() >= 0 holds for every list, so assert on what can actually go wrong: a null result.
+        assertNotNull(topics);
+    }
+
+    /**
+     * Creates a topic through
+     * {@link PulsarUtils#createNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)} and verifies
+     * that it shows up in the topic list of the default namespace.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testCreateNonPartitionedTopic() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testCreateNonPartitionedTopic";
+        final String expectedTopic = PERSISTENT_TOPIC_HEAD + path;
+
+        Thread.sleep(500);
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        boolean listed = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE)
+                .contains(expectedTopic);
+        assertTrue(listed);
+    }
+
+    /**
+     * Creates a topic through
+     * {@link PulsarUtils#createPartitionedTopic(RestTemplate, PulsarClusterInfo, String, Integer)} and
+     * verifies that it shows up in the partitioned-topic list of the default namespace.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createPartitionedTopic">PersistentTopics_createPartitionedTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: PUT
+     * request body: integer <int32> (The number of partitions for the topic)
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testCreatePartitionedTopic() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testCreatePartitionedTopic";
+        final String expectedTopic = PERSISTENT_TOPIC_HEAD + path;
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+        boolean listed = PulsarUtils
+                .getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE)
+                .contains(expectedTopic);
+        assertTrue(listed);
+    }
+
+    /**
+     * Creates a partitioned topic and checks that
+     * {@link PulsarUtils#getInternalStatsPartitionedTopics(RestTemplate, PulsarClusterInfo, String)}
+     * returns a non-null result for it.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedStatsInternal">PersistentTopics_getPartitionedStatsInternal</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-internalStats
+     * method is: GET
+     * request body: none
+     * response : json
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testGetInternalStatsPartitionedTopics() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testGetInternalStatsPartitionedTopics";
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+        JsonObject internalStats = PulsarUtils.getInternalStatsPartitionedTopics(client, pulsarCluster, path);
+        assertNotNull(internalStats);
+    }
+
+    /**
+     * Creates a partitioned topic and checks that
+     * {@link PulsarUtils#getPartitionedTopicMetadata(RestTemplate, PulsarClusterInfo, String)} returns
+     * non-null metadata for it.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedMetadata">PersistentTopics_getPartitionedMetadata</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: GET
+     * request body: none
+     * response : json
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testGetPartitionedTopicMetadata() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testGetPartitionedTopicMetadata";
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+        PulsarTopicMetadata topicMetadata = PulsarUtils.getPartitionedTopicMetadata(client, pulsarCluster, path);
+        assertNotNull(topicMetadata);
+    }
+
+    /**
+     * Verifies {@link PulsarUtils#deleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}:
+     * a freshly created non-partitioned topic is listed first and is gone from the list once deleted.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic">PersistentTopics_deleteTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testDeleteNonPartitionedTopic";
+        final String fullTopicName = PERSISTENT_TOPIC_HEAD + path;
+
+        // the topic must exist before the deletion can be observed
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> beforeDelete = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(beforeDelete.contains(fullTopicName));
+
+        PulsarUtils.deleteNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> afterDelete = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!afterDelete.contains(fullTopicName));
     }
 
+    /**
+     * Verifies {@link PulsarUtils#forceDeleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}:
+     * a freshly created non-partitioned topic is listed first and is gone from the list once force-deleted.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic">PersistentTopics_deleteTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testForceDeleteNonPartitionedTopic() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testForceDeleteNonPartitionedTopic";
+        final String fullTopicName = PERSISTENT_TOPIC_HEAD + path;
+
+        // the topic must exist before the deletion can be observed
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> beforeDelete = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(beforeDelete.contains(fullTopicName));
+
+        PulsarUtils.forceDeleteNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> afterDelete = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!afterDelete.contains(fullTopicName));
+    }
+
+    /**
+     * Verifies {@link PulsarUtils#deletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}:
+     * a freshly created three-partition topic is listed first and is gone from the list once deleted.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic">PersistentTopics_deletePartitionedTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        final int partitionCount = 3;
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testDeletePartitionedTopic";
+        final String fullTopicName = PERSISTENT_TOPIC_HEAD + path;
+
+        // the topic must exist before the deletion can be observed
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, partitionCount);
+        Thread.sleep(500);
+        List<String> beforeDelete = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        assertTrue(beforeDelete.contains(fullTopicName));
+
+        PulsarUtils.deletePartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> afterDelete = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        assertTrue(!afterDelete.contains(fullTopicName));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#forceDeletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic">PersistentTopics_deletePartitionedTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: DELETE
+     * request body: none
+     *
+     * <p>Creates a three-partition topic, confirms it is listed, force-deletes it and confirms it is
+     * no longer listed.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testForceDeletePartitionedTopic() throws Exception {
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testForceDeletePartitionedTopic";
+        final int numPartitions = 3;
+        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, numPartitions);
+        Thread.sleep(500);
+        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        assertTrue(topics.contains(topicInfo));
+
+        // Exercise the force variant: this is the method the test is named after and documents.
+        PulsarUtils.forceDeletePartitionedTopic(client, pulsarCluster, topicPath);
+        Thread.sleep(500);
+        topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!topics.contains(topicInfo));
+    }
+
+    /**
+     * Creates a non-partitioned topic and checks that
+     * {@link PulsarUtils#lookupTopic(RestTemplate, PulsarClusterInfo, String)} returns a non-null value
+     * for it.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testLookupTopic() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testLookupTopic";
+
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        assertNotNull(PulsarUtils.lookupTopic(client, pulsarCluster, path));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#lookupPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     *
+     * <p>Creates a partitioned topic with {@code DEFAULT_PARTITIONS_NUM} partitions and checks that the
+     * lookup returns a non-null result for it.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testLookupPartitionedTopic() throws Exception {
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testLookupPartitionedTopic";
+        // The lookup under test targets partitioned topics, so the fixture has to be partitioned too.
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+        Map<String, String> actual = PulsarUtils.lookupPartitionedTopic(client, pulsarCluster, topicPath);
+        assertNotNull(actual);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getSubscriptions(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getSubscriptions">PersistentTopics_getSubscriptions</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions
+     * method is: GET
+     * response body: ["string"]
+     *
+     * <p>Creates a non-partitioned topic and only checks that a subscription list is returned for it;
+     * the content of the list is not inspected.
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testGetSubscriptions() throws Exception {
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testGetSubscriptions";
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
+        Thread.sleep(500);
+        List<String> actual = PulsarUtils.getSubscriptions(client, pulsarCluster, topicPath);
+        // size() >= 0 holds for every list, so assert on what can actually go wrong: a null result.
+        assertNotNull(actual);
+    }
+
+    /**
+     * Creates a subscription through
+     * {@link PulsarUtils#createSubscription(RestTemplate, PulsarClusterInfo, String, String)} and verifies
+     * that {@link PulsarUtils#getSubscriptions(RestTemplate, PulsarClusterInfo, String)} reports it.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createSubscription">PersistentTopics_createSubscription</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}
+     * method is: PUT
+     * response body: json
+     *
+     * @throws Exception propagated from the admin calls or {@code Thread.sleep}
+     */
+    @Test
+    public void testCreateSubscription() throws Exception {
+        final String namespacePath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE;
+        final String path = namespacePath + InlongConstants.SLASH + "testCreateSubscription";
+        final String subscription = "test_subscription";
+
+        PulsarUtils.createSubscription(client, pulsarCluster, path, subscription);
+        Thread.sleep(500);
+        boolean present = PulsarUtils.getSubscriptions(client, pulsarCluster, path).contains(subscription);
+        assertTrue(present);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#examineMessage(RestTemplate, PulsarClusterInfo, String, String, int)}.
+     * href: <a href="https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_examineMessage">PersistentTopics_examineMessage</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/examinemessage
+     * method is: GET
+     * request parameters:
+     *   - initialPosition: Relative start position to examine message. It can be 'latest' or 'earliest'
+     *   - messagePosition: The position of messages (default 1)
+     *   - authoritative: Whether leader broker redirected this call to this broker. For internal use.
+     * response body: byteArray
+     *
+     * <p>The HTTP exchange is stubbed with Mockito, so no broker is contacted by this test.
+     *
+     * @throws Exception propagated from {@code PulsarUtils.examineMessage}
+     */
+    @Test
+    public void testExamineMessage() throws Exception {
+        /*
+         * Since admin API cannot send messages to the topic, this test case will be simulated using mockito.
+         */
+        final String messageType = "latest";
+        final int messagePosition = 1;
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testtopic-partition-1";
+        // The stub below only matches when the production code builds exactly this URL.
+        StringBuilder urlBuilder = new StringBuilder().append(DEFAULT_SERVICE_URL)
+                .append(PulsarUtils.QUERY_PERSISTENT_PATH).append("/").append(topicPath).append("/examinemessage")
+                .append("?initialPosition=").append(messageType).append("&messagePosition=").append(messagePosition);
+        final String expected = "test message!";
+
+        when(restTemplate.exchange(eq(urlBuilder.toString()), eq(HttpMethod.GET), any(HttpEntity.class),
+                eq(byte[].class))).thenReturn(byteExchange);
+        when(byteExchange.getStatusCode()).thenReturn(HttpStatus.OK);
+        when(byteExchange.getBody()).thenReturn(expected.getBytes(StandardCharsets.UTF_8));
+
+        ResponseEntity<byte[]> response = PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPath,
+                messageType, messagePosition);
+        // Decode with the same charset used to encode the stubbed body instead of the platform default.
+        assertEquals(expected, new String(response.getBody(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Manual check that examines the latest message of a fixed topic through the admin endpoint at
+     * {@code http://127.0.0.1:8080} and prints the parsed result. The case only supports local testing.
+     *
+     * <p>NOTE(review): the method carries {@code @Ignore} but no {@code @Test}; presumably it is meant
+     * to be enabled by hand - confirm which annotation the test framework in use expects.
+     *
+     * @throws Exception propagated from the {@code PulsarUtils} calls
+     */
+    @Ignore
+    public void localTest() throws Exception {
+        final String topic = "public/test_pulsar_group/test_pulsar_stream";
+        PulsarClusterInfo localCluster = new PulsarClusterInfo();
+        localCluster.setAdminUrl("http://127.0.0.1:8080");
+        RestTemplate localClient = new RestTemplate();
+
+        ResponseEntity<byte[]> response = PulsarUtils.examineMessage(localClient, localCluster, topic, "latest", 1);
+        List<PulsarMessageInfo> messages = PulsarUtils.getMessagesFromHttpResponse(response, topic);
+        System.out.println(messages);
+    }
 }