You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 19:44:11 UTC
[incubator-pulsar] branch branch-2.1 updated: Ensure standalone
service comes back quickly after ungraceful restarts (#2487)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 113418d Ensure standalone service comes back quickly after ungraceful restarts (#2487)
113418d is described below
commit 113418d3e7b4d02597bed7a939a46e5f66829621
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 4 12:34:48 2018 -0700
Ensure standalone service comes back quickly after ungraceful restarts (#2487)
* Ensure standalone service comes back quickly after ungraceful restarts
* Handle NoNode errors when deleting
* Added NoopLoadManager for standalone mode
---
conf/standalone.conf | 2 +
.../pulsar/broker/loadbalance/NoopLoadManager.java | 158 +++++++++++++++++++++
.../loadbalance/impl/ModularLoadManagerImpl.java | 3 +-
.../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 15 +-
4 files changed, 176 insertions(+), 2 deletions(-)
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 74a5702..755b76e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -311,6 +311,8 @@ autoSkipNonRecoverableData=false
### --- Load balancer --- ###
+loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
+
# Enable load balancer
loadBalancerEnabled=false
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
new file mode 100644
index 0000000..5773c61
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+public class NoopLoadManager implements LoadManager {
+
+ private String lookupServiceAddress;
+ private ResourceUnit localResourceUnit;
+ private ZooKeeper zkClient;
+
+ LocalBrokerData localData;
+
+ private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> ObjectMapperFactory
+ .getThreadLocal()
+ .readValue(content, LocalBrokerData.class);
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort();
+ localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
+ new PulsarResourceDescription());
+ zkClient = pulsar.getZkClient();
+
+ localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+ pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+
+ try {
+ // When running in standalone, this error can happen when killing the "standalone" process
+ // ungracefully since the ZK session will not be closed and it will take some time for ZK server
+ // to prune the expired sessions after startup.
+ // Since there's a single broker instance running, it's safe, in this mode, to remove the old lock
+
+ // Delete and recreate z-node
+ try {
+ if (zkClient.exists(brokerZnodePath, null) != null) {
+ zkClient.delete(brokerZnodePath, -1);
+ }
+ } catch (NoNodeException nne) {
+ // Ignore if z-node was just expired
+ }
+
+ ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ } catch (Exception e) {
+ throw new PulsarServerException(e);
+ }
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return false;
+ }
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+ return Optional.of(localResourceUnit);
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
+ return loadReportDeserializer;
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ // do nothing
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void doLoadShedding() {
+ // do nothing
+ }
+
+ @Override
+ public void doNamespaceBundleSplit() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ // do nothing
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return Collections.singleton(lookupServiceAddress);
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ // do nothing
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 714389f..32742b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -793,7 +793,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) {
log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
ownerZkSessionId);
- throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
+ throw new PulsarServerException(
+ "Broker-znode owned by different zk-session " + ownerZkSessionId);
}
// Node may already be created by another load manager: in this case update the data.
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 039a8a2..f6ab92c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -63,6 +63,7 @@ import org.apache.bookkeeper.util.MathUtils;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -220,9 +221,21 @@ public class LocalBookkeeperEnsemble {
cleanDirectory(bkDataDir);
}
+ int bookiePort = initialPort + i;
+
+ // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
+ String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
+ if (zkc.exists(registrationZnode, null) != null) {
+ try {
+ zkc.delete(registrationZnode, -1);
+ } catch (NoNodeException nne) {
+ // Ignore if z-node was just expired
+ }
+ }
+
bsConfs[i] = new ServerConfiguration(baseConf);
// override settings
- bsConfs[i].setBookiePort(initialPort + i);
+ bsConfs[i].setBookiePort(bookiePort);
bsConfs[i].setZkServers("127.0.0.1:" + ZooKeeperDefaultPort);
bsConfs[i].setJournalDirName(bkDataDir.getPath());
bsConfs[i].setLedgerDirNames(new String[] { bkDataDir.getPath() });