You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/02 02:46:42 UTC
[pulsar] branch master updated: [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7c1127019 [fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
3d7c1127019 is described below
commit 3d7c112701932aef1f6c9a6213e313a251de9a21
Author: Lan <gc...@gmail.com>
AuthorDate: Tue Aug 2 10:46:35 2022 +0800
[fix][broker] Fix etcd cluster error and add test for etcd cluster (#16309)
---
.../pulsar/metadata/impl/EtcdMetadataStore.java | 3 +-
.../metadata/impl/EtcdMetadataStoreTest.java | 58 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 0073db0c8d7..bbe7034d990 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -126,7 +126,8 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
private Client newEtcdClient(String metadataURL, MetadataStoreConfig conf) throws IOException {
String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
- ClientBuilder clientBuilder = Client.builder().endpoints(etcdUrl);
+ ClientBuilder clientBuilder = Client.builder()
+ .endpoints(etcdUrl.split(","));
if (StringUtils.isNotEmpty(conf.getConfigFilePath())) {
try (InputStream inputStream = Files.newInputStream(Paths.get(conf.getConfigFilePath()))) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
index 180d835830a..fd1b8ac4552 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.io.Resources;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.launcher.EtcdClusterFactory;
+
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@@ -40,6 +41,63 @@ import org.testng.annotations.Test;
@Slf4j
public class EtcdMetadataStoreTest {
+ @Test
+ public void testCluster() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, false);
+ etcdCluster.start();
+
+ EtcdConfig etcdConfig = EtcdConfig.builder().useTls(false)
+ .tlsProvider(null)
+ .authority("etcd0")
+ .build();
+
+ Path etcdConfigPath = Files.createTempFile("etcd_config_cluster", ".yml");
+ new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+ String metadataURL =
+ "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(metadataURL,
+ MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ assertTrue(store.exists("/test").join());
+
+ }
+
+ @Test
+ public void testClusterWithTls() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster = EtcdClusterFactory.buildCluster("test-cluster", 3, true);
+ etcdCluster.start();
+
+ EtcdConfig etcdConfig = EtcdConfig.builder().useTls(true)
+ .tlsProvider(null)
+ .authority("etcd0")
+ .tlsTrustCertsFilePath(Resources.getResource("ssl/cert/ca.pem").getPath())
+ .tlsKeyFilePath(Resources.getResource("ssl/cert/client-key-pk8.pem").getPath())
+ .tlsCertificateFilePath(Resources.getResource("ssl/cert/client.pem").getPath())
+ .build();
+
+ Path etcdConfigPath = Files.createTempFile("etcd_config_cluster_ssl", ".yml");
+ new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+ String metadataURL =
+ "etcd:" + etcdCluster.getClientEndpoints().stream().map(URI::toString).collect(Collectors.joining(","));
+
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(metadataURL,
+ MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build());
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ assertTrue(store.exists("/test").join());
+
+ }
+
@Test
public void testTlsInstance() throws Exception {
@Cleanup