You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/02 02:46:42 UTC

[pulsar] branch master updated: [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d7c1127019 [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
3d7c1127019 is described below

commit 3d7c112701932aef1f6c9a6213e313a251de9a21
Author: Lan <gc...@gmail.com>
AuthorDate: Tue Aug 2 10:46:35 2022 +0800

    [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
---
 .../pulsar/metadata/impl/EtcdMetadataStore.java    |  3 +-
 .../metadata/impl/EtcdMetadataStoreTest.java       | 58 ++++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 0073db0c8d7..bbe7034d990 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -126,7 +126,8 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
 
     private Client newEtcdClient(String metadataURL, MetadataStoreConfig conf) throws IOException {
         String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
-        ClientBuilder clientBuilder = Client.builder().endpoints(etcdUrl);
+        ClientBuilder clientBuilder = Client.builder()
+                .endpoints(etcdUrl.split(","));
 
         if (StringUtils.isNotEmpty(conf.getConfigFilePath())) {
             try (InputStream inputStream = Files.newInputStream(Paths.get(conf.getConfigFilePath()))) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
index 180d835830a..fd1b8ac4552 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import com.google.common.io.Resources;
 import io.etcd.jetcd.launcher.EtcdCluster;
 import io.etcd.jetcd.launcher.EtcdClusterFactory;
+
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -40,6 +41,63 @@ import org.testng.annotations.Test;
 @Slf4j
 public class EtcdMetadataStoreTest {
 
+    @Test
+    public void testCluster() throws Exception {
+        @Cleanup
+        EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, false);
+        etcdCluster.start();
+
+        EtcdConfig etcdConfig = EtcdConfig.builder().useTls(false)
+                .tlsProvider(null)
+                .authority("etcd0")
+                .build();
+
+        Path etcdConfigPath = Files.createTempFile("etcd_config_cluster", ".yml");
+        new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+        String metadataURL =
+                "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(metadataURL,
+                MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+        store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+        assertTrue(store.exists("/test").join());
+
+    }
+
+    @Test
+    public void testClusterWithTls() throws Exception {
+        @Cleanup
+        EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, true);
+        etcdCluster.start();
+
+        EtcdConfig etcdConfig = EtcdConfig.builder().useTls(true)
+                .tlsProvider(null)
+                .authority("etcd0")
+                .tlsTrustCertsFilePath(Resources.getResource("ssl/cert/ca.pem").getPath())
+                .tlsKeyFilePath(Resources.getResource("ssl/cert/client-key-pk8.pem").getPath())
+                .tlsCertificateFilePath(Resources.getResource("ssl/cert/client.pem").getPath())
+                .build();
+
+        Path etcdConfigPath = Files.createTempFile("etcd_config_cluster_ssl", ".yml");
+        new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+        String metadataURL =
+                "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(metadataURL,
+                MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+        store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+        assertTrue(store.exists("/test").join());
+
+    }
+
     @Test
     public void testTlsInstance() throws Exception {
         @Cleanup