You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/04 19:44:11 UTC

[incubator-pulsar] branch branch-2.1 updated: Ensure standalone service comes back quickly after ungraceful restarts (#2487)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 113418d  Ensure standalone service comes back quickly after ungraceful restarts (#2487)
113418d is described below

commit 113418d3e7b4d02597bed7a939a46e5f66829621
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 4 12:34:48 2018 -0700

    Ensure standalone service comes back quickly after ungraceful restarts (#2487)
    
    * Ensure standalone service comes back quickly after ungraceful restarts
    
    * Handle NoNode errors when deleting
    
    * Added NoopLoadManager for standalone mode
---
 conf/standalone.conf                               |   2 +
 .../pulsar/broker/loadbalance/NoopLoadManager.java | 158 +++++++++++++++++++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   3 +-
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |  15 +-
 4 files changed, 176 insertions(+), 2 deletions(-)

diff --git a/conf/standalone.conf b/conf/standalone.conf
index 74a5702..755b76e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -311,6 +311,8 @@ autoSkipNonRecoverableData=false
 
 ### --- Load balancer --- ###
 
+loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
+
 # Enable load balancer
 loadBalancerEnabled=false
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
new file mode 100644
index 0000000..5773c61
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
+import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+
+public class NoopLoadManager implements LoadManager { // getLeastLoaded() always returns the local broker
+
+    private String lookupServiceAddress; // advertised address + ":" + web service port, set in initialize()
+    private ResourceUnit localResourceUnit; // resource unit built from lookupServiceAddress in initialize()
+    private ZooKeeper zkClient; // obtained from PulsarService in initialize()
+
+    LocalBrokerData localData; // service URLs stored as the data of the broker z-node in start()
+
+    private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> ObjectMapperFactory
+            .getThreadLocal()
+            .readValue(content, LocalBrokerData.class); // key is unused; content is parsed as LocalBrokerData
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort();
+        localResourceUnit = new SimpleResourceUnit(String.format("http://%s", lookupServiceAddress),
+                new PulsarResourceDescription());
+        zkClient = pulsar.getZkClient();
+        // Web, web-TLS, broker and broker-TLS URLs; start() stores them in the broker z-node
+        localData = new LocalBrokerData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls(),
+                pulsar.getBrokerServiceUrl(), pulsar.getBrokerServiceUrlTls());
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        String brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+
+        try {
+            // After the "standalone" process is killed ungracefully, its ZK session is never closed, so
+            // the broker z-node of the previous run can still exist until the ZK server prunes the
+            // expired sessions after startup. Since a single broker instance runs in this mode, it is
+            // safe to remove that stale z-node here rather than fail because it already exists.
+
+            // Delete the z-node if present (version -1 matches any version); it is recreated below
+            try {
+                if (zkClient.exists(brokerZnodePath, null) != null) {
+                    zkClient.delete(brokerZnodePath, -1);
+                }
+            } catch (NoNodeException nne) {
+                // Ignore: the z-node vanished (e.g. its session just expired) between exists() and delete()
+            }
+            // Register this broker: EPHEMERAL z-node whose data is localData.getJsonBytes()
+            ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+        } catch (Exception e) {
+            throw new PulsarServerException(e); // any failure above is wrapped and reported to the caller
+        }
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return false;
+    }
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception {
+        return Optional.of(localResourceUnit); // su is ignored: the local broker is the only candidate
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() throws Exception {
+        return null; // this implementation produces no load report
+    }
+
+    @Override
+    public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
+        return loadReportDeserializer;
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        // no-op: no load report is generated, so there is nothing to force
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // no-op: the broker z-node is written once in start() and not updated here
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // no-op: this implementation keeps no resource quotas
+    }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        return Collections.emptyList(); // no load-balancing metrics are collected
+    }
+
+    @Override
+    public void doLoadShedding() {
+        // no-op: this implementation never sheds load
+    }
+
+    @Override
+    public void doNamespaceBundleSplit() throws Exception {
+        // no-op: this implementation never splits bundles
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        // no-op: the broker z-node created in start() is left in place
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return Collections.singleton(lookupServiceAddress); // only this broker is ever reported
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        // no-op: the EPHEMERAL z-node created in start() is not explicitly deleted here
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 714389f..32742b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -793,7 +793,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) {
                     log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath,
                             ownerZkSessionId);
-                    throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId);
+                    throw new PulsarServerException(
+                            "Broker-znode owned by different zk-session " + ownerZkSessionId);
                 }
                 // Node may already be created by another load manager: in this case update the data.
                 zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
index 039a8a2..f6ab92c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java
@@ -63,6 +63,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -220,9 +221,21 @@ public class LocalBookkeeperEnsemble {
                 cleanDirectory(bkDataDir);
             }
 
+            int bookiePort = initialPort + i;
+
+            // Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
+            String registrationZnode = String.format("/ledgers/available/%s:%d", baseConf.getAdvertisedAddress(), bookiePort);
+            if (zkc.exists(registrationZnode, null) != null) {
+                try {
+                    zkc.delete(registrationZnode, -1);
+                } catch (NoNodeException nne) {
+                    // Ignore if z-node was just expired
+                }
+            }
+
             bsConfs[i] = new ServerConfiguration(baseConf);
             // override settings
-            bsConfs[i].setBookiePort(initialPort + i);
+            bsConfs[i].setBookiePort(bookiePort);
             bsConfs[i].setZkServers("127.0.0.1:" + ZooKeeperDefaultPort);
             bsConfs[i].setJournalDirName(bkDataDir.getPath());
             bsConfs[i].setLedgerDirNames(new String[] { bkDataDir.getPath() });