You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/23 20:33:31 UTC
[pulsar] branch master updated: PIP-45: Removed
LocalZooKeeperConnectionService (#12163)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cf6673f PIP-45: Removed LocalZooKeeperConnectionService (#12163)
cf6673f is described below
commit cf6673f0c727d6b8a7bd8d40ac7e72057a1bae1f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 23 13:32:48 2021 -0700
PIP-45: Removed LocalZooKeeperConnectionService (#12163)
---
.../org/apache/pulsar/broker/PulsarService.java | 33 ++---
.../pulsar/metadata/impl/ZKMetadataStore.java | 4 +
.../zookeeper/LocalZooKeeperConnectionService.java | 157 ---------------------
.../LocalZooKeeperConnectionServiceTest.java | 101 -------------
4 files changed, 21 insertions(+), 274 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0ea4f0c..b6c85cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -153,6 +153,7 @@ import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.packages.management.core.PackagesManagement;
import org.apache.pulsar.packages.management.core.PackagesStorage;
import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
@@ -165,7 +166,6 @@ import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import org.apache.zookeeper.ZooKeeper;
@@ -192,7 +192,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private WebSocketService webSocketService = null;
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
private BookKeeperClientFactory bkClientFactory;
- private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private Compactor compactor;
private ResourceUsageTransportManager resourceUsageTransportManager;
private ResourceGroupService resourceGroupServiceManager;
@@ -328,10 +327,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
*
* This will immediately release all the resource locks held by this broker on the coordination service.
*
- * @throws IOException if the close operation fails
+ * @throws Exception if the close operation fails
*/
- public void closeMetadataServiceSession() throws IOException {
- localZooKeeperConnectionProvider.close();
+ public void closeMetadataServiceSession() throws Exception {
+ localMetadataStore.close();
}
@Override
@@ -430,11 +429,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
this.leaderElectionService = null;
}
- if (localZooKeeperConnectionProvider != null) {
- localZooKeeperConnectionProvider.close();
- localZooKeeperConnectionProvider = null;
- }
-
if (adminClient != null) {
adminClient.close();
adminClient = null;
@@ -624,14 +618,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
protocolHandlers.initialize(config);
// Now we are ready to start services
- localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
- config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
- localZooKeeperConnectionProvider.start();
-
this.bkClientFactory = newBookKeeperClientFactory();
managedLedgerClientFactory = ManagedLedgerStorage.create(
- config, localMetadataStore, localZooKeeperConnectionProvider.getLocalZooKeeper(),
+ config, localMetadataStore, getZkClient(),
bkClientFactory, ioEventLoopGroup
);
@@ -1221,7 +1211,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
Object factoryInstance = storageClass.getDeclaredConstructor().newInstance();
Method createMethod = storageClass.getMethod("create", PulsarService.class, ZooKeeper.class);
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this,
- localZooKeeperConnectionProvider.getLocalZooKeeper());
+ getZkClient());
schemaStorage.start();
return schemaStorage;
}
@@ -1647,4 +1637,15 @@ public class PulsarService implements AutoCloseable, ShutdownService {
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
+
+ /**
+ * This is a temporary solution until we'll have migrated BK metadata to map to MetadataStore interface.
+ */
+ private ZooKeeper getZkClient() {
+ if (localMetadataStore instanceof ZKMetadataStore) {
+ return ((ZKMetadataStore) localMetadataStore).getZkClient();
+ } else {
+ throw new RuntimeException("MetadataStore implemenation is not based on ZooKeeper");
+ }
+ }
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 8d5e510..4ac9bb1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -410,6 +410,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
return zkc.getSessionId();
}
+ public ZooKeeper getZkClient() {
+ return zkc;
+ }
+
@Override
public CompletableFuture<Void> initializeCluster() {
if (this.metadataURL == null) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
deleted file mode 100644
index c459a79..0000000
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-/**
- * This class provides functions to create ZooKeeper connection
- *
- *
- */
-public class LocalZooKeeperConnectionService implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperConnectionService.class);
-
- private final ZooKeeperClientFactory zkClientFactory;
- private final String zkConnect;
- private final long zkSessionTimeoutMillis;
-
- private ZooKeeper localZooKeeper;
-
- public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, String zkConnect,
- long zkSessionTimeoutMillis) {
- this.zkClientFactory = zkClientFactory;
- this.zkConnect = zkConnect;
- this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
- }
-
- public void start() throws IOException {
- // Connect to local ZK
- CompletableFuture<ZooKeeper> zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite,
- (int) zkSessionTimeoutMillis);
-
- try {
- localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- throw new IOException("Failed to establish session with local ZK", e);
- }
- }
-
- public void close() throws IOException {
- if (localZooKeeper != null) {
- try {
- localZooKeeper.close();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- }
-
- public ZooKeeper getLocalZooKeeper() {
- return this.localZooKeeper;
- }
-
- /**
- * Check if a persist node exists. If not, it attempts to create the znode.
- *
- * @param path
- * znode path
- * @throws KeeperException
- * zookeeper exception.
- * @throws InterruptedException
- * zookeeper exception.
- */
- public static void checkAndCreatePersistNode(ZooKeeper zkc, String path)
- throws KeeperException, InterruptedException {
-
- // check if the node exists
- if (zkc.exists(path, false) == null) {
- //create znode
- try {
- // do create the node
- zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- LOG.info("created znode, path={}", path);
- } catch (Exception e) {
- LOG.warn("create znode failed, path={} : {}", path, e.getMessage(), e);
- }
- }
- }
-
- public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode)
- throws KeeperException, InterruptedException {
- return createIfAbsent(zk, path, data, createMode, false);
- }
-
- public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode, boolean gc)
- throws KeeperException, InterruptedException {
- return createIfAbsent(zk, path, data.getBytes(Charsets.UTF_8), createMode, gc);
- }
-
- public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode)
- throws KeeperException, InterruptedException {
- return createIfAbsent(zk, path, data, createMode, false);
- }
-
- public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode, boolean gc)
- throws KeeperException, InterruptedException {
- String pathCreated = null;
- try {
- pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
- } catch (NodeExistsException e) {
- // OK
- LOG.debug("Create skipped for existing znode: path={}", path);
- }
- // reset if what exists is the ephemeral garbage.
- if (gc && (pathCreated == null) && CreateMode.EPHEMERAL.equals(createMode)) {
- Stat stat = zk.exists(path, false);
- if (stat != null && zk.getSessionId() != stat.getEphemeralOwner()) {
- deleteIfExists(zk, path, -1);
- pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
- }
- }
- return pathCreated;
- }
-
- public static void deleteIfExists(ZooKeeper zk, String path, int version)
- throws KeeperException, InterruptedException {
- try {
- zk.delete(path, version);
- } catch (NoNodeException e) {
- // OK
- LOG.debug("Delete skipped for non-existing znode: path={}", path);
- }
- }
-}
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
deleted file mode 100644
index c822cdf..0000000
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.testng.annotations.Test;
-
-public class LocalZooKeeperConnectionServiceTest {
-
- @Test
- public void testSimpleZooKeeperConnection() throws Exception {
- MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl();
- LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
- mockZkClientFactory, "dummy", 1000);
- localZkConnectionService.start();
-
- // Get ZooKeeper client
- MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper();
-
- // Check status
- assertTrue(zk.getState().isConnected());
-
- // Create persistent node
- LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/path1");
- assertNotNull(zk.exists("/path1", false));
-
- // Delete and re-create existing node
- zk.setSessionId(-1L); // The sessionId must be set to except 0L in order to re-create.
- LocalZooKeeperConnectionService.createIfAbsent(zk, "/path1", "data1", CreateMode.EPHEMERAL, true);
- assertEquals(zk.getData("/path1", null, null), "data1".getBytes());
-
- // Try to create existing node (nothing should happen)
- LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/path1");
- assertNotNull(zk.exists("/path1", false));
-
- // Create new node (data is given as String)
- LocalZooKeeperConnectionService.createIfAbsent(zk, "/path2", "data2", CreateMode.EPHEMERAL);
- assertNotNull(zk.exists("/path2", false));
- assertEquals(zk.getData("/path2", null, null), "data2".getBytes());
-
- // Create new node (data is given as bytes)
- LocalZooKeeperConnectionService.createIfAbsent(zk, "/path3", "data3".getBytes(), CreateMode.EPHEMERAL);
- assertNotNull(zk.exists("/path3", false));
- assertEquals(zk.getData("/path3", null, null), "data3".getBytes());
-
- // delete nodes
- LocalZooKeeperConnectionService.deleteIfExists(zk, "/path1", -1);
- assertNull(zk.exists("/path1", false));
- LocalZooKeeperConnectionService.deleteIfExists(zk, "/path2", -1);
- assertNull(zk.exists("/path2", false));
- LocalZooKeeperConnectionService.deleteIfExists(zk, "/path3", -1);
- assertNull(zk.exists("/path3", false));
-
- // delete not existing node
- LocalZooKeeperConnectionService.deleteIfExists(zk, "/not_exist", -1);
-
- // Try to create invalid node (nothing should happen)
- LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/////");
- assertNull(zk.exists("//////", false));
-
- localZkConnectionService.close();
-
- mockZkClientFactory.close();
- }
-
- @Test
- public void testSimpleZooKeeperConnectionFail() throws Exception {
- LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
- new ZookeeperClientFactoryImpl(), "dummy", 1000);
- try {
- localZkConnectionService.start();
- fail("should fail");
- } catch (Exception e) {
- assertTrue(e.getMessage().contains("Failed to establish session with local ZK"));
- }
- localZkConnectionService.close();
- }
-}