You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/25 03:38:33 UTC
[pulsar] branch master updated: PIP-45: Only run ZKSessionWatcher
when MetadataStoreExtended is required (#12150)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e6781d8 PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150)
e6781d8 is described below
commit e6781d877e19d4be4fe836410903722b5cfe161d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 24 20:37:11 2021 -0700
PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required (#12150)
* PIP-45: Only run ZKSessionWatcher when MetadataStoreExtended is required
* Remove inadvertently added spaces
* Check for sessionWatcher null at the beginning
* Fixed typo
---
.../org/apache/pulsar/broker/PulsarService.java | 8 ++--
.../pulsar/metadata/api/MetadataStoreFactory.java | 10 ++--
.../api/extended/MetadataStoreExtended.java | 11 ++---
.../metadata/impl/MetadataStoreFactoryImpl.java | 55 ++++++++++++++++++++++
.../pulsar/metadata/impl/ZKMetadataStore.java | 21 ++++++---
5 files changed, 81 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index b6c85cb..32c0e0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -144,8 +144,10 @@ import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
@@ -248,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private CoordinationService coordinationService;
private TransactionBufferSnapshotService transactionBufferSnapshotService;
- private MetadataStoreExtended configurationMetadataStore;
+ private MetadataStore configurationMetadataStore;
private PulsarResources pulsarResources;
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
@@ -314,8 +316,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
new DefaultThreadFactory("pulsar-io"));
}
- public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
- return MetadataStoreExtended.create(config.getConfigurationStoreServers(),
+ public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
+ return MetadataStoreFactory.create(config.getConfigurationStoreServers(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
.allowReadOnlyOperations(false)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
index 781a63b..828ea8a 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import lombok.experimental.UtilityClass;
import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
/**
@@ -41,11 +42,8 @@ public class MetadataStoreFactory {
* @throws IOException
* if the metadata store initialization fails
*/
- public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
- if (metadataURL.startsWith("memory://")) {
- return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
- } else {
- return new ZKMetadataStore(metadataURL, metadataStoreConfig);
- }
+ public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+ throws MetadataStoreException {
+ return MetadataStoreFactoryImpl.create(metadataURL, metadataStoreConfig);
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
index f4c82a6..fb0078d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException
import org.apache.pulsar.metadata.api.MetadataStoreException.InvalidImplementationException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
/**
* Extension of the {@link MetadataStore} interface that includes more methods which might not be supported by all
@@ -37,15 +38,9 @@ import org.apache.pulsar.metadata.api.Stat;
*/
public interface MetadataStoreExtended extends MetadataStore {
- public static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+ static MetadataStoreExtended create(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
- MetadataStore store = MetadataStoreFactory.create(metadataURL, metadataStoreConfig);
- if (!(store instanceof MetadataStoreExtended)) {
- throw new InvalidImplementationException(
- "Implemetation does not comply with " + MetadataStoreExtended.class.getName());
- }
-
- return (MetadataStoreExtended) store;
+ return MetadataStoreFactoryImpl.createExtended(metadataURL, metadataStoreConfig);
}
/**
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
new file mode 100644
index 0000000..d7aaf02
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+public class MetadataStoreFactoryImpl {
+
+ public static MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws
+ MetadataStoreException {
+ return newInstance(metadataURL, metadataStoreConfig, false);
+ }
+
+ public static MetadataStoreExtended createExtended(String metadataURL, MetadataStoreConfig metadataStoreConfig)
+ throws
+ MetadataStoreException {
+ MetadataStore store = MetadataStoreFactoryImpl.newInstance(metadataURL, metadataStoreConfig, true);
+ if (!(store instanceof MetadataStoreExtended)) {
+ throw new MetadataStoreException.InvalidImplementationException(
+ "Implementation does not comply with " + MetadataStoreExtended.class.getName());
+ }
+
+ return (MetadataStoreExtended) store;
+ }
+
+ private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig,
+ boolean enableSessionWatcher)
+ throws MetadataStoreException {
+
+ if (metadataURL.startsWith("memory://")) {
+ return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
+ } else {
+ return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+ }
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4ac9bb1..2cee1c7 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -61,9 +61,10 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
private final ZooKeeper zkc;
- private ZKSessionWatcher sessionWatcher;
+ private Optional<ZKSessionWatcher> sessionWatcher;
- public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
+ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
+ throws MetadataStoreException {
try {
this.metadataURL = metadataURL;
this.metadataStoreConfig = metadataStoreConfig;
@@ -74,12 +75,16 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(event -> {
if (sessionWatcher != null) {
- sessionWatcher.process(event);
+ sessionWatcher.ifPresent(sw -> sw.process(event));
}
}))
.build();
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
- sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
+ if (enableSessionWatcher) {
+ sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
+ } else {
+ sessionWatcher = Optional.empty();
+ }
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
@@ -92,7 +97,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
this.metadataStoreConfig = null;
this.isZkManaged = false;
this.zkc = zkc;
- this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
+ this.sessionWatcher = Optional.of(new ZKSessionWatcher(zkc, this::receivedSessionEvent));
zkc.addWatch("/", this::handleWatchEvent, AddWatchMode.PERSISTENT_RECURSIVE);
}
@@ -106,7 +111,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
super.receivedSessionEvent(event);
} else {
log.error("Failed to recreate persistent watch on ZooKeeper: {}", Code.get(rc));
- sessionWatcher.setSessionInvalid();
+ sessionWatcher.ifPresent(ZKSessionWatcher::setSessionInvalid);
// On the reconnectable client, mark the session as expired to trigger a new reconnect and
// we will have the chance to set the watch again.
if (zkc instanceof PulsarZooKeeperClient) {
@@ -320,7 +325,9 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
if (isZkManaged) {
zkc.close();
}
- sessionWatcher.close();
+ if (sessionWatcher.isPresent()) {
+ sessionWatcher.get().close();
+ }
super.close();
}