You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/23 20:33:31 UTC

[pulsar] branch master updated: PIP-45: Removed LocalZooKeeperConnectionService (#12163)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cf6673f  PIP-45: Removed LocalZooKeeperConnectionService (#12163)
cf6673f is described below

commit cf6673f0c727d6b8a7bd8d40ac7e72057a1bae1f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 23 13:32:48 2021 -0700

    PIP-45: Removed LocalZooKeeperConnectionService (#12163)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  33 ++---
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   4 +
 .../zookeeper/LocalZooKeeperConnectionService.java | 157 ---------------------
 .../LocalZooKeeperConnectionServiceTest.java       | 101 -------------
 4 files changed, 21 insertions(+), 274 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0ea4f0c..b6c85cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -153,6 +153,7 @@ import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.packages.management.core.PackagesManagement;
 import org.apache.pulsar.packages.management.core.PackagesStorage;
 import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
@@ -165,7 +166,6 @@ import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
 import org.apache.pulsar.websocket.WebSocketReaderServlet;
 import org.apache.pulsar.websocket.WebSocketService;
-import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 import org.apache.zookeeper.ZooKeeper;
@@ -192,7 +192,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private WebSocketService webSocketService = null;
     private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
     private BookKeeperClientFactory bkClientFactory;
-    private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
     private Compactor compactor;
     private ResourceUsageTransportManager resourceUsageTransportManager;
     private ResourceGroupService resourceGroupServiceManager;
@@ -328,10 +327,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
      *
      * This will immediately release all the resource locks held by this broker on the coordination service.
      *
-     * @throws IOException if the close operation fails
+     * @throws Exception if the close operation fails
      */
-    public void closeMetadataServiceSession() throws IOException {
-        localZooKeeperConnectionProvider.close();
+    public void closeMetadataServiceSession() throws Exception {
+        localMetadataStore.close();
     }
 
     @Override
@@ -430,11 +429,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 this.leaderElectionService = null;
             }
 
-            if (localZooKeeperConnectionProvider != null) {
-                localZooKeeperConnectionProvider.close();
-                localZooKeeperConnectionProvider = null;
-            }
-
             if (adminClient != null) {
                 adminClient.close();
                 adminClient = null;
@@ -624,14 +618,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             protocolHandlers.initialize(config);
 
             // Now we are ready to start services
-            localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
-                    config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis());
-            localZooKeeperConnectionProvider.start();
-
             this.bkClientFactory = newBookKeeperClientFactory();
 
             managedLedgerClientFactory = ManagedLedgerStorage.create(
-                config, localMetadataStore, localZooKeeperConnectionProvider.getLocalZooKeeper(),
+                config, localMetadataStore, getZkClient(),
                     bkClientFactory, ioEventLoopGroup
             );
 
@@ -1221,7 +1211,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         Object factoryInstance = storageClass.getDeclaredConstructor().newInstance();
         Method createMethod = storageClass.getMethod("create", PulsarService.class, ZooKeeper.class);
         SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this,
-                localZooKeeperConnectionProvider.getLocalZooKeeper());
+                getZkClient());
         schemaStorage.start();
         return schemaStorage;
     }
@@ -1647,4 +1637,15 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
         return new BrokerService(pulsar, ioEventLoopGroup);
     }
+
+    /**
+     * This is a temporary solution until BK metadata has been migrated to the MetadataStore interface.
+     */
+    private ZooKeeper getZkClient() {
+        if (localMetadataStore instanceof ZKMetadataStore) {
+            return ((ZKMetadataStore) localMetadataStore).getZkClient();
+        } else {
+            throw new RuntimeException("MetadataStore implemenation is not based on ZooKeeper");
+        }
+    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 8d5e510..4ac9bb1 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -410,6 +410,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         return zkc.getSessionId();
     }
 
