You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/03/07 18:39:38 UTC

svn commit: r1575338 - in /zookeeper/bookkeeper/trunk: ./ hedwig-server/src/main/java/org/apache/hedwig/server/common/ hedwig-server/src/main/java/org/apache/hedwig/server/topics/ hedwig-server/src/test/java/org/apache/hedwig/server/topics/

Author: ivank
Date: Fri Mar  7 17:39:37 2014
New Revision: 1575338

URL: http://svn.apache.org/r1575338
Log:
BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar  7 17:39:37 2014
@@ -182,6 +182,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-683: TestSubAfterCloseSub fails on 4.2 (jiannan via ivank)
 
+        BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Fri Mar  7 17:39:37 2014
@@ -27,15 +27,16 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
-
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.hedwig.conf.AbstractConfiguration;
 import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.topics.HubLoad;
 import org.apache.hedwig.util.HedwigSocketAddress;
 
+import com.google.protobuf.ByteString;
+
 public class ServerConfiguration extends AbstractConfiguration {
     public final static String REGION = "region";
     protected final static String MAX_MESSAGE_SIZE = "max_message_size";
@@ -75,6 +76,9 @@ public class ServerConfiguration extends
     protected final static String NUM_DELIVERY_THREADS = "num_delivery_threads";
 
     protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
+    protected final static String REBALANCE_TOLERANCE_PERCENTAGE = "rebalance_tolerance";
+    protected final static String REBALANCE_MAX_SHED = "rebalance_max_shed";
+    protected final static String REBALANCE_INTERVAL_SEC = "rebalance_interval_sec";
 
     // manager related settings
     protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
@@ -153,7 +157,7 @@ public class ServerConfiguration extends
 
     /**
      * Maximum number of messages to read ahead. Default is 10.
-     * 
+     *
      * @return int
      */
     public int getReadAheadCount() {
@@ -162,7 +166,7 @@ public class ServerConfiguration extends
 
     /**
      * Maximum number of bytes to read ahead. Default is 4MB.
-     * 
+     *
      * @return long
      */
     public long getReadAheadSizeBytes() {
@@ -172,7 +176,7 @@ public class ServerConfiguration extends
     /**
      * Maximum cache size. By default is the smallest of 2G or
      * half the heap size.
-     * 
+     *
      * @return long
      */
     public long getMaximumCacheSize() {
@@ -193,16 +197,16 @@ public class ServerConfiguration extends
 
     /**
      * After a scan of a log fails, how long before we retry (in msec)
-     * 
+     *
      * @return long
      */
     public long getScanBackoffPeriodMs() {
         return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
     }
-    
+
     /**
      * Returns server port.
-     * 
+     *
      * @return int
      */
     public int getServerPort() {
@@ -211,7 +215,7 @@ public class ServerConfiguration extends
 
     /**
      * Returns SSL server port.
-     * 
+     *
      * @return int
      */
     public int getSSLServerPort() {
@@ -220,7 +224,7 @@ public class ServerConfiguration extends
 
     /**
      * Returns ZooKeeper path prefix.
-     * 
+     *
      * @return string
      */
     public String getZkPrefix() {
@@ -263,7 +267,7 @@ public class ServerConfiguration extends
 
     /**
      * Return ZooKeeper list of servers. Default is localhost.
-     * 
+     *
      * @return String
      */
     public String getZkHost() {
@@ -276,16 +280,16 @@ public class ServerConfiguration extends
 
     /**
      * Return ZooKeeper session timeout. Default is 2s.
-     * 
+     *
      * @return int
      */
     public int getZkTimeout() {
         return conf.getInt(ZK_TIMEOUT, 2000);
     }
 
-    /** 
+    /**
      * Returns true if read-ahead enabled. Default is true.
-     * 
+     *
      * @return boolean
      */
     public boolean getReadAheadEnabled() {
@@ -296,7 +300,7 @@ public class ServerConfiguration extends
 
     /**
      * Returns true if standalone. Default is false.
-     * 
+     *
      * @return boolean
      */
     public boolean isStandalone() {
@@ -304,8 +308,8 @@ public class ServerConfiguration extends
     }
 
     /**
-     * Returns list of regions. 
-     * 
+     * Returns list of regions.
+     *
      * @return List<String>
      */
     public List<String> getRegions() {
@@ -317,7 +321,7 @@ public class ServerConfiguration extends
 
     /**
      *  Returns the name of the SSL certificate if available as a resource.
-     * 
+     *
      * @return String
      */
     public String getCertName() {
@@ -326,7 +330,7 @@ public class ServerConfiguration extends
 
     /**
      * This is the path to the SSL certificate if it is available as a file.
-     * 
+     *
      * @return String
      */
     public String getCertPath() {
@@ -351,7 +355,7 @@ public class ServerConfiguration extends
     /**
      * Returns the password used for BookKeeper ledgers. Default
      * is the empty string.
-     * 
+     *
      * @return
      */
     public String getPassword() {
@@ -360,7 +364,7 @@ public class ServerConfiguration extends
 
     /**
      * Returns true if SSL is enabled. Default is false.
-     * 
+     *
      * @return boolean
      */
     public boolean isSSLEnabled() {
@@ -372,7 +376,7 @@ public class ServerConfiguration extends
      * information about consumed messages. A value greater than
      * one avoids persisting information about consumed messages
      * upon every consumed message. Default is 50.
-     * 
+     *
      * @return int
      */
     public int getConsumeInterval() {
@@ -383,7 +387,7 @@ public class ServerConfiguration extends
      * Returns the interval to release a topic. If this
      * parameter is greater than zero, then schedule a
      * task to release an owned topic. Default is 0 (never released).
-     * 
+     *
      * @return int
      */
     public int getRetentionSecs() {
@@ -422,7 +426,7 @@ public class ServerConfiguration extends
 
     /**
      * True if SSL is enabled across regions.
-     * 
+     *
      * @return boolean
      */
     public boolean isInterRegionSSLEnabled() {
@@ -430,10 +434,10 @@ public class ServerConfiguration extends
     }
 
     /**
-     * This parameter is used to determine how often we run the 
-     * SubscriptionManager's Messages Consumed timer task thread 
+     * This parameter is used to determine how often we run the
+     * SubscriptionManager's Messages Consumed timer task thread
      * (in milliseconds).
-     * 
+     *
      * @return int
      */
     public int getMessagesConsumedThreadRunInterval() {
@@ -444,7 +448,7 @@ public class ServerConfiguration extends
      * This parameter is used to determine how often we run a thread
      * to retry those failed remote subscriptions in asynchronous mode
      * (in milliseconds).
-     * 
+     *
      * @return int
      */
     public int getRetryRemoteSubscribeThreadRunInterval() {
@@ -455,7 +459,7 @@ public class ServerConfiguration extends
      * This parameter is for setting the default maximum number of messages which
      * can be delivered to a subscriber without being consumed.
      * we pause messages delivery to a subscriber when reaching the window size
-     * 
+     *
      * @return int
      */
     public int getDefaultMessageWindowSize() {
@@ -466,7 +470,7 @@ public class ServerConfiguration extends
      * This parameter is used when Bookkeeper is the persistence
      * store and indicates what the ensemble size is (i.e. how
      * many bookie servers to stripe the ledger entries across).
-     * 
+     *
      * @return int
      */
     public int getBkEnsembleSize() {
@@ -478,7 +482,7 @@ public class ServerConfiguration extends
      * This parameter is used when Bookkeeper is the persistence store
      * and indicates what the quorum size is (i.e. how many redundant
      * copies of each ledger entry is written).
-     * 
+     *
      * @return int
      * @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
      */
@@ -525,6 +529,33 @@ public class ServerConfiguration extends
         return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L);
     }
 
+    /**
+     * Get the tolerance percentage for the rebalancer. The rebalancer will not
+     * shed load if its current load is less than average + average*tolerancePercentage/100.0
+     *
+     * @return the tolerance percentage for the rebalancer.
+     */
+    public double getRebalanceTolerance() {
+        return conf.getDouble(REBALANCE_TOLERANCE_PERCENTAGE, 10.0);
+    }
+
+    /**
+     * Get the maximum load the rebalancer can shed at once. Default is 50.
+     * @return
+     */
+    public HubLoad getRebalanceMaxShed() {
+        return new HubLoad(conf.getLong(REBALANCE_MAX_SHED, 50));
+    }
+
+    /**
+     * Get the interval (in seconds) between rebalancing attempts. The default is
+     * 5 minutes.
+     * @return
+     */
+    public long getRebalanceInterval() {
+        return conf.getLong(REBALANCE_INTERVAL_SEC, 300);
+    }
+
     /*
      * Is this a valid configuration that we can run with? This code might grow
      * over time.
@@ -553,7 +584,14 @@ public class ServerConfiguration extends
             throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize()
                                              + ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")");
         }
-
+        // Validate that the rebalance tolerance percentage is not negative.
+        if (getRebalanceTolerance() < 0.0) {
+            throw new ConfigurationException("The rebalance tolerance percentage cannot be negative.");
+        }
+        // Validate that the maximum load to shed during a rebalance is not negative.
+        if (getRebalanceMaxShed().getNumTopics() < 0L) {
+            throw new ConfigurationException("The maximum load to shed during a rebalance cannot be negative.");
+        }
         // add other checks here
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Fri Mar  7 17:39:37 2014
@@ -20,25 +20,25 @@ package org.apache.hedwig.server.topics;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.ByteString;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 public abstract class AbstractTopicManager implements TopicManager {
 
@@ -204,15 +204,17 @@ public abstract class AbstractTopicManag
             public void operationFailed(final Object ctx, final PubSubException exception) {
                 // TODO: optimization: we can release this as soon as we experience the first error.
                 Callback<Void> cb = new Callback<Void>() {
+                    @Override
                     public void operationFinished(Object _ctx, Void _resultOfOperation) {
                         originalCallback.operationFailed(ctx, exception);
                     }
+                    @Override
                     public void operationFailed(Object _ctx, PubSubException _exception) {
                         logger.error("Exception releasing topic", _exception);
                         originalCallback.operationFailed(ctx, exception);
                     }
                 };
-                
+
                 realReleaseTopic(topic, cb, originalContext);
             }
         };
@@ -241,6 +243,52 @@ public abstract class AbstractTopicManag
         queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
     }
 
+    @Override
+    public final void releaseTopics(int numTopics, final Callback<Long> callback, final Object ctx) {
+        // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set.
+        List<ByteString> topicList = getTopicList();
+        // Make sure we release only as many topics as we own.
+        final long numTopicsToRelease = Math.min(topicList.size(), numTopics);
+        // Shuffle the list of topics we own, so that we release a random subset.
+        Collections.shuffle(topicList);
+        Callback<Void> mcb = CallbackUtils.multiCallback((int)numTopicsToRelease, new Callback<Void>() {
+            @Override
+            public void operationFinished(Object ctx, Void ignoreVal) {
+                callback.operationFinished(ctx, numTopicsToRelease);
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException e) {
+                long notReleased = 0;
+                if (e instanceof PubSubException.CompositeException) {
+                    notReleased = ((PubSubException.CompositeException)e).getExceptions().size();
+                }
+                callback.operationFinished(ctx, numTopicsToRelease - notReleased);
+            }
+        }, ctx);
+
+        // Try to release "numTopicsToRelease" topics. It's okay if we're not
+        // able to release some topics. We signal that we tried by invoking the callback's
+        // operationFinished() with the actual number of topics released.
+        logger.info("This hub is releasing {} topics", numTopicsToRelease);
+        long releaseCount = 0;
+        for (ByteString topic : topicList) {
+            if (++releaseCount > numTopicsToRelease) {
+                break;
+            }
+            releaseTopic(topic, mcb, ctx);
+        }
+    }
+
+    @Override
+    public List<ByteString> getTopicList() {
+        List<ByteString> topicList;
+        synchronized (this.topics) {
+            topicList = Lists.newArrayList(this.topics.asMap().keySet());
+        }
+        return topicList;
+    }
+
     /**
      * This method should "return" the owner of the topic if one has been chosen
      * already. If there is no pre-chosen owner, either this hub or some other

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java Fri Mar  7 17:39:37 2014
@@ -38,6 +38,8 @@ public class HubLoad implements Comparab
     public static final HubLoad MIN_LOAD = new HubLoad(0);
 
     public static class InvalidHubLoadException extends Exception {
+        private static final long serialVersionUID = 5870487176956413387L;
+
         public InvalidHubLoadException(String msg) {
             super(msg);
         }
@@ -48,7 +50,7 @@ public class HubLoad implements Comparab
     }
 
     // how many topics that a hub server serves
-    long numTopics; 
+    long numTopics;
 
     public HubLoad(long num) {
         this.numTopics = num;
@@ -58,11 +60,16 @@ public class HubLoad implements Comparab
         this.numTopics = data.getNumTopics();
     }
 
+    // TODO: Make this threadsafe (BOOKKEEPER-379)
     public HubLoad setNumTopics(long numTopics) {
         this.numTopics = numTopics;
         return this;
     }
 
+    public long getNumTopics() {
+        return this.numTopics;
+    }
+
     public HubLoadData toHubLoadData() {
         return HubLoadData.newBuilder().setNumTopics(numTopics).build();
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java Fri Mar  7 17:39:37 2014
@@ -105,4 +105,20 @@ interface HubServerManager {
      *          Callback context.
      */
     public void chooseLeastLoadedHub(Callback<HubInfo> callback, Object ctx);
+
+    /**
+     * Try to rebalance the load within the cluster. This function will get
+     * the {@link HubLoad} from all available hubs within the cluster, and then
+     * shed additional load.
+     *
+     * @param tolerancePercentage
+     *          the percentage of load above average that is permissible.
+     * @param maxLoadToShed
+     *          the maximum amount of load to shed per call.
+     * @param callback
+     *          Callback indicating whether we reduced load or not.
+     * @param ctx
+     */
+    public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed,
+                                 Callback<Boolean> callback, Object ctx);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Fri Mar  7 17:39:37 2014
@@ -60,14 +60,14 @@ public class MMTopicManager extends Abst
     // all of the Ops put into the queuer will fail automatically.
     protected volatile boolean isSuspended = false;
 
-    public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk, 
+    public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
                           MetadataManagerFactory mmFactory,
                           ScheduledExecutorService scheduler)
             throws UnknownHostException, PubSubException {
         super(cfg, scheduler);
         // initialize topic ownership manager
         this.mm = mmFactory.newTopicOwnershipManager();
-        this.hubManager = new ZkHubServerManager(cfg, zk, addr);
+        this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
 
         final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
             new SynchronousQueue<Either<HubInfo, PubSubException>>();
@@ -289,6 +289,11 @@ public class MMTopicManager extends Abst
     @Override
     protected void postReleaseCleanup(final ByteString topic,
                                       final Callback<Void> cb, final Object ctx) {
+
+        // Reduce load. We've removed the topic from our topic set, so do this as well.
+        // When we reclaim the topic, we will increment the load again.
+        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
+
         mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
             @Override
             public void operationFinished(Object ctx, Versioned<HubInfo> owner) {

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java?rev=1575338&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java Fri Mar  7 17:39:37 2014
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Shed load by releasing topics.
+ */
+public class TopicBasedLoadShedder {
+    private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class);
+    private final double tolerancePercentage;
+    private final long maxLoadToShed;
+    private final TopicManager tm;
+    private final List<ByteString> topicList;
+
+    /**
+     * @param tm The topic manager used to handle load shedding
+     * @param tolerancePercentage The tolerance percentage for shedding load
+     * @param maxLoadToShed The maximum amount of load to shed in one call.
+     */
+    public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage,
+                                 HubLoad maxLoadToShed) {
+        // Make sure that all functions in this class have a consistent view
+        // of the load. So, we use the same topic list throughout.
+        this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed);
+    }
+
+    /**
+     * This is package-private (not private) because it makes testing easier.
+     * @param tm The topic manager used to handle load shedding
+     * @param topicList The topic list representing topics owned by this hub.
+     * @param tolerancePercentage The tolerance percentage for shedding load
+     * @param maxLoadToShed The maximum amount of load to shed in one call.
+     */
+    TopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
+                          double tolerancePercentage,
+                          HubLoad maxLoadToShed) {
+        this.tolerancePercentage = tolerancePercentage;
+        this.maxLoadToShed = maxLoadToShed.getNumTopics();
+        this.tm = tm;
+        this.topicList = topicList;
+    }
+
+    /**
+     * Reduce the load on the current hub so that it reaches the target load.
+     * We reduce load by releasing topics using the {@link TopicManager} passed
+     * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)}
+     * to actually release topics.
+     *
+     * @param targetLoad
+     * @param callback
+     *              a Callback<Long> that indicates how many topics we tried to release.
+     * @param ctx
+     */
+    public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
+        int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics();
+        int numTopicsToRelease = topicList.size() - targetTopics;
+
+        // The number of topics we own is less than the target topic size. We don't release
+        // any topics in this case.
+        if (numTopicsToRelease <= 0) {
+            callback.operationFinished(ctx, 0L);
+            return;
+        }
+        // Call releaseTopics() on the topic manager to do this. We let the manager handle the release
+        // policy.
+        tm.releaseTopics(numTopicsToRelease, callback, ctx);
+    }
+
+    /**
+     * Calculate the average number of topics on the currently active hubs and release topics
+     * if required.
+     * We shed topics if we currently hold topics greater than average + average * tolerancePercentage/100.0
+     * We shed a maximum of maxLoadToShed topics
+     * We also hold on to at least one topic.
+     * @param loadMap
+     * @param callback
+     *          A return value of true means we tried to rebalance. False means that there was
+     *          no need to rebalance.
+     * @param ctx
+     */
+    public void shedLoad(final Map<HubInfo, HubLoad> loadMap, final Callback<Boolean> callback,
+                         final Object ctx) {
+
+        long totalTopics = 0L;
+        long myTopics = topicList.size();
+        for (Map.Entry<HubInfo, HubLoad> entry : loadMap.entrySet()) {
+            if (null == entry.getKey() || null == entry.getValue()) {
+                continue;
+            }
+            totalTopics += entry.getValue().toHubLoadData().getNumTopics();
+        }
+
+        double averageTopics = (double)totalTopics/loadMap.size();
+        logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics);
+
+        // Handle the case when averageTopics == 0. We hold on to at least 1 topic.
+        long permissibleTopics =
+            Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0));
+        logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics);
+        if (myTopics <= permissibleTopics) {
+            // My owned topics are less than those permitted by the current tolerance level. No need to release
+            // any topics.
+            callback.operationFinished(ctx, false);
+            return;
+        }
+
+        // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load.
+        // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic.
+        long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed));
+
+        // Reduce the load on the current hub to the target load we calculated above.
+        logger.info("Reducing load on this hub to {} topics.", targetNumTopics);
+        reduceLoadTo(new HubLoad(targetNumTopics), new Callback<Long>() {
+            @Override
+            public void operationFinished(Object ctx, Long numReleased) {
+                logger.info("Released {} topics to shed load.", numReleased);
+                callback.operationFinished(ctx, true);
+            }
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException e) {
+                callback.operationFailed(ctx, e);
+            }
+        }, ctx);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Fri Mar  7 17:39:37 2014
@@ -23,6 +23,8 @@ import org.apache.hedwig.server.persiste
 import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.HedwigSocketAddress;
 
+import java.util.List;
+
 /**
  * An implementor of this interface is basically responsible for ensuring that
  * there is at most a single host responsible for a given topic at a given time.
@@ -81,6 +83,23 @@ public interface TopicManager {
     public void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
 
     /**
+     * Release up to numTopics topics owned by this hub. If the hub holds fewer
+     * than numTopics topics, all of them are released.
+     * @param numTopics
+     *          Number of topics to release.
+     * @param callback
+     *          The callback should be invoked with the number of topics the hub
+     *          released successfully.
+     * @param ctx
+     *          Opaque caller context; presumably handed back to the callback
+     *          unchanged (confirm against the implementations).
+     */
+    public void releaseTopics(int numTopics, Callback<Long> callback, Object ctx);
+
+    /**
+     * Get the list of topics this hub believes it is responsible for.
+     * @return the topics this hub currently believes it owns
+     */
+    public List<ByteString> getTopicList();
+
+    /**
      * Stop topic manager
      */
     public void stop();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java Fri Mar  7 17:39:37 2014
@@ -17,11 +17,16 @@
  */
 package org.apache.hedwig.server.topics;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
-
-import static com.google.common.base.Charsets.UTF_8;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.server.common.ServerConfiguration;
@@ -38,7 +43,6 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +58,7 @@ class ZkHubServerManager implements HubS
     private final ServerConfiguration conf;
     private final ZooKeeper zk;
     private final HedwigSocketAddress addr;
+    private final TopicManager tm;
     private final String ephemeralNodePath;
     private final String hubNodesPath;
 
@@ -61,6 +66,7 @@ class ZkHubServerManager implements HubS
     protected HubInfo myHubInfo;
     protected volatile boolean isSuspended = false;
     protected ManagerListener listener = null;
+    protected final ScheduledExecutorService executor;
 
     // upload hub server load to zookeeper
     StatCallback loadReportingStatCallback = new StatCallback() {
@@ -100,25 +106,90 @@ class ZkHubServerManager implements HubS
             if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
                 logger.error("ZK client connection to the ZK server has expired.!");
                 if (null != listener) {
+                    // Shutdown our executor NOW!
+                    executor.shutdownNow();
                     listener.onShutdown();
                 }
             }
         }
     }
 
+    /**
+     * Periodic task that asks this hub to rebalance the cluster by shedding topics.
+     * The task is not run at a fixed rate: it re-arms itself only once the previous
+     * attempt has completed (or was skipped), so two rebalances never run at the same time.
+     */
+    class RebalanceRunnable implements Runnable {
+        private final double tolerancePercentage;
+        private final HubLoad maxLoadToShed;
+        private final long delaySeconds;
+
+        /**
+         * @param tolerancePercentage
+         *          Tolerance handed to rebalanceCluster on every attempt.
+         * @param maxLoadToShed
+         *          Maximum load handed to rebalanceCluster on every attempt.
+         * @param delaySeconds
+         *          Delay between two attempts, in seconds. A value <= 0 disables
+         *          the rebalancer (see {@link #start()}).
+         */
+        public RebalanceRunnable(double tolerancePercentage,
+                                 HubLoad maxLoadToShed,
+                                 long delaySeconds) {
+            this.tolerancePercentage = tolerancePercentage;
+            this.maxLoadToShed = maxLoadToShed;
+            this.delaySeconds = delaySeconds;
+        }
+
+        /**
+         * Queue the next run of this task after delaySeconds.
+         *
+         * The executor is shut down when the ZK session expires. A rebalance that is
+         * still in flight at that point completes afterwards and would otherwise throw
+         * a RejectedExecutionException out of its callback, so a rejected submission
+         * is only logged and the rebalancer stops quietly.
+         */
+        private void scheduleNextRun() {
+            try {
+                executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
+            } catch (java.util.concurrent.RejectedExecutionException e) {
+                logger.info("Rebalancer executor has been shut down. Not scheduling further rebalances.");
+            }
+        }
+
+        @Override
+        public void run() {
+            // If we are in suspended state, don't attempt a rebalance.
+            if (isSuspended) {
+                scheduleNextRun();
+                return;
+            }
+            // We should attempt a rebalance. We reschedule the job at the tail so that
+            // two rebalances don't happen simultaneously.
+            rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback<Boolean>() {
+                @Override
+                public void operationFinished(Object ctx, Boolean didRebalance) {
+                    // Boolean.TRUE.equals() avoids unboxing a possibly-null result.
+                    if (Boolean.TRUE.equals(didRebalance)) {
+                        logger.info("The attempt to rebalance the cluster was successful");
+                    } else {
+                        logger.info("There was no need to rebalance.");
+                    }
+                    // Re-arm the enclosing task directly instead of trusting that the
+                    // context object made it through every layer unchanged.
+                    scheduleNextRun();
+                }
+
+                @Override
+                public void operationFailed(Object ctx, PubSubException e) {
+                    logger.error("The attempt to rebalance the cluster did not succeed.", e);
+                    // Reschedule the job
+                    scheduleNextRun();
+                }
+            }, this);
+        }
+
+        /**
+         * Schedule the first run. Does nothing unless delaySeconds > 0.
+         */
+        public void start() {
+            // Initiate only if delaySeconds > 0
+            if (delaySeconds > 0) {
+                logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}",
+                    new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds });
+                scheduleNextRun();
+            }
+        }
+    }
+
     public ZkHubServerManager(ServerConfiguration conf,
                               ZooKeeper zk,
-                              HedwigSocketAddress addr) {
+                              HedwigSocketAddress addr,
+                              TopicManager tm) {
         this.conf = conf;
         this.zk = zk;
         this.addr = addr;
-
+        // Topic manager whose topics get shed on a rebalance. rebalanceCluster()
+        // treats a null topic manager as "no load to shed".
+        this.tm = tm;
         // znode path to store all available hub servers
         this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
         // the node's ephemeral node path
         this.ephemeralNodePath = getHubZkNodePath(addr);
+        // Single-threaded scheduler on which the periodic rebalance task runs.
+        // NOTE(review): in this change the executor is only shut down from ZkHubsWatcher
+        // on session expiry (and only when a listener is registered) - confirm whether
+        // the regular stop path should shut it down as well.
+        this.executor = Executors.newSingleThreadScheduledExecutor();
         // register available hub servers list watcher
         zk.register(new ZkHubsWatcher());
+
+        // Start the periodic rebalancer. RebalanceRunnable.start() schedules nothing
+        // unless the configured rebalance interval is > 0.
+        new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(),
+                              conf.getRebalanceInterval()).start();
     }
 
     @Override
@@ -157,7 +228,7 @@ class ZkHubServerManager implements HubS
                                 return;
                             } else {
                                 callback.operationFailed(ctx,
-                                    new PubSubException.ServiceDownException(
+                                        new PubSubException.ServiceDownException(
                                         "I can't state my hub node after I created it : "
                                         + ephemeralNodePath));
                                 return;
@@ -167,7 +238,7 @@ class ZkHubServerManager implements HubS
                     return;
                 }
                 if (rc != Code.NODEEXISTS.intValue()) {
-                    KeeperException ke = ZkUtils .logErrorAndCreateZKException(
+                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
                             "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
                     callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
                     return;
@@ -283,7 +354,7 @@ class ZkHubServerManager implements HubS
 
                     if (numResponses == children.size()) {
                         if (leastLoaded == null) {
-                            callback.operationFailed(ctx, 
+                            callback.operationFailed(ctx,
                                 new PubSubException.ServiceDownException("No hub available"));
                             return;
                         }
@@ -305,4 +376,95 @@ class ZkHubServerManager implements HubS
                        dataCallback, child);
         }
     }
+
+    /**
+     * Get a map of all currently active hubs with their advertised load.
+     *
+     * The hubs are the children of the hubs znode; the load of each hub is parsed from
+     * the data of its child znode. Both lookups are asynchronous. The callback is failed
+     * when the children cannot be listed, when no hub is registered at all, or when a
+     * valid load could not be obtained for every listed hub.
+     *
+     * @param callback
+     *          Invoked with the hub-to-load map on success.
+     * @param originalCtx
+     *          Context handed back to the callback.
+     */
+    private void getActiveHubsInfoWithLoad(final Callback<Map<HubInfo, HubLoad>> callback,
+                                           final Object originalCtx) {
+        // Get the list of children and then for each child, get the data. All asynchronously.
+        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, final List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                            "Could not get children for given path", path, rc);
+                    callback.operationFailed(originalCtx, new PubSubException.ServiceDownException(e));
+                    return;
+                }
+
+                // Without children no data callback will ever fire, which would leave the
+                // caller waiting forever. Report the empty cluster right away.
+                if (children.isEmpty()) {
+                    callback.operationFailed(originalCtx,
+                        new PubSubException.ServiceDownException("No hub available"));
+                    return;
+                }
+
+                // The data callback for every child node
+                SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
+                    final Map<HubInfo, HubLoad> loadMap = new HashMap<HubInfo, HubLoad>();
+                    int numResponse = 0;
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object dataCtx,
+                                                  byte[] data, Stat stat) {
+                        // Responses for different children share loadMap and numResponse.
+                        synchronized (this) {
+                            if (rc == Code.OK.intValue()) {
+                                if (null == data) {
+                                    // A znode without data cannot be parsed. Treat it like corrupt
+                                    // data instead of hitting a NullPointerException before the
+                                    // response has been counted.
+                                    logger.warn("Corrupt data found for a hub. Ignoring.");
+                                } else {
+                                    // Put this load in the map. dataCtx is actually the child string
+                                    // which is the IP:PORT:SSL representation of the hub.
+                                    try {
+                                        HubInfo hubInfo =
+                                            new HubInfo(new HedwigSocketAddress((String)dataCtx), stat.getCzxid());
+                                        HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8));
+                                        this.loadMap.put(hubInfo, hubLoad);
+                                    } catch (HubLoad.InvalidHubLoadException e) {
+                                        logger.warn("Corrupt data found for a hub. Ignoring.");
+                                    }
+                                }
+                            }
+                            numResponse++;
+                            if (numResponse == children.size()) {
+                                // We got fewer valid responses than the number of hubs we saw
+                                // when listing the children. Signal an error.
+                                if (loadMap.size() != numResponse) {
+                                    callback.operationFailed(originalCtx,
+                                        new PubSubException.UnexpectedConditionException(
+                                           "Fewer OK responses than the number of active hubs seen previously."));
+                                    return;
+                                }
+                                // We've seen all responses. All OK.
+                                callback.operationFinished(originalCtx, loadMap);
+                            }
+                        }
+                    }
+                };
+
+                for (String child : children) {
+                    String znode = conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString();
+                    zk.getData(znode, false, dataCallback, child);
+                }
+            }
+        }, originalCtx);
+    }
+
+    @Override
+    public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed,
+                                 final Callback<Boolean> callback, final Object ctx) {
+        // The shedding decision needs the load of every active hub, so it is taken
+        // in the completion handler of the (asynchronous) load lookup.
+        final Callback<Map<HubInfo, HubLoad>> onHubLoads = new Callback<Map<HubInfo, HubLoad>>() {
+            @Override
+            public void operationFinished(Object lookupCtx, Map<HubInfo, HubLoad> hubLoads) {
+                if (tm != null) {
+                    TopicBasedLoadShedder shedder =
+                        new TopicBasedLoadShedder(tm, tolerancePercentage, maxLoadToShed);
+                    shedder.shedLoad(hubLoads, callback, lookupCtx);
+                    return;
+                }
+                // Without a topic manager this hub has nothing it could release.
+                callback.operationFinished(lookupCtx, false);
+            }
+
+            @Override
+            public void operationFailed(Object lookupCtx, PubSubException cause) {
+                // The hub loads are unknown, so no rebalance can be attempted. Pass the failure on.
+                logger.error("Failed to get active hubs. Cannot attempt a rebalance.");
+                callback.operationFailed(lookupCtx, cause);
+            }
+        };
+        getActiveHubsInfoWithLoad(onHubLoads, ctx);
+    }
+
+
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Fri Mar  7 17:39:37 2014
@@ -77,7 +77,7 @@ public class ZkTopicManager extends Abst
 
         super(cfg, scheduler);
         this.zk = zk;
