You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/25 03:38:33 UTC

[pulsar] branch master updated: PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e6781d8  PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150)
e6781d8 is described below

commit e6781d877e19d4be4fe836410903722b5cfe161d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 24 20:37:11 2021 -0700

    PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150)
    
    * PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required
    
    * Remove inadvertently added spaces
    
    * Check for sessionWatcher null at the beginning
    
    * Fixed typo
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 ++--
 .../pulsar/metadata/api/MetadataStoreFactory.java  | 10 ++--
 .../api/extended/MetadataStoreExtended.java        | 11 ++---
 .../metadata/impl/MetadataStoreFactoryImpl.java    | 55 ++++++++++++++++++++++
 .../pulsar/metadata/impl/ZKMetadataStore.java      | 21 ++++++---
 5 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b6c85cb..32c0e0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -144,8 +144,10 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.functions.worker.ErrorNotifier;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -248,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private CoordinationService coordinationService;
     private TransactionBufferSnapshotService transactionBufferSnapshotService;
 
-    private MetadataStoreExtended configurationMetadataStore;
+    private MetadataStore configurationMetadataStore;
     private PulsarResources pulsarResources;
 
     private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
@@ -314,8 +316,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 new DefaultThreadFactory("pulsar-io"));
     }
 
-    public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
-        return MetadataStoreExtended.create(config.getConfigurationStoreServers(),
+    public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
+        return MetadataStoreFactory.create(config.getConfigurationStoreServers(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
                         .allowReadOnlyOperations(false)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
index 781a63b..828ea8a 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import lombok.experimental.UtilityClass;
 
 import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 
 /**
@@ -41,11 +42,8 @@ public class MetadataStoreFactory {
      * @throws IOException
      *             if the metadata store initialization fails
      */
-    public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
-        if (metadataURL.startsWith("memory://")) {
-            return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
-        } else {
-            return new ZKMetadataStore(metadataURL, metadataStoreConfig);
-        }
+    public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+            throws MetadataStoreException {
+        return MetadataStoreFactoryImpl.create(metadataURL, metadataStoreConfig);
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index f4c82a6..fb0078d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
 import org.apache.pulsar.metadata.api.MetadataStoreException.InvalidImplementationException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
 
 /**
  * Extension of the {@link MetadataStore} interface that includes more methods which might not be supported by all
@@ -37,15 +38,9 @@ import org.apache.pulsar.metadata.api.Stat;
  */
 public interface MetadataStoreExtended extends MetadataStore {
 
-    public static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+    static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
-        MetadataStore store = MetadataStoreFactory.create(metadataURL, metadataStoreConfig);
-        if (!(store instanceof MetadataStoreExtended)) {
-            throw new InvalidImplementationException(
-                    "Implemetation does not comply with " + MetadataStoreExtended.class.getName());
-        }
-
-        return (MetadataStoreExtended) store;
+        return MetadataStoreFactoryImpl.createExtended(metadataURL, metadataStoreConfig);
     }
 
     /**
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
new file mode 100644
index 0000000..d7aaf02
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class MetadataStoreFactoryImpl {
+
+    public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws
+            MetadataStoreException {
+        return newInstance(metadataURL, metadataStoreConfig, false);
+    }
+
+    public static MetadataStoreExtended createExtended(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+            throws
+            MetadataStoreException {
+        MetadataStore store = MetadataStoreFactoryImpl.newInstance(metadataURL, metadataStoreConfig, true);
+        if (!(store instanceof MetadataStoreExtended)) {
+            throw new MetadataStoreException.InvalidImplementationException(
+                    "Implementation does not comply with " + MetadataStoreExtended.class.getName());
+        }
+
+        return (MetadataStoreExtended) store;
+    }
+
+    private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig,
+                                             boolean enableSessionWatcher)
+            throws MetadataStoreException {
+
+        if (metadataURL.startsWith("memory://")) {
+            return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
+        } else {
+            return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+        }
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4ac9bb1..2cee1c7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -61,9 +61,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
     private final MetadataStoreConfig metadataStoreConfig;
     private final boolean isZkManaged;
     private final ZooKeeper zkc;
-    private ZKSessionWatcher sessionWatcher;
+    private Optional<ZKSessionWatcher> sessionWatcher;
 
-    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
+    public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
+            throws MetadataStoreException {
         try {
             this.metadataURL = metadataURL;
             this.metadataStoreConfig = metadataStoreConfig;
@@ -74,12 +75,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                     .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
                     .watchers(Collections.singleton(event -> {
                         if (sessionWatcher != null) {
-                            sessionWatcher.process(event);
+                            sessionWatcher.ifPresent(sw -> sw.process(event));
                         }
                     }))
                     .build();
             zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
-            sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
+            if (enableSessionWatcher) {
+                sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
+            } else {
+                sessionWatcher = Optional.empty();
+            }
         } catch (Throwable t) {
             throw new MetadataStoreException(t);
         }
@@ -92,7 +97,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         this.metadataStoreConfig = null;
         this.isZkManaged = false;
         this.zkc = zkc;
-        this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
+        this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
         zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
     }
 
@@ -106,7 +111,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
                             super.receivedSessionEvent(event);
                         } else {
                             log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
-                            sessionWatcher.setSessionInvalid();
+                            sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
                             // On the reconnectable client, mark the session as expired to trigger a new reconnect and 
                             // we will have the chance to set the watch again.
                             if (zkc instanceof PulsarZooKeeperClient) {
@@ -320,7 +325,9 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
         if (isZkManaged) {
             zkc.close();
         }
-        sessionWatcher.close();
+        if (sessionWatcher.isPresent()) {
+            sessionWatcher.get().close();
+        }
         super.close();
     }