+    public ZooKeeper getZkClient() {
+        return zkc;
+    }
+
     @Override
     public CompletableFuture<Void> initializeCluster() {
         if (this.metadataURL == null) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
deleted file mode 100644
index c459a79..0000000
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-/**
- * This class provides functions to create ZooKeeper connection
- *
- *
- */
-public class LocalZooKeeperConnectionService implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperConnectionService.class);
-
-    private final ZooKeeperClientFactory zkClientFactory;
-    private final String zkConnect;
-    private final long zkSessionTimeoutMillis;
-
-    private ZooKeeper localZooKeeper;
-
-    public LocalZooKeeperConnectionService(ZooKeeperClientFactory zkClientFactory, String zkConnect,
-            long zkSessionTimeoutMillis) {
-        this.zkClientFactory = zkClientFactory;
-        this.zkConnect = zkConnect;
-        this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
-    }
-
-    public void start() throws IOException {
-        // Connect to local ZK
-        CompletableFuture<ZooKeeper> zkFuture = zkClientFactory.create(zkConnect, SessionType.ReadWrite,
-                (int) zkSessionTimeoutMillis);
-
-        try {
-            localZooKeeper = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS);
-        } catch (Exception e) {
-            throw new IOException("Failed to establish session with local ZK", e);
-        }
-    }
-
-    public void close() throws IOException {
-        if (localZooKeeper != null) {
-            try {
-                localZooKeeper.close();
-            } catch (InterruptedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    public ZooKeeper getLocalZooKeeper() {
-        return this.localZooKeeper;
-    }
-
-    /**
-     * Check if a persist node exists. If not, it attempts to create the znode.
-     *
-     * @param path
-     *            znode path
-     * @throws KeeperException
-     *             zookeeper exception.
-     * @throws InterruptedException
-     *             zookeeper exception.
-     */
-    public static void checkAndCreatePersistNode(ZooKeeper zkc, String path)
-            throws KeeperException, InterruptedException {
-
-        // check if the node exists
-        if (zkc.exists(path, false) == null) {
-            //create znode
-            try {
-                // do create the node
-                zkc.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-                LOG.info("created znode, path={}", path);
-            } catch (Exception e) {
-                LOG.warn("create znode failed, path={} : {}", path, e.getMessage(), e);
-            }
-        }
-    }
-
-    public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode)
-            throws KeeperException, InterruptedException {
-        return createIfAbsent(zk, path, data, createMode, false);
-    }
-
-    public static String createIfAbsent(ZooKeeper zk, String path, String data, CreateMode createMode, boolean gc)
-            throws KeeperException, InterruptedException {
-        return createIfAbsent(zk, path, data.getBytes(Charsets.UTF_8), createMode, gc);
-    }
-
-    public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode)
-            throws KeeperException, InterruptedException {
-        return createIfAbsent(zk, path, data, createMode, false);
-    }
-
-    public static String createIfAbsent(ZooKeeper zk, String path, byte[] data, CreateMode createMode, boolean gc)
-            throws KeeperException, InterruptedException {
-        String pathCreated = null;
-        try {
-            pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
-        } catch (NodeExistsException e) {
-            // OK
-            LOG.debug("Create skipped for existing znode: path={}", path);
-        }
-        // reset if what exists is the ephemeral garbage.
-        if (gc && (pathCreated == null) && CreateMode.EPHEMERAL.equals(createMode)) {
-            Stat stat = zk.exists(path, false);
-            if (stat != null && zk.getSessionId() != stat.getEphemeralOwner()) {
-                deleteIfExists(zk, path, -1);
-                pathCreated = zk.create(path, data, Ids.OPEN_ACL_UNSAFE, createMode);
-            }
-        }
-        return pathCreated;
-    }
-
-    public static void deleteIfExists(ZooKeeper zk, String path, int version)
-            throws KeeperException, InterruptedException {
-        try {
-            zk.delete(path, version);
-        } catch (NoNodeException e) {
-            // OK
-            LOG.debug("Delete skipped for non-existing znode: path={}", path);
-        }
-    }
-}
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
deleted file mode 100644
index c822cdf..0000000
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalZooKeeperConnectionServiceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.zookeeper;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.testng.annotations.Test;
-
-public class LocalZooKeeperConnectionServiceTest {
-
-    @Test
-    public void testSimpleZooKeeperConnection() throws Exception {
-        MockedZooKeeperClientFactoryImpl mockZkClientFactory = new MockedZooKeeperClientFactoryImpl();
-        LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
-                mockZkClientFactory, "dummy", 1000);
-        localZkConnectionService.start();
-
-        // Get ZooKeeper client
-        MockZooKeeper zk = (MockZooKeeper) localZkConnectionService.getLocalZooKeeper();
-
-        // Check status
-        assertTrue(zk.getState().isConnected());
-
-        // Create persistent node
-        LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/path1");
-        assertNotNull(zk.exists("/path1", false));
-
-        // Delete and re-create existing node
-        zk.setSessionId(-1L); // The sessionId must be set to except 0L in order to re-create.
-        LocalZooKeeperConnectionService.createIfAbsent(zk, "/path1", "data1", CreateMode.EPHEMERAL, true);
-        assertEquals(zk.getData("/path1", null, null), "data1".getBytes());
-
-        // Try to create existing node (nothing should happen)
-        LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/path1");
-        assertNotNull(zk.exists("/path1", false));
-
-        // Create new node (data is given as String)
-        LocalZooKeeperConnectionService.createIfAbsent(zk, "/path2", "data2", CreateMode.EPHEMERAL);
-        assertNotNull(zk.exists("/path2", false));
-        assertEquals(zk.getData("/path2", null, null), "data2".getBytes());
-
-        // Create new node (data is given as bytes)
-        LocalZooKeeperConnectionService.createIfAbsent(zk, "/path3", "data3".getBytes(), CreateMode.EPHEMERAL);
-        assertNotNull(zk.exists("/path3", false));
-        assertEquals(zk.getData("/path3", null, null), "data3".getBytes());
-
-        // delete nodes
-        LocalZooKeeperConnectionService.deleteIfExists(zk, "/path1", -1);
-        assertNull(zk.exists("/path1", false));
-        LocalZooKeeperConnectionService.deleteIfExists(zk, "/path2", -1);
-        assertNull(zk.exists("/path2", false));
-        LocalZooKeeperConnectionService.deleteIfExists(zk, "/path3", -1);
-        assertNull(zk.exists("/path3", false));
-
-        // delete not existing node
-        LocalZooKeeperConnectionService.deleteIfExists(zk, "/not_exist", -1);
-
-        // Try to create invalid node (nothing should happen)
-        LocalZooKeeperConnectionService.checkAndCreatePersistNode(zk, "/////");
-        assertNull(zk.exists("//////", false));
-
-        localZkConnectionService.close();
-
-        mockZkClientFactory.close();
-    }
-
-    @Test
-    public void testSimpleZooKeeperConnectionFail() throws Exception {
-        LocalZooKeeperConnectionService localZkConnectionService = new LocalZooKeeperConnectionService(
-                new ZookeeperClientFactoryImpl(), "dummy", 1000);
-        try {
-            localZkConnectionService.start();
-            fail("should fail");
-        } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Failed to establish session with local ZK"));
-        }
-        localZkConnectionService.close();
-    }
-}