-        this.hubManager = new ZkHubServerManager(cfg, zk, addr);
+        this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
 
         myHubLoad = new HubLoad(topics.size());
         this.hubManager.registerListener(new HubServerManager.ManagerListener() {
@@ -275,6 +275,10 @@ public class ZkTopicManager extends Abst
     @Override
     protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
 
+        // Reduce load. We've removed the topic from our topic set, so do this as well.
+        // When we reclaim the topic, we will increment the load again.
+        hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
+
         zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() {
             @Override
             public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java?rev=1575338&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java Fri Mar  7 17:39:37 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+
+import junit.framework.Assert;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests for the decision logic of {@code TopicBasedLoadShedder.shedLoad}: given a map of
+ * hub loads, does the shedder decide to release topics and, if so, down to which target load.
+ * The actual release is stubbed out by {@code MockTopicBasedLoadShedder}, which only records
+ * the target load it was asked to reach.
+ */
+public class TestTopicBasedLoadShedder {
+
+    // shedLoad() reports through a callback; the verdict is handed over to the test here.
+    final protected SynchronousQueue<Boolean> statusQueue = new SynchronousQueue<Boolean>();
+    private int myTopics = 10;
+    private int numHubs = 10;
+    private List<ByteString> mockTopicList;
+    // A shedding limit large enough to never constrain the tests that use it.
+    private final HubLoad infiniteMaxLoad = new HubLoad(10000000);
+    Map<HubInfo, HubLoad> mockLoadMap = new HashMap<HubInfo, HubLoad>();
+
+    /**
+     * Load shedder whose reduceLoadTo() releases nothing and only remembers the requested target.
+     */
+    class MockTopicBasedLoadShedder extends TopicBasedLoadShedder {
+        // This is set by the reduceLoadTo function.
+        public HubLoad targetLoad;
+        public MockTopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
+                                         Double tolerancePercentage, HubLoad maxLoadToShed) {
+            super(tm, topicList, tolerancePercentage, maxLoadToShed);
+        }
+        @Override
+        public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
+            this.targetLoad = targetLoad;
+            // Report success immediately. The value passed on is the topic count of the
+            // target load, not a real number of released topics.
+            callback.operationFinished(ctx, targetLoad.toHubLoadData().getNumTopics());
+        }
+    }
+
+    /**
+     * Hand a verdict to the test via statusQueue. put() on a SynchronousQueue blocks until
+     * the test calls take(), and the callbacks may run on the test's own thread, so the
+     * hand-over is done from a separate thread.
+     */
+    private void publishStatus(final Boolean status) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(statusQueue, status);
+            }
+        }).start();
+    }
+
+    /**
+     * Build the callback passed to shedLoad().
+     *
+     * @param ls
+     *          The shedder under test; its recorded targetLoad is inspected.
+     * @param expected
+     *          The target load expected when topics should be released. May be null
+     *          when shouldRelease is false.
+     * @param shouldRelease
+     *          Whether shedLoad() is expected to report that it released topics.
+     * @param shouldFail
+     *          The verdict published if shedLoad() fails.
+     */
+    public Callback<Boolean> getShedLoadCallback(final MockTopicBasedLoadShedder ls, final HubLoad expected,
+                                                 final Boolean shouldRelease, final Boolean shouldFail) {
+        return new Callback<Boolean>() {
+            @Override
+            public void operationFinished(Object o, Boolean aBoolean) {
+                // Compare boxed values with equals(), not by reference.
+                boolean status = shouldRelease.equals(aBoolean);
+                if (shouldRelease) {
+                    // Short-circuit so that a missing targetLoad yields a failed verdict
+                    // rather than a NullPointerException inside the callback.
+                    status = status
+                        && (ls.targetLoad != null)
+                        && (expected.numTopics == ls.targetLoad.numTopics);
+                }
+                publishStatus(status);
+            }
+
+            @Override
+            public void operationFailed(Object o, PubSubException e) {
+                publishStatus(shouldFail);
+            }
+        };
+    }
+
+    /** Create numTopics distinct topic names. */
+    private List<ByteString> getMockTopicList(int numTopics) {
+        List<ByteString> topics = new ArrayList<ByteString>();
+        for (int i = 0; i < numTopics; i++) {
+            topics.add(ByteString.copyFromUtf8("MyTopic_" + i));
+        }
+        return topics;
+    }
+
+    /** Create a HubInfo whose host name is made unique by hubNum. */
+    private HubInfo getHubInfo(int hubNum) {
+        return new HubInfo(new HedwigSocketAddress("myhub.testdomain.foo"+hubNum+":4080:4080"), 0);
+    }
+
+    /**
+     * Reset the fixture: hub 0 is given myTopics topics and hub i (1 <= i < numHubs)
+     * is given otherHubsLoad[i-1] topics.
+     *
+     * @param otherHubsLoad
+     *          Loads of the remaining hubs; may be null only when numHubs is 1.
+     */
+    private synchronized void initialize(int myTopics, int numHubs, int[] otherHubsLoad) {
+        if (null != otherHubsLoad) {
+            Assert.assertTrue(otherHubsLoad.length == numHubs - 1);
+        }
+        this.myTopics = myTopics;
+        mockTopicList = getMockTopicList(this.myTopics);
+        this.numHubs = numHubs;
+        this.mockLoadMap.clear();
+        this.mockLoadMap.put(getHubInfo(0), new HubLoad(this.myTopics));
+        for (int i = 1; i < this.numHubs; i++) {
+            this.mockLoadMap.put(getHubInfo(i), new HubLoad(otherHubsLoad[i-1]));
+        }
+    }
+
+    /** Return an array of n entries all equal to load, or null when n is 0. */
+    private int[] getEqualLoadDistributionArray(int n, int load) {
+        if (n == 0) {
+            return null;
+        }
+        int[] retLoad = new int[n];
+        Arrays.fill(retLoad, load);
+        return retLoad;
+    }
+
+    @Test(timeout = 60000)
+    public synchronized  void testAllHubsSameTopics() throws Exception {
+        // All hubs have the same number of topics. We should not release any topics even with a
+        // tolerance of 0.0.
+        initialize(10, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testOneHubUnequalTopics() throws Exception {
+        // The hub has 20 topics while the average is 11. Should reduce the load to 11.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testOneHubUnequalTopicsWithTolerance() throws Exception {
+        // The hub has 20 topics and average is 11. Should still release as tolerance level of 50.0 is
+        // breached. Should get down to average.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 50.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+
+        // A tolerance level of 100.0 should result in the hub not releasing topics.
+        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 100.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testMaxLoadShed() throws Exception {
+        // The hub should not shed more than maxLoadShed topics.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(5));
+        // Our load should reduce to 15.
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(15), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+
+        // We should reduce to 11 even when maxLoadShed and average result in the same
+        // values
+        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(9));
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testSingleHubLoadShed() throws Exception {
+        // If this is the only hub in the cluster, it should not release any topics.
+        initialize(20, 1, null);
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testUnderloadedClusterLoadShed() throws Exception {
+        // Hold on to at least one topic while shedding load (if cluster is underloaded)
+        initialize(5, 10, getEqualLoadDistributionArray(9, 0));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(1), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+}