You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/11/28 12:10:57 UTC
(inlong) branch master updated: [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d8c83804b1 [INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)
d8c83804b1 is described below
commit d8c83804b1078089a233779b042f24067378dbc4
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Nov 28 20:10:51 2023 +0800
[INLONG-8674][Manager] Pulsar - Modify the calling method from SDK to HTTP (#8941)
---
.../inlong/manager/common/util/HttpUtils.java | 13 +
.../queue/pulsar/PulsarBrokerEntryMetadata.java | 33 +
.../pojo/queue/pulsar/PulsarLookupTopicInfo.java | 35 +
.../pojo/queue/pulsar/PulsarMessageInfo.java | 40 +
.../pojo/queue/pulsar/PulsarMessageMetadata.java | 63 ++
.../pojo/queue/pulsar/PulsarNamespacePolicies.java | 34 +
.../queue/pulsar/PulsarPersistencePolicies.java | 35 +
.../pojo/queue/pulsar/PulsarRetentionPolicies.java | 33 +
.../pojo/queue/pulsar/PulsarTenantInfo.java | 36 +
.../pojo/queue/pulsar/PulsarTopicMetadata.java | 38 +
inlong-manager/manager-service/pom.xml | 85 +-
.../service/cluster/PulsarClusterOperator.java | 10 +-
.../apply/ApproveConsumeProcessListener.java | 10 +-
.../node/pulsar/PulsarDataNodeOperator.java | 10 +-
.../resource/queue/pulsar/PulsarOperator.java | 212 +++--
.../queue/pulsar/PulsarQueueResourceOperator.java | 163 ++--
.../service/resource/queue/pulsar/PulsarUtils.java | 958 ++++++++++++++++++++-
.../sink/pulsar/PulsarResourceOperator.java | 14 +-
.../manager/service/queue/PulsarUtilsTest.java | 581 ++++++++++++-
19 files changed, 2072 insertions(+), 331 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index ace3d513f1..1e7efe68ae 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -112,6 +112,19 @@ public class HttpUtils {
return response.getBody();
}
+ /**
+ * Send an HTTP request without returning the response body
+ */
+ public static void request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
+ HttpHeaders header) {
+ log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
+ HttpEntity<Object> requestEntity = new HttpEntity<>(requestBody, header);
+ ResponseEntity<String> response = restTemplate.exchange(url, httpMethod, requestEntity, String.class);
+
+ log.debug("success request to {}, status code {}", url, response.getStatusCode());
+ Preconditions.expectTrue(response.getStatusCode().is2xxSuccessful(), "Request failed");
+ }
+
/**
* Send GET request to the specified URL.
*/
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java
new file mode 100644
index 0000000000..de02d96ad6
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarBrokerEntryMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarBrokerEntryMetadata {
+
+ private long brokerTimestamp;
+ private long index;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java
new file mode 100644
index 0000000000..12596e2ab3
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarLookupTopicInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarLookupTopicInfo {
+
+ private String brokerUrl;
+ private String httpUrl;
+ private String nativeUrl;
+ private String brokerUrlSsl;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java
new file mode 100644
index 0000000000..c84b8cbf9f
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageInfo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarMessageInfo {
+
+ private String messageId;
+ private String topic;
+ private byte[] body;
+ private transient Map<String, String> properties;
+ private boolean poolMessage;
+ private PulsarBrokerEntryMetadata pulsarBrokerEntryMetadata;
+ private PulsarMessageMetadata pulsarMessageMetadata;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java
new file mode 100644
index 0000000000..3cfbae2fb4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarMessageMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarMessageMetadata {
+
+ private int payloadSize;
+ private String partitionKey;
+ private boolean compactedOut;
+ private long eventTime;
+ private boolean partitionKeyB64Encoded;
+ private byte[] orderingKey;
+ private long sequenceId;
+ private boolean nullValue;
+ private boolean nullPartitionKey;
+ private Map<String, String> properties;
+ private long publishTime;
+ private long deliverAtTime;
+ private int markerType;
+ private long txnidLeastBits;
+ private long txnidMostBits;
+ private long highestSequenceId;
+ private String uuid;
+ private int numChunksFromMsg;
+ private int totalChunkMsgSize;
+ private int chunkId;
+ private String producerName;
+ private String replicatedFrom;
+ private int uncompressedSize;
+ private int numMessagesInBatch;
+ private String encryptionAlgo;
+ private String compression;
+ private byte[] encryptionParam;
+ private byte[] schemaVersion;
+ private List<String> replicateTos;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java
new file mode 100644
index 0000000000..94dbd47bf7
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarNamespacePolicies.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarNamespacePolicies {
+
+ private int messageTtlInSeconds;
+ private PulsarRetentionPolicies retentionPolicies;
+ private PulsarPersistencePolicies persistence;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java
new file mode 100644
index 0000000000..ada4f914fe
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarPersistencePolicies.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarPersistencePolicies {
+
+ private int bookkeeperEnsemble;
+ private int bookkeeperWriteQuorum;
+ private int bookkeeperAckQuorum;
+ private double managedLedgerMaxMarkDeleteRate;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java
new file mode 100644
index 0000000000..155675291e
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarRetentionPolicies.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarRetentionPolicies {
+
+ private int retentionTimeInMinutes;
+ private long retentionSizeInMB;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java
new file mode 100644
index 0000000000..2e093c1a57
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTenantInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Set;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarTenantInfo {
+
+ Set<String> adminRoles;
+
+ Set<String> allowedClusters;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java
new file mode 100644
index 0000000000..c7de1aacf2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicMetadata.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.queue.pulsar;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PulsarTopicMetadata {
+
+ private int partitions;
+
+ private boolean deleted;
+
+ private Map<String, String> properties;
+}
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index f3bd26f7f8..5981d30332 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -122,36 +122,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <exclusions>
- <exclusion>
- <groupId>javax.validation</groupId>
- <artifactId>validation-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>bouncy-castle-bc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcutil-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-ext-jdk15on</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>com.tencentcloudapi</groupId>
<artifactId>tencentcloud-sdk-java-cls</artifactId>
@@ -288,48 +258,6 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-admin</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.checkerframework</groupId>
- <artifactId>checker-qual</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.validation</groupId>
- <artifactId>validation-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>bouncy-castle-bc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcpkix-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcutil-jdk15on</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-ext-jdk15on</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
@@ -572,6 +500,19 @@
<artifactId>sdk-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index b2dea0bb7b..d4b34e83f2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -33,11 +33,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -60,6 +60,9 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private RestTemplate restTemplate;
+
@Override
public Boolean accept(String clusterType) {
return getClusterType().equals(clusterType);
@@ -127,9 +130,10 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
* @return
*/
private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
+
+ try {
// test connect for pulsar adminUrl
- pulsarAdmin.tenants().getTenants();
+ PulsarUtils.getTenants(restTemplate, pulsarInfo);
return true;
} catch (Exception e) {
String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarInfo.getAdminUrl());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 1e6114bffa..5b39e14bf2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -38,7 +38,6 @@ import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
-import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -46,7 +45,6 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -131,7 +129,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
String clusterTag = groupEntity.getInlongClusterTag();
ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ try {
InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
@@ -142,7 +140,7 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
topicMessage.setNamespace(mqResource);
List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
- this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics);
+ this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
@@ -150,10 +148,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
}
}
- private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
+ private void createPulsarSubscription(PulsarClusterInfo clusterInfo, String subscription, PulsarTopicInfo topicInfo,
List<String> topics) {
try {
- pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics);
+ pulsarOperator.createSubscriptions(clusterInfo, subscription, topicInfo, topics);
} catch (Exception e) {
log.error("create pulsar consumer group failed", e);
throw new WorkflowListenerException("failed to create pulsar consumer group");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
index 819df3df34..6b9025741b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/pulsar/PulsarDataNodeOperator.java
@@ -34,11 +34,11 @@ import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
/**
* Pulsar data node operator
@@ -51,6 +51,9 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private RestTemplate restTemplate;
+
@Override
public Boolean accept(String dataNodeType) {
return getDataNodeType().equals(dataNodeType);
@@ -106,15 +109,14 @@ public class PulsarDataNodeOperator extends AbstractDataNodeOperator {
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(adminUrl)
.token(token).build();
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+ try {
// test connect for pulsar adminUrl
- pulsarAdmin.tenants().getTenants();
+ PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
return true;
} catch (Exception e) {
String errMsg = String.format("Pulsar connection failed for AdminUrl=%s", pulsarClusterInfo.getAdminUrl());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}
-
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 77feca618c..7c55f65e43 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -23,9 +23,16 @@ import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.inlong.manager.service.message.DeserializeOperator;
@@ -33,19 +40,12 @@ import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.Namespaces;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.List;
@@ -65,42 +65,44 @@ public class PulsarOperator {
private static final int MAX_PARTITION = 1000;
private static final int RETRY_TIMES = 3;
private static final int DELAY_SECONDS = 5;
- private static final String PARSE_ATTR_ERROR_STRING = "Could not find %s in attributes!";
@Autowired
public DeserializeOperatorFactory deserializeOperatorFactory;
@Autowired
private ConversionHandle conversionHandle;
+ @Autowired
+ private RestTemplate restTemplate;
/**
* Create Pulsar tenant
*/
- public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+ public void createTenant(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
LOGGER.info("begin to create pulsar tenant={}", tenant);
Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER, "Tenant cannot be empty");
try {
- List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
- boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
+ List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
+ boolean exists = this.tenantIsExists(pulsarClusterInfo, tenant);
if (exists) {
LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
return;
}
- TenantInfoImpl tenantInfo = new TenantInfoImpl();
+ PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
tenantInfo.setAdminRoles(Sets.newHashSet());
- pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
+ PulsarUtils.createTenant(restTemplate, pulsarClusterInfo, tenant, tenantInfo);
LOGGER.info("success to create pulsar tenant={}", tenant);
- } catch (PulsarAdminException e) {
+ } catch (Exception e) {
LOGGER.error("failed to create pulsar tenant=" + tenant, e);
throw e;
}
}
/**
- * Create Pulsar namespace
+ * Create Pulsar namespace.
*/
- public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace)
- throws PulsarAdminException {
+ public void createNamespace(PulsarClusterInfo pulsarClusterInfo, InlongPulsarInfo pulsarInfo, String tenant,
+ String namespace)
+ throws Exception {
Preconditions.expectNotBlank(tenant, ErrorCodeEnum.INVALID_PARAMETER,
"pulsar tenant cannot be empty during create namespace");
Preconditions.expectNotBlank(namespace, ErrorCodeEnum.INVALID_PARAMETER,
@@ -110,19 +112,17 @@ public class PulsarOperator {
LOGGER.info("begin to create namespace={}", namespaceName);
try {
// Check whether the namespace exists, and create it if it does not exist
- boolean isExists = this.namespaceExists(pulsarAdmin, tenant, namespace);
+ boolean isExists = this.namespaceExists(pulsarClusterInfo, tenant, namespace);
if (isExists) {
LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
return;
}
- List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
- Namespaces namespaces = pulsarAdmin.namespaces();
- namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters));
+ PulsarNamespacePolicies policies = new PulsarNamespacePolicies();
// Configure message TTL
Integer ttl = pulsarInfo.getTtl();
if (ttl > 0) {
- namespaces.setNamespaceMessageTTL(namespaceName, conversionHandle.handleConversion(ttl,
+ policies.setMessageTtlInSeconds(conversionHandle.handleConversion(ttl,
pulsarInfo.getTtlUnit().toLowerCase() + "_seconds"));
}
@@ -139,15 +139,17 @@ public class PulsarOperator {
}
// Configure retention policies
- RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime, retentionSize);
- namespaces.setRetention(namespaceName, retentionPolicies);
+ PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies(retentionTime, retentionSize);
+ policies.setRetentionPolicies(retentionPolicies);
// Configure persistence policies
- PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
+ PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies(pulsarInfo.getEnsemble(),
pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), pulsarInfo.getMaxMarkDeleteRate());
- namespaces.setPersistence(namespaceName, persistencePolicies);
+ policies.setPersistence(persistencePolicies);
+
+ PulsarUtils.createNamespace(restTemplate, pulsarClusterInfo, tenant, namespaceName, policies);
LOGGER.info("success to create namespace={}", namespaceName);
- } catch (PulsarAdminException e) {
+ } catch (Exception e) {
LOGGER.error("failed to create namespace=" + namespaceName, e);
throw e;
}
@@ -156,7 +158,7 @@ public class PulsarOperator {
/**
* Create Pulsar topic
*/
- public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+ public void createTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
String tenant = topicInfo.getPulsarTenant();
String namespace = topicInfo.getNamespace();
@@ -164,33 +166,35 @@ public class PulsarOperator {
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
// Topic will be returned if it exists, and created if it does not exist
- if (topicExists(pulsarAdmin, tenant, namespace, topicName,
+ if (topicExists(pulsarClusterInfo, tenant, namespace, topicName,
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
- LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
+ LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarClusterInfo.getAdminUrl());
return;
}
try {
if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicInfo.getQueueModule())) {
- pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
- String res = pulsarAdmin.lookups().lookupTopic(fullTopicName);
+ PulsarUtils.createNonPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName);
+ String res = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, fullTopicName);
LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
} else {
// The number of brokers as the default value of topic partition
- List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+ List<String> clusters = PulsarUtils.getClusters(restTemplate, pulsarClusterInfo);
Integer numPartitions = topicInfo.getNumPartitions();
if (numPartitions < 0 || numPartitions >= MAX_PARTITION) {
- List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+ List<String> brokers = PulsarUtils.getBrokers(restTemplate, pulsarClusterInfo);
numPartitions = brokers.size();
}
-
- pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions);
- Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
+ PulsarUtils.createPartitionedTopic(restTemplate, pulsarClusterInfo, fullTopicName,
+ numPartitions);
+ Map<String, String> res = PulsarUtils.lookupPartitionedTopic(restTemplate,
+ pulsarClusterInfo, fullTopicName);
// if lookup failed (res.size not equals the partition number)
- if (res.keySet().size() != numPartitions) {
+ if (res.size() != numPartitions) {
// look up partition failed, retry to get partition numbers
- for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
- res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
+ for (int i = 0; (i < RETRY_TIMES && res.size() != numPartitions); i++) {
+ res = PulsarUtils.lookupPartitionedTopic(restTemplate, pulsarClusterInfo,
+ fullTopicName);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
@@ -198,12 +202,12 @@ public class PulsarOperator {
}
}
}
- if (numPartitions != res.keySet().size()) {
- throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
+ if (numPartitions != res.size()) {
+ throw new Exception("The number of partitions not equal to lookupPartitionedTopic");
}
LOGGER.info("success to create topic={}", fullTopicName);
}
- } catch (PulsarAdminException e) {
+ } catch (Exception e) {
LOGGER.error("failed to create topic=" + fullTopicName, e);
throw e;
}
@@ -212,25 +216,25 @@ public class PulsarOperator {
/**
* Force delete Pulsar topic
*/
- public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+ public void forceDeleteTopic(PulsarClusterInfo pulsarClusterInfo, PulsarTopicInfo topicInfo) throws Exception {
Preconditions.expectNotNull(topicInfo, "pulsar topic info cannot be empty");
String tenant = topicInfo.getPulsarTenant();
String namespace = topicInfo.getNamespace();
String topic = topicInfo.getTopicName();
String fullTopicName = tenant + "/" + namespace + "/" + topic;
+ boolean isPartitioned = InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule());
// Topic will be returned if it not exists
- if (topicExists(pulsarAdmin, tenant, namespace, topic,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+ if (topicExists(pulsarClusterInfo, tenant, namespace, topic, isPartitioned)) {
LOGGER.warn("pulsar topic={} already delete", fullTopicName);
return;
}
try {
- pulsarAdmin.topics().delete(fullTopicName, true);
+ PulsarUtils.forceDeleteTopic(restTemplate, pulsarClusterInfo, fullTopicName, isPartitioned);
LOGGER.info("success to delete topic={}", fullTopicName);
- } catch (PulsarAdminException e) {
+ } catch (Exception e) {
LOGGER.error("failed to delete topic=" + fullTopicName, e);
throw e;
}
@@ -239,19 +243,20 @@ public class PulsarOperator {
/**
* Create a Pulsar subscription for the given topic
*/
- public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule,
- String subscription) throws PulsarAdminException {
+ public void createSubscription(PulsarClusterInfo pulsarClusterInfo, String fullTopicName, String queueModule,
+ String subscription) throws Exception {
LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
try {
- boolean isExists = this.subscriptionExists(pulsarAdmin, fullTopicName, subscription,
+ boolean isExists = this.subscriptionExists(pulsarClusterInfo, fullTopicName, subscription,
InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(queueModule));
if (isExists) {
LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
return;
}
- pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest);
+
+ PulsarUtils.createSubscription(restTemplate, pulsarClusterInfo, fullTopicName, subscription);
LOGGER.info("success to create subscription={}", subscription);
- } catch (PulsarAdminException e) {
+ } catch (Exception e) {
LOGGER.error("failed to create pulsar subscription=" + subscription, e);
throw e;
}
@@ -260,31 +265,42 @@ public class PulsarOperator {
/**
* Create a Pulsar subscription for the specified topic list
*/
- public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
- List<String> topicList) throws PulsarAdminException {
+ public void createSubscriptions(PulsarClusterInfo pulsarClusterInfo, String subscription, PulsarTopicInfo topicInfo,
+ List<String> topicList) throws Exception {
for (String topic : topicList) {
topicInfo.setTopicName(topic);
String fullTopicName = topicInfo.getPulsarTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
- this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription);
+ this.createSubscription(pulsarClusterInfo, fullTopicName, topicInfo.getQueueModule(), subscription);
}
LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
}
/**
* Check if Pulsar tenant exists
+ *
+ * @param pulsarClusterInfo pulsar cluster info
+ * @param tenant pulsar tenant info
+ * @return true or false
+ * @throws Exception any exception if occurred
*/
- private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
- List<String> tenantList = pulsarAdmin.tenants().getTenants();
- return tenantList.contains(tenant);
+ private boolean tenantIsExists(PulsarClusterInfo pulsarClusterInfo, String tenant) throws Exception {
+ List<String> tenants = PulsarUtils.getTenants(restTemplate, pulsarClusterInfo);
+ return tenants.contains(tenant);
}
/**
- * Check whether the Pulsar namespace exists under the specified tenant
+ * Check whether the Pulsar namespace exists under the specified tenant.
+ *
+ * @param pulsarClusterInfo pulsar cluster info
+ * @param tenant pulsar tenant info
+ * @param namespace pulsar namespace info
+ * @return true or false
+ * @throws Exception any exception if occurred
*/
- private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
- throws PulsarAdminException {
- List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
- return namespaceList.contains(tenant + "/" + namespace);
+ private boolean namespaceExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace)
+ throws Exception {
+ List<String> namespaces = PulsarUtils.getNamespaces(restTemplate, pulsarClusterInfo, tenant);
+ return namespaces.contains(namespace);
}
/**
@@ -293,22 +309,23 @@ public class PulsarOperator {
* @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
*/
- public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
+ public boolean topicExists(PulsarClusterInfo pulsarClusterInfo, String tenant, String namespace, String topicName,
boolean isPartitioned) {
if (StringUtils.isBlank(topicName)) {
return true;
}
// persistent://tenant/namespace/topic
- List<String> topicList;
+ List<String> topics;
boolean topicExists = false;
try {
if (isPartitioned) {
- topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo, tenant,
+ namespace);
} else {
- topicList = pulsarAdmin.topics().getList(tenant + "/" + namespace);
+ topics = PulsarUtils.getTopics(restTemplate, pulsarClusterInfo, tenant, namespace);
}
- for (String t : topicList) {
+ for (String t : topics) {
t = t.substring(t.lastIndexOf("/") + 1); // not contains /
if (!isPartitioned) {
int suffixIndex = t.lastIndexOf("-partition-");
@@ -321,7 +338,7 @@ public class PulsarOperator {
break;
}
}
- } catch (PulsarAdminException pe) {
+ } catch (Exception pe) {
LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe);
int count = 0;
try {
@@ -329,8 +346,9 @@ public class PulsarOperator {
LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
Thread.sleep(DELAY_SECONDS);
- topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
- for (String t : topicList) {
+ topics = PulsarUtils.getPartitionedTopics(restTemplate, pulsarClusterInfo,
+ tenant, namespace);
+ for (String t : topics) {
t = t.substring(t.lastIndexOf("/") + 1);
if (topicName.equals(t)) {
topicExists = true;
@@ -348,7 +366,7 @@ public class PulsarOperator {
/**
* Check whether the Pulsar topic exists.
*/
- private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+ private boolean subscriptionExists(PulsarClusterInfo pulsarClusterInfo, String topic, String subscription,
boolean isPartitioned) {
int count = 0;
while (++count <= RETRY_TIMES) {
@@ -358,22 +376,24 @@ public class PulsarOperator {
// first lookup to load the topic, and then query whether the subscription exists
if (isPartitioned) {
- Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+ Map<String, String> topicMap = PulsarUtils.lookupPartitionedTopic(restTemplate,
+ pulsarClusterInfo, topic);
if (topicMap.isEmpty()) {
LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
continue;
}
} else {
- String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic);
+ String lookupTopic = PulsarUtils.lookupTopic(restTemplate, pulsarClusterInfo, topic);
if (StringUtils.isBlank(lookupTopic)) {
LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
continue;
}
}
- List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+ List<String> subscriptionList = PulsarUtils.getSubscriptions(restTemplate,
+ pulsarClusterInfo, topic);
return subscriptionList.contains(subscription);
- } catch (PulsarAdminException | InterruptedException e) {
+ } catch (Exception e) {
LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
if (count == RETRY_TIMES) {
LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
@@ -387,16 +407,17 @@ public class PulsarOperator {
/**
* Query topic message for the given pulsar cluster.
*/
- public List<BriefMQMessage> queryLatestMessage(PulsarAdmin pulsarAdmin, String topicFullName, String subName,
+ public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName,
+ String subName,
Integer messageCount, InlongStreamInfo streamInfo, boolean serial) {
LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName);
List<BriefMQMessage> messageList = new ArrayList<>();
- int partitionCount = getPartitionCount(pulsarAdmin, topicFullName);
+ int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName);
for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) {
int currentPartitionNum = messageIndex % partitionCount;
int messagePosition = messageIndex / partitionCount;
String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
- messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarAdmin, messageIndex,
+ messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex,
streamInfo, messagePosition));
}
LOGGER.info("success query message by subs={} for topic={}", subName, topicFullName);
@@ -404,36 +425,39 @@ public class PulsarOperator {
}
/**
- * Use pulsar admin to get topic partition count
+ * Get topic partition count.
*/
- private int getPartitionCount(PulsarAdmin pulsarAdmin, String topicFullName) {
- PartitionedTopicMetadata partitionedTopicMetadata;
+ private int getPartitionCount(PulsarClusterInfo pulsarClusterInfo, String topicFullName) {
+ PulsarTopicMetadata pulsarTopicMetadata;
try {
- partitionedTopicMetadata = pulsarAdmin.topics()
- .getPartitionedTopicMetadata(topicFullName);
+ pulsarTopicMetadata = PulsarUtils.getPartitionedTopicMetadata(restTemplate,
+ pulsarClusterInfo, topicFullName);
} catch (Exception e) {
String errMsg = "get pulsar partition error ";
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg + e.getMessage());
}
- return partitionedTopicMetadata.partitions > 0 ? partitionedTopicMetadata.partitions : 1;
+ return pulsarTopicMetadata.getPartitions() > 0 ? pulsarTopicMetadata.getPartitions() : 1;
}
/**
- * Use pulsar admin to query message
+ * Query pulsar message.
*/
- private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarAdmin pulsarAdmin, int index,
+ private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo,
+ int index,
InlongStreamInfo streamInfo, int messagePosition) {
List<BriefMQMessage> briefMQMessages = new ArrayList<>();
try {
- Message<byte[]> pulsarMessage =
- pulsarAdmin.topics().examineMessage(topicPartition, "latest", messagePosition);
- Map<String, String> headers = pulsarMessage.getProperties();
+ ResponseEntity<byte[]> httpResponse =
+ PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPartition, "latest",
+ messagePosition);
+ PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
+ Map<String, String> headers = messageInfo.getProperties();
int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(
MessageWrapType.valueOf(wrapTypeId));
- briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, pulsarMessage.getData(),
+ briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(),
headers, index));
} catch (Exception e) {
LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 4d4e7051e2..c9d7b4a240 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -44,12 +44,9 @@ import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -101,7 +98,7 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ try {
// create pulsar tenant and namespace
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
@@ -109,9 +106,9 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
// if the group was not successful, need create tenant and namespace
if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
- pulsarOperator.createTenant(pulsarAdmin, tenant);
+ pulsarOperator.createTenant(pulsarCluster, tenant);
String namespace = groupInfo.getMqResource();
- pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+ pulsarOperator.createNamespace(pulsarCluster, pulsarInfo, tenant, namespace);
log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
groupId, tenant, namespace, pulsarCluster);
@@ -223,21 +220,19 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
*/
private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getPulsarTenant();
- }
- String namespace = pulsarInfo.getMqResource();
- PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
- .pulsarTenant(tenant)
- .namespace(namespace)
- .topicName(topicName)
- .queueModule(pulsarInfo.getQueueModule())
- .numPartitions(pulsarInfo.getPartitionNum())
- .build();
- pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+ String tenant = pulsarInfo.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getPulsarTenant();
}
+ String namespace = pulsarInfo.getMqResource();
+ PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+ .pulsarTenant(tenant)
+ .namespace(namespace)
+ .topicName(topicName)
+ .queueModule(pulsarInfo.getQueueModule())
+ .numPartitions(pulsarInfo.getPartitionNum())
+ .build();
+ pulsarOperator.createTopic(pulsarCluster, topicInfo);
}
/**
@@ -245,41 +240,39 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
*/
private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
String streamId) throws Exception {
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getPulsarTenant();
- }
- String namespace = pulsarInfo.getMqResource();
- String fullTopicName = tenant + "/" + namespace + "/" + topicName;
- boolean exist = pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
- if (!exist) {
- String serviceUrl = pulsarCluster.getAdminUrl();
- log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
- throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
- }
+ String tenant = pulsarInfo.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getPulsarTenant();
+ }
+ String namespace = pulsarInfo.getMqResource();
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+ boolean exist = pulsarOperator.topicExists(pulsarCluster, tenant, namespace, topicName,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
+ if (!exist) {
+ String serviceUrl = pulsarCluster.getAdminUrl();
+ log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
+ throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
+ }
- // create subscription for all sinks
- String groupId = pulsarInfo.getInlongGroupId();
- List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
- if (CollectionUtils.isEmpty(streamSinks)) {
- log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
- return;
- }
+ // create subscription for all sinks
+ String groupId = pulsarInfo.getInlongGroupId();
+ List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
+ if (CollectionUtils.isEmpty(streamSinks)) {
+ log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
+ return;
+ }
- // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
- String clusterTag = pulsarInfo.getInlongClusterTag();
- for (StreamSink sink : streamSinks) {
- String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
- pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
- log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
-
- // insert the consumer group info into the inlong_consume table
- Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
- log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
- id, subs, groupId, topicName);
- }
+ // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
+ String clusterTag = pulsarInfo.getInlongClusterTag();
+ for (StreamSink sink : streamSinks) {
+ String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
+ pulsarOperator.createSubscription(pulsarCluster, fullTopicName, pulsarInfo.getQueueModule(), subs);
+ log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
+
+ // insert the consumer group info into the inlong_consume table
+ Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
+ log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+ id, subs, groupId, topicName);
}
}
@@ -289,54 +282,46 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
*/
private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getPulsarTenant();
- }
- String namespace = pulsarInfo.getMqResource();
- PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
- .pulsarTenant(tenant)
- .namespace(namespace)
- .topicName(topicName)
- .build();
- pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo);
+ String tenant = pulsarInfo.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getPulsarTenant();
}
+ String namespace = pulsarInfo.getMqResource();
+ PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
+ .pulsarTenant(tenant)
+ .namespace(namespace)
+ .topicName(topicName)
+ .build();
+ pulsarOperator.forceDeleteTopic(pulsarCluster, topicInfo);
}
/**
* Query latest message from pulsar
*/
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
- InlongStreamInfo streamInfo, Integer messageCount) throws PulsarClientException {
+ InlongStreamInfo streamInfo, Integer messageCount) throws Exception {
String groupId = streamInfo.getInlongGroupId();
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(),
null, ClusterType.PULSAR);
- List<BriefMQMessage> briefMQMessages = new ArrayList<>();
-
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = inlongPulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getPulsarTenant();
- }
-
- String namespace = groupInfo.getMqResource();
- String topicName = streamInfo.getMqResource();
- String fullTopicName = tenant + "/" + namespace + "/" + topicName;
- String clusterTag = inlongPulsarInfo.getInlongClusterTag();
- String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
- boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- briefMQMessages =
- pulsarOperator.queryLatestMessage(pulsarAdmin, fullTopicName, subs, messageCount, streamInfo,
- serial);
-
- // insert the consumer group info into the inlong_consume table
- Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
- log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
- id, subs, groupId, topicName);
+ String tenant = inlongPulsarInfo.getPulsarTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getPulsarTenant();
}
+
+ String namespace = groupInfo.getMqResource();
+ String topicName = streamInfo.getMqResource();
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+ String clusterTag = inlongPulsarInfo.getInlongClusterTag();
+ String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
+ boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+ List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
+ messageCount, streamInfo, serial);
+
+ // insert the consumer group info into the inlong_consume table
+ Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);
+ log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+ id, subs, groupId, topicName);
return briefMQMessages;
}
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 81c3de2b6a..9fd81aac03 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -17,18 +17,45 @@
package org.apache.inlong.manager.service.resource.queue.pulsar;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageMetadata;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
/**
* Pulsar connection utils
@@ -39,54 +66,921 @@ public class PulsarUtils {
private PulsarUtils() {
}
+ // ISO-8601 offset date-time formatter with the system default zone set as override zone;
+ // parse(String) uses it to turn header timestamps into epoch milliseconds
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
+ ZoneId.systemDefault());
+
+ // path segments appended to the cluster admin URL when building the REST request URLs below
+ public static final String QUERY_CLUSTERS_PATH = "/admin/v2/clusters";
+ public static final String QUERY_BROKERS_PATH = "/admin/v2/brokers";
+ public static final String QUERY_TENANTS_PATH = "/admin/v2/tenants";
+ public static final String QUERY_NAMESPACE_PATH = "/admin/v2/namespaces";
+ public static final String QUERY_PERSISTENT_PATH = "/admin/v2/persistent";
+ public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
+
+ // shared serializer for the JSON request bodies built in this class
+ private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+ /**
+ * Build the HTTP headers used for a request to the Pulsar admin endpoint.
+ *
+ * @param token pulsar token info, may be null or empty
+ * @return headers holding a bearer {@code Authorization} entry when a token is given, empty headers otherwise
+ */
+ private static HttpHeaders getHttpHeaders(String token) {
+ HttpHeaders result = new HttpHeaders();
+ if (StringUtils.isEmpty(token)) {
+ // no token configured: send the request without an Authorization header
+ return result;
+ }
+ result.add("Authorization", "Bearer " + token);
+ return result;
+ }
+
+ /**
+ * Query the cluster list from the admin endpoint ({@code GET <adminUrl>/admin/v2/clusters}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @return the response body converted to a list
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getClusters(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+ throws Exception {
+ String clustersUrl = clusterInfo.getAdminUrl() + QUERY_CLUSTERS_PATH;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, clustersUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
/**
- * Get pulsar admin info
+ * Get the list of active brokers.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar broker infos
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getBrokers(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+ throws Exception {
+ List<String> activeBrokers = new ArrayList<>();
+ // one request per cluster name (GET <adminUrl>/admin/v2/brokers/<cluster>); the answers are concatenated
+ // in the order the clusters were returned
+ for (String clusterName : getClusters(restTemplate, clusterInfo)) {
+ String brokersUrl = clusterInfo.getAdminUrl() + QUERY_BROKERS_PATH + "/" + clusterName;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ List<String> clusterBrokers = HttpUtils.request(restTemplate, brokersUrl, HttpMethod.GET, null, headers,
+ ArrayList.class);
+ activeBrokers.addAll(clusterBrokers);
+ }
+ return activeBrokers;
+ }
+
+ /**
+ * Get pulsar tenant info list.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @return list of pulsar tenant infos
+ * @throws Exception any exception if occurred
*/
- public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) throws PulsarClientException {
- Preconditions.expectNotBlank(pulsarCluster.getAdminUrl(), ErrorCodeEnum.INVALID_PARAMETER,
- "Pulsar adminUrl cannot be empty");
- PulsarAdmin pulsarAdmin;
- if (StringUtils.isEmpty(pulsarCluster.getToken())) {
- pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
+ public static List<String> getTenants(RestTemplate restTemplate, PulsarClusterInfo clusterInfo)
+ throws Exception {
+ // GET <adminUrl>/admin/v2/tenants, authenticated with the cluster token when one is set
+ String tenantsUrl = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, tenantsUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
+ /**
+ * Query the namespaces of a tenant ({@code GET <adminUrl>/admin/v2/namespaces/<tenant>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param tenant pulsar tenant name
+ * @return the response body converted to a list
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getNamespaces(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String tenant) throws Exception {
+ String namespacesUrl = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + "/" + tenant;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, namespacesUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
+ /**
+ * Create a pulsar tenant by sending the tenant info as JSON
+ * ({@code PUT <adminUrl>/admin/v2/tenants/<tenant>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param tenant pulsar tenant name
+ * @param tenantInfo pulsar tenant info, serialized with Gson as the request body
+ * @throws Exception any exception if occurred
+ */
+ public static void createTenant(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+ PulsarTenantInfo tenantInfo) throws Exception {
+ String tenantUrl = clusterInfo.getAdminUrl() + QUERY_TENANTS_PATH + "/" + tenant;
+ HttpHeaders jsonHeaders = getHttpHeaders(clusterInfo.getToken());
+ jsonHeaders.setContentType(MediaType.parseMediaType("application/json; charset=UTF-8"));
+ jsonHeaders.add("Accept", MediaType.APPLICATION_JSON.toString());
+ String requestBody = GSON.toJson(tenantInfo);
+ HttpUtils.request(restTemplate, tenantUrl, HttpMethod.PUT, requestBody, jsonHeaders);
+ }
+
+ /**
+ * Creates a new pulsar namespace with the specified policies
+ * ({@code PUT <adminUrl>/admin/v2/namespaces/<tenant>/<namespace>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param tenant pulsar tenant name
+ * @param namespaceName pulsar namespace name
+ * @param policies pulsar namespace policies info, serialized with Gson as the request body
+ * @throws Exception any exception if occurred
+ */
+ public static void createNamespace(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+ String namespaceName, PulsarNamespacePolicies policies) throws Exception {
+ final String url = clusterInfo.getAdminUrl() + QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
+ + InlongConstants.SLASH + namespaceName;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+ headers.setContentType(type);
+ headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+ // Gson writes the Java field names, so two keys are renamed to snake_case on the serialized text.
+ // These are literal substitutions, hence String.replace rather than the regex-based replaceAll.
+ // NOTE(review): presumably these are the key names the Pulsar REST API expects - confirm; a text
+ // substitution would also rewrite the same words if they ever appeared inside a value.
+ String param = GSON.toJson(policies)
+ .replace("messageTtlInSeconds", "message_ttl_in_seconds")
+ .replace("retentionPolicies", "retention_policies");
+ HttpUtils.request(restTemplate, url, HttpMethod.PUT, param, headers);
+ }
+
+ /**
+ * Query the topics of a namespace ({@code GET <adminUrl>/admin/v2/persistent/<tenant>/<namespace>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param tenant pulsar tenant name
+ * @param namespace pulsar namespace name
+ * @return the response body converted to a list
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String tenant,
+ String namespace) throws Exception {
+ String topicsUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, topicsUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
+ /**
+ * Query the partitioned topics of a namespace
+ * ({@code GET <adminUrl>/admin/v2/persistent/<tenant>/<namespace>/partitioned}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param tenant pulsar tenant name
+ * @param namespace pulsar namespace name
+ * @return the response body converted to a list
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getPartitionedTopics(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String tenant, String namespace) throws Exception {
+ String namespaceUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + tenant + "/" + namespace;
+ String partitionedUrl = namespaceUrl + "/partitioned";
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, partitionedUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
+ /**
+ * Create a non-partitioned topic ({@code PUT <adminUrl>/admin/v2/persistent/<topicPath>}, no request body).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @throws Exception any exception if occurred
+ */
+ public static void createNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ String topicUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ HttpUtils.request(restTemplate, topicUrl, HttpMethod.PUT, null, headers);
+ }
+
+ /**
+ * Create a partitioned topic ({@code PUT <adminUrl>/admin/v2/persistent/<topicPath>/partitions}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @param numPartitions number of partitions, sent in its decimal text form as the request body; must not be null
+ * @throws Exception any exception if occurred
+ */
+ public static void createPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath, Integer numPartitions) throws Exception {
+ String partitionsUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+ String requestBody = numPartitions.toString();
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ HttpUtils.request(restTemplate, partitionsUrl, HttpMethod.PUT, requestBody, headers);
+ }
+
+ /**
+ * Query the internal stats of a partitioned topic
+ * ({@code GET <adminUrl>/admin/v2/persistent/<topicPath>/partitioned-internalStats}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @return the response body as a Gson JsonObject
+ * @throws Exception any exception if occurred
+ */
+ public static JsonObject getInternalStatsPartitionedTopics(RestTemplate restTemplate,
+ PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+ String topicUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+ String statsUrl = topicUrl + "/partitioned-internalStats";
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, statsUrl, HttpMethod.GET, null, headers, JsonObject.class);
+ }
+
+ /**
+ * Query the metadata of a partitioned topic
+ * ({@code GET <adminUrl>/admin/v2/persistent/<topicPath>/partitions}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @return the response body converted to a PulsarTopicMetadata
+ * @throws Exception any exception if occurred
+ */
+ public static PulsarTopicMetadata getPartitionedTopicMetadata(RestTemplate restTemplate,
+ PulsarClusterInfo clusterInfo, String topicPath) throws Exception {
+ String partitionsUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, partitionsUrl, HttpMethod.GET, null, headers,
+ PulsarTopicMetadata.class);
+ }
+
+ /**
+ * Delete a non-partitioned topic ({@code DELETE <adminUrl>/admin/v2/persistent/<topicPath>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @throws Exception any exception if occurred
+ */
+ public static void deleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ String topicUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ HttpUtils.request(restTemplate, topicUrl, HttpMethod.DELETE, null, headers);
+ }
+
+ /**
+ * Force delete a non-partitioned topic
+ * ({@code DELETE <adminUrl>/admin/v2/persistent/<topicPath>?force=true}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @throws Exception any exception if occurred
+ */
+ public static void forceDeleteNonPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ // The force flag is a query parameter of the delete endpoint. The 4th argument of HttpUtils.request is
+ // what createTenant uses for the JSON payload, so a map passed there travels as a request body and the
+ // flag would not reach the query string; it is therefore encoded in the URL.
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "?force=true";
+ HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+ }
+
+ /**
+ * Delete a partitioned topic ({@code DELETE <adminUrl>/admin/v2/persistent/<topicPath>/partitions}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @throws Exception any exception if occurred
+ */
+ public static void deletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ String partitionsUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath + "/partitions";
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ HttpUtils.request(restTemplate, partitionsUrl, HttpMethod.DELETE, null, headers);
+ }
+
+ /**
+ * Force delete a partitioned topic
+ * ({@code DELETE <adminUrl>/admin/v2/persistent/<topicPath>/partitions?force=true}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @throws Exception any exception if occurred
+ */
+ public static void forceDeletePartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ // The force flag is a query parameter of the delete endpoint. The 4th argument of HttpUtils.request is
+ // what createTenant uses for the JSON payload, so a map passed there travels as a request body and the
+ // flag would not reach the query string; it is therefore encoded in the URL.
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath
+ + "/partitions?force=true";
+ HttpUtils.request(restTemplate, url, HttpMethod.DELETE, null, getHttpHeaders(clusterInfo.getToken()));
+ }
+
+ /**
+ * Delete a partitioned or non-partitioned topic by delegating to the matching delete method.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @param isPartitioned true to call deletePartitionedTopic, false to call deleteNonPartitionedTopic
+ * @throws Exception any exception if occurred
+ */
+ public static void deleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+ boolean isPartitioned) throws Exception {
+ if (isPartitioned) {
+ deletePartitionedTopic(restTemplate, clusterInfo, topicPath);
} else {
- pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), pulsarCluster.getToken());
+ deleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
}
- return pulsarAdmin;
}
/**
- * Get the pulsar admin from the given service URL.
+ * Force delete a partitioned or non-partitioned topic.
*
- * @apiNote It must be closed after use.
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @param isPartitioned pulsar is partitioned topic
+ * @throws Exception any exception if occurred
*/
- public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
- return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+ public static void forceDeleteTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+ boolean isPartitioned)
+ throws Exception {
+ // delegate to the force-delete variant that matches the topic kind
+ if (isPartitioned) {
+ forceDeletePartitionedTopic(restTemplate, clusterInfo, topicPath);
+ } else {
+ forceDeleteNonPartitionedTopic(restTemplate, clusterInfo, topicPath);
+ }
}
/**
- * Get the pulsar admin from the given service URL and token.
- * <p/>
- * Currently only token is supported as an authentication type.
+ * lookup persistent topic info.
*
- * @apiNote It must be closed after use.
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @return pulsar broker url
+ * @throws Exception any exception if occurred
*/
- public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String token) throws PulsarClientException {
- return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
- .authentication(AuthenticationFactory.token(token)).build();
+ public static String lookupTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath)
+ throws Exception {
+ // GET <adminUrl>/lookup/v2/topic/persistent/<topicPath>; only the broker url of the answer is returned
+ String lookupUrl = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + topicPath;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ PulsarLookupTopicInfo lookupResult = HttpUtils.request(restTemplate, lookupUrl, HttpMethod.GET, null,
+ headers, PulsarLookupTopicInfo.class);
+ return lookupResult.getBrokerUrl();
}
/**
- * Get pulsar cluster info list.
+ * lookup persistent partitioned topic info.
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info
+ * @param topicPath pulsar topic path
+ * @return map of partitioned topic info
+ * @throws Exception any exception if occurred
+ */
+ public static Map<String, String> lookupPartitionedTopic(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ PulsarTopicMetadata topicMetadata = getPartitionedTopicMetadata(restTemplate, clusterInfo, topicPath);
+ // insertion-ordered, so the entries come out in partition index order
+ Map<String, String> brokerUrlByPartition = new LinkedHashMap<>();
+ int partitionIndex = 0;
+ while (partitionIndex < topicMetadata.getPartitions()) {
+ // each partition is looked up as its own topic named <topicPath>-partition-<index>
+ String partitionTopic = topicPath + "-partition-" + partitionIndex;
+ String lookupUrl = clusterInfo.getAdminUrl() + LOOKUP_TOPIC_PATH + "/persistent/" + partitionTopic;
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ PulsarLookupTopicInfo lookupResult = HttpUtils.request(restTemplate, lookupUrl, HttpMethod.GET, null,
+ headers, PulsarLookupTopicInfo.class);
+ brokerUrlByPartition.put(partitionTopic, lookupResult.getBrokerUrl());
+ partitionIndex++;
+ }
+ return brokerUrlByPartition;
+ }
+
+ /**
+ * Query the subscriptions of a topic
+ * ({@code GET <adminUrl>/admin/v2/persistent/<topicPath>/subscriptions}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @return the response body converted to a list
+ * @throws Exception any exception if occurred
+ */
+ public static List<String> getSubscriptions(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPath) throws Exception {
+ String topicUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+ String subscriptionsUrl = topicUrl + "/subscriptions";
+ HttpHeaders headers = getHttpHeaders(clusterInfo.getToken());
+ return HttpUtils.request(restTemplate, subscriptionsUrl, HttpMethod.GET, null, headers, ArrayList.class);
+ }
+
+ /**
+ * Create a subscription on a topic
+ * ({@code PUT <adminUrl>/admin/v2/persistent/<topicPath>/subscription/<subscription>}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPath pulsar topic path, appended to the persistent path as is
+ * @param subscription pulsar subscription name, appended to the url as is
+ * @throws Exception any exception if occurred
+ */
+ public static void createSubscription(RestTemplate restTemplate, PulsarClusterInfo clusterInfo, String topicPath,
+ String subscription) throws Exception {
+ String topicUrl = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPath;
+ String subscriptionUrl = topicUrl + "/subscription/" + subscription;
+ // start position of the new subscription, sent as the JSON request body
+ // NOTE(review): Long.MAX_VALUE for both ids with partition -1 looks like Pulsar's "latest" position - confirm
+ JsonObject startPosition = new JsonObject();
+ startPosition.addProperty("entryId", Long.MAX_VALUE);
+ startPosition.addProperty("ledgerId", Long.MAX_VALUE);
+ startPosition.addProperty("partitionIndex", -1);
+ HttpHeaders jsonHeaders = getHttpHeaders(clusterInfo.getToken());
+ jsonHeaders.setContentType(MediaType.parseMediaType("application/json; charset=UTF-8"));
+ jsonHeaders.add("Accept", MediaType.APPLICATION_JSON.toString());
+ HttpUtils.request(restTemplate, subscriptionUrl, HttpMethod.PUT, startPosition.toString(), jsonHeaders);
+ }
+
+ /**
+ * Examine a specific message on a topic by position relative to the earliest or the latest message
+ * ({@code GET <adminUrl>/admin/v2/persistent/<topicPartition>/examinemessage}).
+ *
+ * @param restTemplate spring framework RestTemplate
+ * @param clusterInfo pulsar cluster info, provides the admin url and the token
+ * @param topicPartition pulsar topic partition path, appended to the persistent path as is
+ * @param messageType value of the {@code initialPosition} query parameter
+ * @param messagePosition value of the {@code messagePosition} query parameter
+ * @return the raw ResponseEntity; message metadata is in its headers and the payload in its body
+ * @throws Exception any exception if occurred
+ */
+ public static ResponseEntity<byte[]> examineMessage(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
+ String topicPartition, String messageType, int messagePosition) throws Exception {
+ String url = clusterInfo.getAdminUrl() + QUERY_PERSISTENT_PATH + "/" + topicPartition + "/examinemessage"
+ + "?initialPosition=" + messageType + "&messagePosition=" + messagePosition;
+ ResponseEntity<byte[]> response = restTemplate.exchange(url, HttpMethod.GET,
+ new HttpEntity<>(getHttpHeaders(clusterInfo.getToken())), byte[].class);
+ if (!response.getStatusCode().is2xxSuccessful()) {
+ // log the payload as text instead of handing the raw byte[] to the logger
+ // NOTE(review): assumes error bodies are UTF-8 text - confirm
+ byte[] body = response.getBody();
+ String bodyText = body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8);
+ log.error("request error for {}, status code {}, body {}", url, response.getStatusCode(), bodyText);
+ }
+ return response;
+ }
+
+ /**
+ * Parse an examine-message response and return only its first message.
+ *
+ * @param response http response whose headers and body describe the message
+ * @param topic topic name to set on the parsed message
+ * @return the first parsed message, or null when the response yields no message
+ * @throws Exception any exception if occurred
+ */
+ public static PulsarMessageInfo getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+ throws Exception {
+ List<PulsarMessageInfo> parsed = PulsarUtils.getMessagesFromHttpResponse(response, topic);
+ return parsed.isEmpty() ? null : parsed.get(0);
+ }
+
+ /**
+ * Copy from getMessagesFromHttpResponse method of org.apache.pulsar.client.admin.internal.TopicsImpl class.
+ *
+ * @param response
+ * @param topic
+ * @return
+ * @throws Exception
*/
- public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
- return pulsarAdmin.clusters().getClusters();
+ public static List<PulsarMessageInfo> getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
+ throws Exception {
+ // Rebuilds the message(s) of an examine-message response: the X-Pulsar-* headers carry the metadata and
+ // the user properties, the body carries the payload. Returns a single-element list for a plain or an
+ // encrypted message, or one element per entry when the response holds an unencrypted batch.
+ HttpHeaders headers = response.getHeaders();
+ String msgId = headers.getFirst("X-Pulsar-Message-ID");
+ String brokerEntryTimestamp = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
+ String brokerEntryIndex = headers.getFirst("X-Pulsar-Broker-Entry-METADATA-index");
+ // broker entry metadata exists only when at least one of its two headers is present
+ PulsarBrokerEntryMetadata brokerEntryMetadata = null;
+ if (brokerEntryTimestamp != null || brokerEntryIndex != null) {
+ brokerEntryMetadata = new PulsarBrokerEntryMetadata();
+ if (brokerEntryTimestamp != null) {
+ brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp));
+ }
+ if (brokerEntryIndex != null) {
+ brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
+ }
+ }
+
+ PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
+ Map<String, String> properties = Maps.newTreeMap();
+
+ String value = headers.getFirst("X-Pulsar-publish-time");
+ if (value != null) {
+ messageMetadata.setPublishTime(parse(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-event-time");
+ if (value != null) {
+ messageMetadata.setEventTime(parse(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-deliver-at-time");
+ if (value != null) {
+ messageMetadata.setDeliverAtTime(parse(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-null-value");
+ if (value != null) {
+ messageMetadata.setNullValue(Boolean.parseBoolean(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-producer-name");
+ if (value != null) {
+ messageMetadata.setProducerName(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-sequence-id");
+ if (value != null) {
+ messageMetadata.setSequenceId(Long.parseLong(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-replicated-from");
+ if (value != null) {
+ messageMetadata.setReplicatedFrom(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-partition-key");
+ if (value != null) {
+ messageMetadata.setPartitionKey(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-compression");
+ if (value != null) {
+ messageMetadata.setCompression(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-uncompressed-size");
+ if (value != null) {
+ messageMetadata.setUncompressedSize(Integer.parseInt(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-encryption-algo");
+ if (value != null) {
+ messageMetadata.setEncryptionAlgo(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
+ if (value != null) {
+ messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-marker-type");
+ if (value != null) {
+ messageMetadata.setMarkerType(Integer.parseInt(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-txnid-least-bits");
+ if (value != null) {
+ messageMetadata.setTxnidLeastBits(Long.parseLong(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-txnid-most-bits");
+ if (value != null) {
+ messageMetadata.setTxnidMostBits(Long.parseLong(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-highest-sequence-id");
+ if (value != null) {
+ messageMetadata.setHighestSequenceId(Long.parseLong(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-uuid");
+ if (value != null) {
+ messageMetadata.setUuid(value);
+ }
+
+ value = headers.getFirst("X-Pulsar-num-chunks-from-msg");
+ if (value != null) {
+ messageMetadata.setNumChunksFromMsg(Integer.parseInt(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-total-chunk-msg-size");
+ if (value != null) {
+ messageMetadata.setTotalChunkMsgSize(Integer.parseInt(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-chunk-id");
+ if (value != null) {
+ messageMetadata.setChunkId(Integer.parseInt(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-null-partition-key");
+ if (value != null) {
+ messageMetadata.setNullPartitionKey(Boolean.parseBoolean(value));
+ }
+
+ // the three binary fields below arrive Base64 encoded
+ value = headers.getFirst("X-Pulsar-Base64-encryption-param");
+ if (value != null) {
+ messageMetadata.setEncryptionParam(Base64.getDecoder().decode(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-Base64-ordering-key");
+ if (value != null) {
+ messageMetadata.setOrderingKey(Base64.getDecoder().decode(value));
+ }
+
+ value = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
+ if (value != null) {
+ messageMetadata.setSchemaVersion(Base64.getDecoder().decode(value));
+ }
+
+ List<String> replicatedTo = headers.get("X-Pulsar-replicated-to");
+ if (ObjectUtils.isNotEmpty(replicatedTo)) {
+ if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
+ messageMetadata.setReplicateTos(Lists.newArrayList(replicatedTo));
+ } else {
+ messageMetadata.getReplicateTos().addAll(replicatedTo);
+ }
+ }
+
+ value = headers.getFirst("X-Pulsar-batch-size");
+ if (value != null) {
+ properties.put("X-Pulsar-batch-size", value);
+ }
+
+ // user properties travel as X-Pulsar-PROPERTY-<name> headers; only the first value of each is kept
+ for (Entry<String, List<String>> entry : headers.entrySet()) {
+ if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
+ String keyName = entry.getKey().substring("X-Pulsar-PROPERTY-".length());
+ properties.put(keyName, entry.getValue().get(0));
+ }
+ }
+
+ value = headers.getFirst("X-Pulsar-num-batch-message");
+ if (value != null) {
+ properties.put("X-Pulsar-num-batch-message", value);
+ }
+
+ boolean isEncrypted = false;
+ value = headers.getFirst("X-Pulsar-Is-Encrypted");
+ if (value != null) {
+ isEncrypted = Boolean.parseBoolean(value);
+ }
+
+ // an unencrypted batch is split into its individual messages
+ if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) {
+ return getIndividualMsgsFromBatch(topic, msgId, response.getBody(), properties, messageMetadata,
+ brokerEntryMetadata);
+ }
+
+ PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+ messageInfo.setTopic(topic);
+ messageInfo.setMessageId(msgId);
+ // hand over the properties collected from the headers; messageMetadata never receives any in this
+ // method, so reading them from there would drop every property of a non-batch message
+ messageInfo.setProperties(properties);
+ messageInfo.setBody(response.getBody());
+ messageInfo.setPulsarMessageMetadata(messageMetadata);
+ if (brokerEntryMetadata != null) {
+ messageInfo.setPulsarBrokerEntryMetadata(brokerEntryMetadata);
+ }
+ return Collections.singletonList(messageInfo);
+ }
+
+ /**
+ * Convert a date-time text into epoch milliseconds using DATE_FORMAT.
+ *
+ * @param datetime text to parse
+ * @return the parsed instant in milliseconds since the epoch
+ * @throws DateTimeParseException if the text cannot be parsed by DATE_FORMAT
+ */
+ private static long parse(String datetime) throws DateTimeParseException {
+ return Instant.from(DATE_FORMAT.parse(datetime)).toEpochMilli();
+ }
+
+ /**
+ * Split a batch payload into its individual messages.
+ * Copy from getIndividualMsgsFromBatch method of org.apache.pulsar.client.admin.internal.TopicsImpl class.
+ *
+ * @param topic topic name set on every produced message
+ * @param msgId id of the batch; each produced message gets the id {@code msgId + ":" + index}
+ * @param data batch payload, consumed sequentially from the start
+ * @param properties properties of the batch; must contain a numeric "X-Pulsar-num-batch-message" entry, which
+ * gives the number of messages to read
+ * @param metadata batch level metadata, attached unchanged to every produced message
+ * @param brokerMetadata broker entry metadata attached to every produced message, may be null
+ * @return one message per batch entry, in payload order
+ */
+ private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
+ Map<String, String> properties, PulsarMessageMetadata metadata, PulsarBrokerEntryMetadata brokerMetadata) {
+ List<PulsarMessageInfo> ret = new ArrayList<>();
+ int batchSize = Integer.parseInt(properties.get("X-Pulsar-num-batch-message"));
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ for (int i = 0; i < batchSize; ++i) {
+ String batchMsgId = msgId + ":" + i;
+ PulsarMessageMetadata singleMetadata = new PulsarMessageMetadata();
+ // NOTE(review): every entry receives the same map instance; if the metadata parser adds per-message
+ // properties to it, they would become visible on all messages of the batch - confirm
+ singleMetadata.setProperties(properties);
+ // advances the shared buffer past this entry's metadata and payload
+ ByteBuffer singleMessagePayload = deSerializeSingleMessageInBatch(buffer, singleMetadata, i, batchSize);
+ PulsarMessageInfo messageInfo = new PulsarMessageInfo();
+ messageInfo.setTopic(topic);
+ messageInfo.setMessageId(batchMsgId);
+ messageInfo.setProperties(singleMetadata.getProperties());
+ messageInfo.setPulsarMessageMetadata(metadata);
+ messageInfo.setBody(singleMessagePayload.array());
+ if (brokerMetadata != null) {
+ messageInfo.setPulsarBrokerEntryMetadata(brokerMetadata);
+ }
+ ret.add(messageInfo);
+ }
+ buffer.clear();
+ return ret;
+ }
+
+ /**
+ * Read one batch entry (metadata size, metadata, payload) from the current position of the buffer.
+ * Copy from deSerializeSingleMessageInBatch method of org.apache.pulsar.common.protocol.Commands class.
+ *
+ * @param uncompressedPayload batch payload positioned at the start of an entry; left positioned after it
+ * @param metadata receives the fields parsed from the entry's metadata, including the payload size
+ * @param index index of the entry within the batch
+ * @param batchSize number of entries in the batch
+ * @return a new buffer wrapping a copy of the entry's payload bytes
+ */
+ private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer uncompressedPayload,
+ PulsarMessageMetadata metadata, int index, int batchSize) {
+ // entry layout: 4-byte metadata size, then the metadata, then the payload
+ int singleMetaSize = (int) uncompressedPayload.getInt();
+ metaDataParseFrom(metadata, uncompressedPayload, singleMetaSize);
+ int singleMessagePayloadSize = metadata.getPayloadSize();
+ int readerIndex = uncompressedPayload.position();
+ byte[] singleMessagePayload = new byte[singleMessagePayloadSize];
+ // the bulk get already moves the position forward by the payload size
+ uncompressedPayload.get(singleMessagePayload);
+ if (index < batchSize) {
+ // sets the position to the value the bulk get above already left it at
+ uncompressedPayload.position(readerIndex + singleMessagePayloadSize);
+ }
+ return ByteBuffer.wrap(singleMessagePayload);
+ }
+
+ /**
+ * Parse the protobuf encoded metadata of a single batch entry into the given metadata object.
+ * Copy from parseFrom method of org.apache.pulsar.common.api.proto.SingleMessageMetadata class.
+ *
+ * @param metadata receives the parsed fields
+ * @param buffer source buffer positioned at the first metadata byte; left positioned after the metadata
+ * @param size number of metadata bytes to parse
+ */
+ private static void metaDataParseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
+ int endIdx = size + buffer.position();
+ while (buffer.position() < endIdx) {
+ // tag = (field number << 3) | wire type
+ int tag = readVarInt(buffer);
+ switch (tag) {
+ case 10: // field 1, length-delimited: a property entry
+ int propertySize = readVarInt(buffer);
+ parseFrom(metadata, buffer, propertySize);
+ break;
+ case 18: // field 2, length-delimited: partition key
+ int partitionKeyLen = readVarInt(buffer);
+ byte[] partitionKeyArray = new byte[partitionKeyLen];
+ buffer.get(partitionKeyArray);
+ // decode with a fixed charset so the result does not depend on the platform default
+ metadata.setPartitionKey(new String(partitionKeyArray, java.nio.charset.StandardCharsets.UTF_8));
+ break;
+ case 24: // field 3, varint: payload size
+ int payloadSize = readVarInt(buffer);
+ metadata.setPayloadSize(payloadSize);
+ break;
+ case 32: // field 4, varint: compacted out flag
+ boolean compactedOut = readVarInt(buffer) == 1;
+ metadata.setCompactedOut(compactedOut);
+ break;
+ case 40: // field 5, varint: event time
+ long eventTime = readVarInt64(buffer);
+ metadata.setEventTime(eventTime);
+ break;
+ case 48: // field 6, varint: partition key is Base64 encoded flag
+ boolean partitionKeyB64Encoded = readVarInt(buffer) == 1;
+ metadata.setPartitionKeyB64Encoded(partitionKeyB64Encoded);
+ break;
+ case 58: // field 7, length-delimited: ordering key
+ int orderingKeyLen = readVarInt(buffer);
+ byte[] orderingKeyArray = new byte[orderingKeyLen];
+ // the key bytes must be consumed: otherwise the key stays zero-filled and the cursor remains
+ // inside the key, so every following tag would be misread
+ buffer.get(orderingKeyArray);
+ metadata.setOrderingKey(orderingKeyArray);
+ break;
+ case 64: // field 8, varint: sequence id
+ long sequenceId = readVarInt64(buffer);
+ metadata.setSequenceId(sequenceId);
+ break;
+ case 72: // field 9, varint: null value flag
+ boolean nullValue = readVarInt(buffer) == 1;
+ metadata.setNullValue(nullValue);
+ break;
+ case 80: // field 10, varint: null partition key flag
+ boolean nullPartitionKey = readVarInt(buffer) == 1;
+ metadata.setNullPartitionKey(nullPartitionKey);
+ break;
+ default:
+ skipUnknownField(tag, buffer);
+ break;
+ }
+ }
+ }
/**
- * Get pulsar cluster service url.
+ * Copy from readVarInt method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+ *
+ * @param buf
+ * @return
*/
- public static String getServiceUrl(PulsarAdmin pulsarAdmin, String pulsarCluster) throws PulsarAdminException {
- return pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+ private static int readVarInt(ByteBuffer buf) { // decodes a varint into an int: 7 payload bits per byte, lowest group first
+ byte tmp = buf.get();
+ if (tmp >= 0) { // sign bit clear: this byte is the last one, so it is the whole value
+ return tmp;
+ } else {
+ int result = tmp & 127; // keep the low 7 bits of the first byte
+ if ((tmp = buf.get()) >= 0) {
+ result |= tmp << 7;
+ } else {
+ result |= (tmp & 127) << 7;
+ if ((tmp = buf.get()) >= 0) {
+ result |= tmp << 14;
+ } else {
+ result |= (tmp & 127) << 14;
+ if ((tmp = buf.get()) >= 0) {
+ result |= tmp << 21;
+ } else {
+ result |= (tmp & 127) << 21;
+ result |= (tmp = buf.get()) << 28; // fifth byte: only its low 4 bits survive the shift into an int
+ if (tmp < 0) { // continuation bit still set after five bytes
+ for (int i = 0; i < 5; ++i) { // read and discard up to 5 more bytes until one has the sign bit clear
+ if (buf.get() >= 0) {
+ return result;
+ }
+ }
+ throw new IllegalArgumentException("Encountered a malformed varint."); // no terminating byte within 10 bytes
+ }
+ }
+ }
+ }
+ return result;
+ }
}
+ /**
+ * Copy from readVarInt64 method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+ *
+ * @param buf
+ * @return
+ */
+ private static long readVarInt64(ByteBuffer buf) {
+ int shift = 0;
+ for (long result = 0L; shift < 64; shift += 7) {
+ byte b = buf.get();
+ result |= (long) (b & 127) << shift;
+ if ((b & 128) == 0) {
+ return result;
+ }
+ }
+ throw new IllegalArgumentException("Encountered a malformed varint.");
+ }
+
+ /**
+ * Copy from getTagType method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+ *
+ * @param tag
+ * @return
+ */
+ private static int getTagType(int tag) {
+ return tag & 7;
+ }
+
+ /**
+ * Copy from skipUnknownField method of org.apache.pulsar.common.api.proto.LightProtoCodec class.
+ *
+ * @param tag
+ * @param buffer
+ */
+ private static void skipUnknownField(int tag, ByteBuffer buffer) {
+ int tagType = getTagType(tag);
+ switch (tagType) {
+ case 0:
+ readVarInt(buffer);
+ break;
+ case 1:
+ buffer.get(new byte[8]);
+ break;
+ case 2:
+ int len = readVarInt(buffer);
+ buffer.get(new byte[len]);
+ break;
+ case 3:
+ case 4:
+ default:
+ throw new IllegalArgumentException("Invalid unknonwn tag type: " + tagType);
+ case 5:
+ buffer.get(new byte[4]);
+ }
+ }
+
+    /**
+     * Parses one serialized key/value property and stores it in the metadata's property map.
+     * Adapted from parseFrom of org.apache.pulsar.common.api.proto.KeyValue.
+     *
+     * <p>A pair is stored as soon as both the key (tag 10) and the value (tag 18) have been read.
+     * Empty strings are kept: a property whose value is {@code ""} is still a property.
+     *
+     * @param metadata target whose property map receives the pair (created if absent or empty)
+     * @param buffer source buffer, positioned at the first byte of the record
+     * @param size length of the record in bytes
+     */
+    private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer buffer, int size) {
+        if (ObjectUtils.isEmpty(metadata.getProperties())) {
+            metadata.setProperties(new HashMap<>());
+        }
+        Map<String, String> properties = metadata.getProperties();
+        int endIdx = buffer.position() + size;
+        String key = null;
+        String value = null;
+        while (buffer.position() < endIdx) {
+            int tag = readVarInt(buffer);
+            switch (tag) {
+                case 10:
+                    byte[] keyArray = new byte[readVarInt(buffer)];
+                    buffer.get(keyArray);
+                    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+                    key = new String(keyArray, java.nio.charset.StandardCharsets.UTF_8);
+                    break;
+                case 18:
+                    byte[] valueArray = new byte[readVarInt(buffer)];
+                    buffer.get(valueArray);
+                    value = new String(valueArray, java.nio.charset.StandardCharsets.UTF_8);
+                    break;
+                default:
+                    skipUnknownField(tag, buffer);
+                    break;
+            }
+            // Null (not emptiness) marks "not read yet", so empty keys/values are not dropped.
+            if (key != null && value != null) {
+                properties.put(key, value);
+                key = null;
+                value = null;
+            }
+        }
+    }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
index a38b391131..f790604fc8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/pulsar/PulsarResourceOperator.java
@@ -33,13 +33,9 @@ import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.pulsar.PulsarSinkDTO;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
-import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
import org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -86,12 +82,11 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().adminUrl(pulsarDataNodeInfo.getAdminUrl())
.token(pulsarDataNodeInfo.getToken()).build();
- PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo);
// create pulsar tenant
- pulsarOperator.createTenant(pulsarAdmin, pulsarSinkDTO.getPulsarTenant());
+ pulsarOperator.createTenant(pulsarClusterInfo, pulsarSinkDTO.getPulsarTenant());
// use default config to create namespace
InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
- pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
+ pulsarOperator.createNamespace(pulsarClusterInfo, pulsarInfo, pulsarSinkDTO.getPulsarTenant(),
pulsarSinkDTO.getNamespace());
String queueModel = pulsarSinkDTO.getPartitionNum() > 0 ? InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL
: InlongConstants.PULSAR_QUEUE_TYPE_SERIAL;
@@ -102,15 +97,14 @@ public class PulsarResourceOperator extends AbstractStandaloneSinkResourceOperat
.queueModule(queueModel)
.build();
// create topic
- pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+ pulsarOperator.createTopic(pulsarClusterInfo, topicInfo);
final String info = "success to create Pulsar resource";
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
LOG.info(info + " for sinkInfo={}", sinkInfo);
- } catch (PulsarClientException | PulsarAdminException e) {
+ } catch (Exception e) {
LOG.error("init pulsar admin error", e);
throw new BusinessException();
}
-
}
private PulsarDataNodeDTO getPulsarDataNodeInfo(SinkInfo sinkInfo) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
index d248286820..0676b944f8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/PulsarUtilsTest.java
@@ -17,39 +17,578 @@
package org.apache.inlong.manager.service.queue;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarMessageInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarNamespacePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarPersistencePolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarRetentionPolicies;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.junit.jupiter.api.Assertions;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Ignore;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.springframework.util.ReflectionUtils;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
-import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.powermock.api.mockito.PowerMockito.when;
/**
* Test class for Pulsar utils.
*/
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
public class PulsarUtilsTest {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarUtilsTest.class);
+
+ public static final Network NETWORK = Network.newNetwork(); // network the Pulsar container below is attached to
+
+ private static final String INTER_CONTAINER_PULSAR_ALIAS = "pulsar"; // network alias given to the container
+
+ private static final Gson GSON = new GsonBuilder().create(); // thread safe
+
+ private static final String DEFAULT_SERVICE_URL = "http://127.0.0.1:8080"; // admin URL stubbed on the mocked cluster info in before()
+
+ private static final String DEFAULT_TENANT = "public";
+
+ private static final String DEFAULT_CREATE_TENANT = "test_tenant"; // tenant created by testCreateTenant
+
+ private static final String DEFAULT_NAMESPACE = "default";
+
+ private static final int DEFAULT_PARTITIONS_NUM = 3;
+
+ private static final String PERSISTENT_TOPIC_HEAD = "persistent://"; // prefix of the topic names the tests look for in listings
+
+ @Mock
+ private RestTemplate restTemplate; // mocked client; only stubbed in testExamineMessage
+
+ @Mock
+ private ResponseEntity<byte[]> byteExchange; // mocked response returned by restTemplate in testExamineMessage
+
+ @Mock
+ private PulsarClusterInfo pulsarClusterInfo; // mocked cluster info; admin URL and token are stubbed in before()
+
+ private static PulsarNamespacePolicies policies; // built once in beforeAll()
+
+ private static final PulsarContainer PULSAT_CONTAINER = new PulsarContainer( // NOTE(review): name looks like a typo for PULSAR_CONTAINER
+ DockerImageName.parse("apachepulsar/pulsar:2.8.2")
+ .asCompatibleSubstituteFor("apachepulsar/pulsar"))
+ .withNetwork(NETWORK)
+ .withAccessToHost(true)
+ .withNetworkAliases(INTER_CONTAINER_PULSAR_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG)); // container output is forwarded to this class's logger
+
+ private static final RestTemplate client = new RestTemplate(); // real HTTP client used by the container-backed tests
+
+ private static final PulsarClusterInfo pulsarCluster = new PulsarClusterInfo(); // admin URL is set in beforeAll()
+
+    /**
+     * Builds the shared namespace policies, starts the Pulsar container with fixed port bindings
+     * and points the shared cluster info at its admin endpoint.
+     */
+    @BeforeAll
+    public static void beforeAll() {
+        policies = new PulsarNamespacePolicies();
+        policies.setMessageTtlInSeconds(10);
+        PulsarPersistencePolicies persistencePolicies = new PulsarPersistencePolicies();
+        persistencePolicies.setBookkeeperEnsemble(3);
+        persistencePolicies.setBookkeeperAckQuorum(3);
+        persistencePolicies.setBookkeeperWriteQuorum(3);
+        persistencePolicies.setManagedLedgerMaxMarkDeleteRate(4.0);
+        policies.setPersistence(persistencePolicies);
+        PulsarRetentionPolicies retentionPolicies = new PulsarRetentionPolicies();
+        // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit '1'.
+        retentionPolicies.setRetentionSizeInMB(2048L);
+        retentionPolicies.setRetentionTimeInMinutes(500);
+        policies.setRetentionPolicies(retentionPolicies);
+
+        PULSAT_CONTAINER.setPortBindings(Arrays.asList("6650:6650", "8080:8080"));
+        Startables.deepStart(Stream.of(PULSAT_CONTAINER)).join();
+        LOG.info("Containers are started.");
+        // Reuse the shared constant instead of repeating the URL literal.
+        pulsarCluster.setAdminUrl(DEFAULT_SERVICE_URL);
+    }
+
+    /**
+     * Stops the Pulsar container once all tests have run.
+     */
+    @AfterAll
+    public static void teardown() {
+        if (PULSAT_CONTAINER == null) {
+            return;
+        }
+        PULSAT_CONTAINER.stop();
+    }
+
+    /**
+     * Stubs the mocked cluster info with a token and the default admin URL before each test.
+     */
+    @BeforeEach
+    public void before() {
+        when(pulsarClusterInfo.getToken()).thenReturn("testtoken");
+        when(pulsarClusterInfo.getAdminUrl()).thenReturn(DEFAULT_SERVICE_URL);
+    }
+ /**
+ * Test cases for {@link PulsarUtils#getClusters(RestTemplate, PulsarClusterInfo)}.
+ *
+ * @throws Exception
+ */
@Test
- public void testGetPulsarAdmin() {
- final String defaultServiceUrl = "http://127.0.0.1:10080";
- try {
- PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
- Assertions.assertEquals(defaultServiceUrl, admin.getServiceUrl());
- Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
- assert auth != null;
- auth.setAccessible(true);
- Authentication authentication = (Authentication) auth.get(admin);
- Assertions.assertNotNull(authentication);
- Assertions.assertTrue(authentication instanceof AuthenticationDisabled);
- } catch (PulsarClientException | IllegalAccessException e) {
- Assertions.fail();
+    public void testGetClusters() throws Exception {
+        // Expect exactly one cluster named "standalone"; a typed list avoids the raw ArrayList.class
+        // Gson round-trip, and list equality already covers the size check.
+        List<String> expected = Arrays.asList("standalone");
+        List<String> clusters = PulsarUtils.getClusters(client, pulsarCluster);
+        assertEquals(expected, clusters);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getBrokers(RestTemplate, PulsarClusterInfo)}.
+     * Expects exactly one broker, reported as {@code localhost:8080}.
+     *
+     * @throws Exception if the admin request fails
+     */
+    @Test
+    public void testGetBrokers() throws Exception {
+        // A typed list avoids the raw ArrayList.class Gson round-trip; list equality covers the size check.
+        List<String> expected = Arrays.asList("localhost:8080");
+        List<String> brokers = PulsarUtils.getBrokers(client, pulsarCluster);
+        assertEquals(expected, brokers);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getTenants(RestTemplate, PulsarClusterInfo)}.
+     * Only checks that a tenant list is returned.
+     *
+     * @throws Exception if the admin request fails
+     */
+    @Test
+    public void testGetTenants() throws Exception {
+        assertNotNull(PulsarUtils.getTenants(client, pulsarCluster));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getNamespaces(RestTemplate, PulsarClusterInfo, String)}.
+     * Only checks that a namespace list is returned for the default tenant.
+     *
+     * @throws Exception if the admin request fails
+     */
+    @Test
+    public void testGetNamespaces() throws Exception {
+        assertNotNull(PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#createTenant}.
+     * Creates a tenant and checks that it appears in the tenant listing.
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testCreateTenant() throws Exception {
+        PulsarTenantInfo tenantInfo = new PulsarTenantInfo();
+        tenantInfo.setAdminRoles(Sets.newHashSet());
+        tenantInfo.setAllowedClusters(Sets.newHashSet("standalone"));
+
+        PulsarUtils.createTenant(client, pulsarCluster, DEFAULT_CREATE_TENANT, tenantInfo);
+        Thread.sleep(500);
+
+        assertTrue(PulsarUtils.getTenants(client, pulsarCluster).contains(DEFAULT_CREATE_TENANT));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#createNamespace(RestTemplate, PulsarClusterInfo, String, String, PulsarNamespacePolicies)}.
+     * Creates a namespace under the default tenant and checks that it appears in the namespace listing.
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testCreateNamespace() throws Exception {
+        final String namespaceName = "testCreateNamespace";
+        final String namespaceInfo = DEFAULT_TENANT + InlongConstants.SLASH + namespaceName;
+
+        // The hand-built JSON body and HTTP headers of the earlier version were never passed to
+        // anything; PulsarUtils.createNamespace receives the policies object directly.
+        PulsarUtils.createNamespace(client, pulsarCluster, DEFAULT_TENANT, namespaceName, policies);
+        Thread.sleep(500);
+        List<String> namespaces = PulsarUtils.getNamespaces(client, pulsarCluster, DEFAULT_TENANT);
+        assertTrue(namespaces.contains(namespaceInfo));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getTopics(RestTemplate, PulsarClusterInfo, String, String)}.
+     * Only checks that a topic list is returned for the default namespace.
+     *
+     * @throws Exception if the admin request fails
+     */
+    @Test
+    public void testGetTopics() throws Exception {
+        List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        // size() >= 0 can never fail; assert what is actually being verified.
+        assertNotNull(topics);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getPartitionedTopics(RestTemplate, PulsarClusterInfo, String, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getList</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/partitioned
+     * method is: GET
+     * request body: none
+     * response : ["string"]
+     *
+     * @throws Exception if the admin request fails
+     */
+    @Test
+    public void testGetPartitionedTopics() throws Exception {
+        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        // size() >= 0 can never fail; assert what is actually being verified.
+        assertNotNull(topics);
+    }
+
+ /**
+ * Test cases for {@link PulsarUtils#createNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreateNonPartitionedTopic() throws Exception {
+ final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+ + "testCreateNonPartitionedTopic";
+ final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;
+
+ Thread.sleep(500);
+ PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
+ Thread.sleep(500);
+ List<String> topics = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT,
+ DEFAULT_NAMESPACE);
+ assertTrue(topics.contains(topicInfo));
+ }
+
+ /**
+ * Test cases for {@link PulsarUtils#createPartitionedTopic(RestTemplate, PulsarClusterInfo, String, Integer)}.
+ * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createPartitionedTopic</a>
+ * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+ * method is: PUT
+ * request body: integer <int32> (The number of partitions for the topic)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testCreatePartitionedTopic() throws Exception {
+ final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+ + "testCreatePartitionedTopic";
+ final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;
+
+ PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, DEFAULT_PARTITIONS_NUM);
+ Thread.sleep(500);
+ List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+ DEFAULT_NAMESPACE);
+ assertTrue(topics.contains(topicInfo));
+ }
+
+    /**
+     * Test cases for {@link PulsarUtils#getInternalStatsPartitionedTopics(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedStatsInternal</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitioned-internalStats
+     * method is: GET
+     * request body: none
+     * response : json
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testGetInternalStatsPartitionedTopics() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testGetInternalStatsPartitionedTopics";
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+
+        JsonObject internalStats = PulsarUtils.getInternalStatsPartitionedTopics(client, pulsarCluster, path);
+        assertNotNull(internalStats);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getPartitionedTopicMetadata(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getPartitionedMetadata</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: GET
+     * request body: none
+     * response : json
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testGetPartitionedTopicMetadata() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testGetPartitionedTopicMetadata";
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, DEFAULT_PARTITIONS_NUM);
+        Thread.sleep(500);
+
+        PulsarTopicMetadata topicMetadata = PulsarUtils.getPartitionedTopicMetadata(client, pulsarCluster, path);
+        assertNotNull(topicMetadata);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#deleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testDeleteNonPartitionedTopic";
+        final String fullName = PERSISTENT_TOPIC_HEAD + path;
+
+        // The topic must be listed after creation ...
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> listed = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(listed.contains(fullName));
+
+        // ... and gone after deletion.
+        PulsarUtils.deleteNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        listed = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!listed.contains(fullName));
     }
+    /**
+     * Test cases for {@link PulsarUtils#forceDeleteNonPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deleteTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testForceDeleteNonPartitionedTopic() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testForceDeleteNonPartitionedTopic";
+        final String fullName = PERSISTENT_TOPIC_HEAD + path;
+
+        // The topic must be listed after creation ...
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        List<String> listed = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(listed.contains(fullName));
+
+        // ... and gone after the forced deletion.
+        PulsarUtils.forceDeleteNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        listed = PulsarUtils.getTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!listed.contains(fullName));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#deletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        final int partitions = 3;
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testDeletePartitionedTopic";
+        final String fullName = PERSISTENT_TOPIC_HEAD + path;
+
+        // The topic must be listed after creation ...
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, path, partitions);
+        Thread.sleep(500);
+        List<String> listed = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        assertTrue(listed.contains(fullName));
+
+        // ... and gone after deletion.
+        PulsarUtils.deletePartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+        listed = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!listed.contains(fullName));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#forceDeletePartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_deletePartitionedTopic</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/partitions
+     * method is: DELETE
+     * request body: none
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testForceDeletePartitionedTopic() throws Exception {
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testForceDeletePartitionedTopic";
+        final int numPartitions = 3;
+        final String topicInfo = PERSISTENT_TOPIC_HEAD + topicPath;
+
+        PulsarUtils.createPartitionedTopic(client, pulsarCluster, topicPath, numPartitions);
+        Thread.sleep(500);
+        List<String> topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT,
+                DEFAULT_NAMESPACE);
+        assertTrue(topics.contains(topicInfo));
+
+        // Exercise the force variant this test is named after (previously the plain delete was called,
+        // leaving forceDeletePartitionedTopic untested).
+        PulsarUtils.forceDeletePartitionedTopic(client, pulsarCluster, topicPath);
+        Thread.sleep(500);
+        topics = PulsarUtils.getPartitionedTopics(client, pulsarCluster, DEFAULT_TENANT, DEFAULT_NAMESPACE);
+        assertTrue(!topics.contains(topicInfo));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#lookupTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * Creates a non-partitioned topic and checks that looking it up returns a non-null result.
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testLookupTopic() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testLookupTopic";
+
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+
+        assertNotNull(PulsarUtils.lookupTopic(client, pulsarCluster, path));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#lookupPartitionedTopic(RestTemplate, PulsarClusterInfo, String)}.
+     * Checks that the lookup returns a non-null map.
+     *
+     * NOTE(review): the topic is created with createNonPartitionedTopic although the method under
+     * test is the partitioned lookup - confirm whether a partitioned topic was intended here.
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testLookupPartitionedTopic() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testLookupPartitionedTopic";
+
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, path);
+        Thread.sleep(500);
+
+        Map<String, String> lookupResult = PulsarUtils.lookupPartitionedTopic(client, pulsarCluster, path);
+        assertNotNull(lookupResult);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#getSubscriptions(RestTemplate, PulsarClusterInfo, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_getSubscriptions</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscriptions
+     * method is: GET
+     * response body: ["string"]
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testGetSubscriptions() throws Exception {
+        final String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testGetSubscriptions";
+        PulsarUtils.createNonPartitionedTopic(client, pulsarCluster, topicPath);
+        Thread.sleep(500);
+        List<String> actual = PulsarUtils.getSubscriptions(client, pulsarCluster, topicPath);
+        // size() >= 0 can never fail; assert what is actually being verified.
+        assertNotNull(actual);
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#createSubscription(RestTemplate, PulsarClusterInfo, String, String)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_createSubscription</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}
+     * method is: PUT
+     * response body: json
+     *
+     * @throws Exception if an admin request fails
+     */
+    @Test
+    public void testCreateSubscription() throws Exception {
+        final String path = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testCreateSubscription";
+        final String subscription = "test_subscription";
+
+        PulsarUtils.createSubscription(client, pulsarCluster, path, subscription);
+        Thread.sleep(500);
+
+        assertTrue(PulsarUtils.getSubscriptions(client, pulsarCluster, path).contains(subscription));
+    }
+
+    /**
+     * Test cases for {@link PulsarUtils#examineMessage(RestTemplate, PulsarClusterInfo, String, String, int)}.
+     * href: <a>https://pulsar.apache.org/admin-rest-api/#operation/PersistentTopics_examineMessage</a>
+     * url like this: http://{host}:{port}/admin/v2/persistent/{tenant}/{namespace}/{topic}/examinemessage
+     * method is: GET
+     * request parameters:
+     * - initialPosition: Relative start position to examine message. It can be 'latest' or 'earliest'
+     * - messagePosition: The position of messages (default 1)
+     * - authoritative: Whether leader broker redirected this call to this broker. For internal use.
+     * response body: byteArray
+     *
+     * @throws Exception if the call under test fails
+     */
+    @Test
+    public void testExamineMessage() throws Exception {
+        /*
+         * Since admin API cannot send messages to the topic, this test case will be simulated using mockito.
+         */
+        final String messageType = "latest";
+        final int messagePosition = 1;
+        String topicPath = DEFAULT_TENANT + InlongConstants.SLASH + DEFAULT_NAMESPACE + InlongConstants.SLASH
+                + "testtopic-partition-1";
+        StringBuilder urlBuilder = new StringBuilder().append(DEFAULT_SERVICE_URL)
+                .append(PulsarUtils.QUERY_PERSISTENT_PATH).append("/").append(topicPath).append("/examinemessage")
+                .append("?initialPosition=").append(messageType).append("&messagePosition=").append(messagePosition);
+        final String expected = "test message!";
+
+        when(restTemplate.exchange(eq(urlBuilder.toString()), eq(HttpMethod.GET), any(HttpEntity.class),
+                eq(byte[].class))).thenReturn(byteExchange);
+        when(byteExchange.getStatusCode()).thenReturn(HttpStatus.OK);
+        when(byteExchange.getBody()).thenReturn(expected.getBytes(StandardCharsets.UTF_8));
+
+        ResponseEntity<byte[]> response = PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPath,
+                messageType, messagePosition);
+        // Decode with the same charset used to encode the stubbed body, not the platform default.
+        assertEquals(expected, new String(response.getBody(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * The case only supports local testing: it reads a message from a Pulsar admin endpoint
+     * at 127.0.0.1:8080 and logs the decoded result.
+     *
+     * <p>Marked with the Jupiter {@code @Disabled} annotation: the JUnit 4 {@code @Ignore} used
+     * before is not a Jupiter annotation, and without {@code @Test} the method was not discovered
+     * as a test at all.
+     *
+     * @throws Exception if the admin request or the message decoding fails
+     */
+    @Test
+    @org.junit.jupiter.api.Disabled("The case only supports local testing")
+    public void localTest() throws Exception {
+        RestTemplate restTemplate = new RestTemplate();
+        PulsarClusterInfo pulsarClusterInfo = new PulsarClusterInfo();
+        pulsarClusterInfo.setAdminUrl("http://127.0.0.1:8080");
+        String topic = "public/test_pulsar_group/test_pulsar_stream";
+
+        ResponseEntity<byte[]> pulsarMessage =
+                PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo,
+                        topic, "latest", 1);
+        List<PulsarMessageInfo> result = PulsarUtils.getMessagesFromHttpResponse(pulsarMessage, topic);
+        // Use the class logger rather than System.out.
+        LOG.info("examined messages: {}", result);
+    }
}