You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/03/07 18:39:38 UTC
svn commit: r1575338 - in /zookeeper/bookkeeper/trunk: ./
hedwig-server/src/main/java/org/apache/hedwig/server/common/
hedwig-server/src/main/java/org/apache/hedwig/server/topics/
hedwig-server/src/test/java/org/apache/hedwig/server/topics/
Author: ivank
Date: Fri Mar 7 17:39:37 2014
New Revision: 1575338
URL: http://svn.apache.org/r1575338
Log:
BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Mar 7 17:39:37 2014
@@ -182,6 +182,8 @@ Trunk (unreleased changes)
BOOKKEEPER-683: TestSubAfterCloseSub fails on 4.2 (jiannan via ivank)
+ BOOKKEEPER-363: Re-distributing topics among newly added hubs. (aniruddha via ivank)
+
hedwig-client:
BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Fri Mar 7 17:39:37 2014
@@ -27,15 +27,16 @@ import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
-
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.hedwig.conf.AbstractConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.server.topics.HubLoad;
import org.apache.hedwig.util.HedwigSocketAddress;
+import com.google.protobuf.ByteString;
+
public class ServerConfiguration extends AbstractConfiguration {
public final static String REGION = "region";
protected final static String MAX_MESSAGE_SIZE = "max_message_size";
@@ -75,6 +76,9 @@ public class ServerConfiguration extends
protected final static String NUM_DELIVERY_THREADS = "num_delivery_threads";
protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
+ protected final static String REBALANCE_TOLERANCE_PERCENTAGE = "rebalance_tolerance";
+ protected final static String REBALANCE_MAX_SHED = "rebalance_max_shed";
+ protected final static String REBALANCE_INTERVAL_SEC = "rebalance_interval_sec";
// manager related settings
protected final static String METADATA_MANAGER_BASED_TOPIC_MANAGER_ENABLED = "metadata_manager_based_topic_manager_enabled";
@@ -153,7 +157,7 @@ public class ServerConfiguration extends
/**
* Maximum number of messages to read ahead. Default is 10.
- *
+ *
* @return int
*/
public int getReadAheadCount() {
@@ -162,7 +166,7 @@ public class ServerConfiguration extends
/**
* Maximum number of bytes to read ahead. Default is 4MB.
- *
+ *
* @return long
*/
public long getReadAheadSizeBytes() {
@@ -172,7 +176,7 @@ public class ServerConfiguration extends
/**
* Maximum cache size. By default is the smallest of 2G or
* half the heap size.
- *
+ *
* @return long
*/
public long getMaximumCacheSize() {
@@ -193,16 +197,16 @@ public class ServerConfiguration extends
/**
* After a scan of a log fails, how long before we retry (in msec)
- *
+ *
* @return long
*/
public long getScanBackoffPeriodMs() {
return conf.getLong(SCAN_BACKOFF_MSEC, 1000);
}
-
+
/**
* Returns server port.
- *
+ *
* @return int
*/
public int getServerPort() {
@@ -211,7 +215,7 @@ public class ServerConfiguration extends
/**
* Returns SSL server port.
- *
+ *
* @return int
*/
public int getSSLServerPort() {
@@ -220,7 +224,7 @@ public class ServerConfiguration extends
/**
* Returns ZooKeeper path prefix.
- *
+ *
* @return string
*/
public String getZkPrefix() {
@@ -263,7 +267,7 @@ public class ServerConfiguration extends
/**
* Return ZooKeeper list of servers. Default is localhost.
- *
+ *
* @return String
*/
public String getZkHost() {
@@ -276,16 +280,16 @@ public class ServerConfiguration extends
/**
* Return ZooKeeper session timeout. Default is 2s.
- *
+ *
* @return int
*/
public int getZkTimeout() {
return conf.getInt(ZK_TIMEOUT, 2000);
}
- /**
+ /**
* Returns true if read-ahead enabled. Default is true.
- *
+ *
* @return boolean
*/
public boolean getReadAheadEnabled() {
@@ -296,7 +300,7 @@ public class ServerConfiguration extends
/**
* Returns true if standalone. Default is false.
- *
+ *
* @return boolean
*/
public boolean isStandalone() {
@@ -304,8 +308,8 @@ public class ServerConfiguration extends
}
/**
- * Returns list of regions.
- *
+ * Returns list of regions.
+ *
* @return List<String>
*/
public List<String> getRegions() {
@@ -317,7 +321,7 @@ public class ServerConfiguration extends
/**
* Returns the name of the SSL certificate if available as a resource.
- *
+ *
* @return String
*/
public String getCertName() {
@@ -326,7 +330,7 @@ public class ServerConfiguration extends
/**
* This is the path to the SSL certificate if it is available as a file.
- *
+ *
* @return String
*/
public String getCertPath() {
@@ -351,7 +355,7 @@ public class ServerConfiguration extends
/**
* Returns the password used for BookKeeper ledgers. Default
* is the empty string.
- *
+ *
* @return
*/
public String getPassword() {
@@ -360,7 +364,7 @@ public class ServerConfiguration extends
/**
* Returns true if SSL is enabled. Default is false.
- *
+ *
* @return boolean
*/
public boolean isSSLEnabled() {
@@ -372,7 +376,7 @@ public class ServerConfiguration extends
* information about consumed messages. A value greater than
* one avoids persisting information about consumed messages
* upon every consumed message. Default is 50.
- *
+ *
* @return int
*/
public int getConsumeInterval() {
@@ -383,7 +387,7 @@ public class ServerConfiguration extends
* Returns the interval to release a topic. If this
* parameter is greater than zero, then schedule a
* task to release an owned topic. Default is 0 (never released).
- *
+ *
* @return int
*/
public int getRetentionSecs() {
@@ -422,7 +426,7 @@ public class ServerConfiguration extends
/**
* True if SSL is enabled across regions.
- *
+ *
* @return boolean
*/
public boolean isInterRegionSSLEnabled() {
@@ -430,10 +434,10 @@ public class ServerConfiguration extends
}
/**
- * This parameter is used to determine how often we run the
- * SubscriptionManager's Messages Consumed timer task thread
+ * This parameter is used to determine how often we run the
+ * SubscriptionManager's Messages Consumed timer task thread
* (in milliseconds).
- *
+ *
* @return int
*/
public int getMessagesConsumedThreadRunInterval() {
@@ -444,7 +448,7 @@ public class ServerConfiguration extends
* This parameter is used to determine how often we run a thread
* to retry those failed remote subscriptions in asynchronous mode
* (in milliseconds).
- *
+ *
* @return int
*/
public int getRetryRemoteSubscribeThreadRunInterval() {
@@ -455,7 +459,7 @@ public class ServerConfiguration extends
* This parameter is for setting the default maximum number of messages which
* can be delivered to a subscriber without being consumed.
* we pause messages delivery to a subscriber when reaching the window size
- *
+ *
* @return int
*/
public int getDefaultMessageWindowSize() {
@@ -466,7 +470,7 @@ public class ServerConfiguration extends
* This parameter is used when Bookkeeper is the persistence
* store and indicates what the ensemble size is (i.e. how
* many bookie servers to stripe the ledger entries across).
- *
+ *
* @return int
*/
public int getBkEnsembleSize() {
@@ -478,7 +482,7 @@ public class ServerConfiguration extends
* This parameter is used when Bookkeeper is the persistence store
* and indicates what the quorum size is (i.e. how many redundant
* copies of each ledger entry is written).
- *
+ *
* @return int
* @deprecated please use #getBkWriteQuorumSize() and #getBkAckQuorumSize()
*/
@@ -525,6 +529,33 @@ public class ServerConfiguration extends
return conf.getLong(MAX_ENTRIES_PER_LEDGER, 0L);
}
+ /**
+ * Get the tolerance percentage for the rebalancer. The rebalancer will not
+ * shed load if its current load does not exceed average + average * tolerancePercentage / 100.0.
+ *
+ * @return the tolerance percentage for the rebalancer.
+ */
+ public double getRebalanceTolerance() {
+ return conf.getDouble(REBALANCE_TOLERANCE_PERCENTAGE, 10.0);
+ }
+
+ /**
+ * Get the maximum load the rebalancer can shed at once. Default is 50.
+ * @return the maximum load the rebalancer may shed in a single rebalancing attempt.
+ */
+ public HubLoad getRebalanceMaxShed() {
+ return new HubLoad(conf.getLong(REBALANCE_MAX_SHED, 50));
+ }
+
+ /**
+ * Get the interval (in seconds) between rebalancing attempts. The default is
+ * 5 minutes (300 seconds).
+ * @return the interval between rebalancing attempts, in seconds.
+ */
+ public long getRebalanceInterval() {
+ return conf.getLong(REBALANCE_INTERVAL_SEC, 300);
+ }
+
/*
* Is this a valid configuration that we can run with? This code might grow
* over time.
@@ -553,7 +584,14 @@ public class ServerConfiguration extends
throw new ConfigurationException("BK write quorum size (" + getBkWriteQuorumSize()
+ ") is less than the ack quorum size (" + getBkAckQuorumSize() + ")");
}
-
+ // Validate that the rebalance tolerance percentage is not negative.
+ if (getRebalanceTolerance() < 0.0) {
+ throw new ConfigurationException("The rebalance tolerance percentage cannot be negative.");
+ }
+ // Validate that the maximum load to shed during a rebalance is not negative.
+ if (getRebalanceMaxShed().getNumTopics() < 0L) {
+ throw new ConfigurationException("The maximum load to shed during a rebalance cannot be negative.");
+ }
// add other checks here
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java Fri Mar 7 17:39:37 2014
@@ -20,25 +20,25 @@ package org.apache.hedwig.server.topics;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TopicOpQueuer;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.CallbackUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
public abstract class AbstractTopicManager implements TopicManager {
@@ -204,15 +204,17 @@ public abstract class AbstractTopicManag
public void operationFailed(final Object ctx, final PubSubException exception) {
// TODO: optimization: we can release this as soon as we experience the first error.
Callback<Void> cb = new Callback<Void>() {
+ @Override
public void operationFinished(Object _ctx, Void _resultOfOperation) {
originalCallback.operationFailed(ctx, exception);
}
+ @Override
public void operationFailed(Object _ctx, PubSubException _exception) {
logger.error("Exception releasing topic", _exception);
originalCallback.operationFailed(ctx, exception);
}
};
-
+
realReleaseTopic(topic, cb, originalContext);
}
};
@@ -241,6 +243,52 @@ public abstract class AbstractTopicManag
queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
}
+ @Override
+ public final void releaseTopics(int numTopics, final Callback<Long> callback, final Object ctx) {
+ // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set.
+ List<ByteString> topicList = getTopicList();
+ // Make sure we release only as many topics as we own.
+ final long numTopicsToRelease = Math.min(topicList.size(), numTopics);
+ // Shuffle the list of topics we own, so that we release a random subset.
+ Collections.shuffle(topicList);
+ Callback<Void> mcb = CallbackUtils.multiCallback((int)numTopicsToRelease, new Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void ignoreVal) {
+ callback.operationFinished(ctx, numTopicsToRelease);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException e) {
+ long notReleased = 0;
+ if (e instanceof PubSubException.CompositeException) {
+ notReleased = ((PubSubException.CompositeException)e).getExceptions().size();
+ }
+ callback.operationFinished(ctx, numTopicsToRelease - notReleased);
+ }
+ }, ctx);
+
+ // Try to release "numTopicsToRelease" topics. It's okay if we're not
+ // able to release some topics. We signal that we tried by invoking the callback's
+ // operationFinished() with the actual number of topics released.
+ logger.info("This hub is releasing {} topics", numTopicsToRelease);
+ long releaseCount = 0;
+ for (ByteString topic : topicList) {
+ if (++releaseCount > numTopicsToRelease) {
+ break;
+ }
+ releaseTopic(topic, mcb, ctx);
+ }
+ }
+
+ @Override
+ public List<ByteString> getTopicList() {
+ List<ByteString> topicList;
+ synchronized (this.topics) {
+ topicList = Lists.newArrayList(this.topics.asMap().keySet());
+ }
+ return topicList;
+ }
+
/**
* This method should "return" the owner of the topic if one has been chosen
* already. If there is no pre-chosen owner, either this hub or some other
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java Fri Mar 7 17:39:37 2014
@@ -38,6 +38,8 @@ public class HubLoad implements Comparab
public static final HubLoad MIN_LOAD = new HubLoad(0);
public static class InvalidHubLoadException extends Exception {
+ private static final long serialVersionUID = 5870487176956413387L;
+
public InvalidHubLoadException(String msg) {
super(msg);
}
@@ -48,7 +50,7 @@ public class HubLoad implements Comparab
}
// how many topics that a hub server serves
- long numTopics;
+ long numTopics;
public HubLoad(long num) {
this.numTopics = num;
@@ -58,11 +60,16 @@ public class HubLoad implements Comparab
this.numTopics = data.getNumTopics();
}
+ // TODO: Make this threadsafe (BOOKKEEPER-379)
public HubLoad setNumTopics(long numTopics) {
this.numTopics = numTopics;
return this;
}
+ public long getNumTopics() {
+ return this.numTopics;
+ }
+
public HubLoadData toHubLoadData() {
return HubLoadData.newBuilder().setNumTopics(numTopics).build();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java Fri Mar 7 17:39:37 2014
@@ -105,4 +105,20 @@ interface HubServerManager {
* Callback context.
*/
public void chooseLeastLoadedHub(Callback<HubInfo> callback, Object ctx);
+
+ /**
+ * Try to rebalance the load within the cluster. This function will get
+ * the {@link HubLoad} from all available hubs within the cluster, and then
+ * shed additional load.
+ *
+ * @param tolerancePercentage
+ * the percentage of load above average that is permissible.
+ * @param maxLoadToShed
+ * the maximum amount of load to shed per call.
+ * @param callback
+ * Callback indicating whether we reduced load or not.
+ * @param ctx
+ */
+ public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed,
+ Callback<Boolean> callback, Object ctx);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java Fri Mar 7 17:39:37 2014
@@ -60,14 +60,14 @@ public class MMTopicManager extends Abst
// all of the Ops put into the queuer will fail automatically.
protected volatile boolean isSuspended = false;
- public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
+ public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
MetadataManagerFactory mmFactory,
ScheduledExecutorService scheduler)
throws UnknownHostException, PubSubException {
super(cfg, scheduler);
// initialize topic ownership manager
this.mm = mmFactory.newTopicOwnershipManager();
- this.hubManager = new ZkHubServerManager(cfg, zk, addr);
+ this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
new SynchronousQueue<Either<HubInfo, PubSubException>>();
@@ -289,6 +289,11 @@ public class MMTopicManager extends Abst
@Override
protected void postReleaseCleanup(final ByteString topic,
final Callback<Void> cb, final Object ctx) {
+
+ // Reduce load. We've removed the topic from our topic set, so do this as well.
+ // When we reclaim the topic, we will increment the load again.
+ hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
+
mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
@Override
public void operationFinished(Object ctx, Versioned<HubInfo> owner) {
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java?rev=1575338&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java Fri Mar 7 17:39:37 2014
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Shed load by releasing topics.
+ */
+public class TopicBasedLoadShedder {
+ private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class);
+ private final double tolerancePercentage;
+ private final long maxLoadToShed;
+ private final TopicManager tm;
+ private final List<ByteString> topicList;
+
+ /**
+ * @param tm The topic manager used to handle load shedding
+ * @param tolerancePercentage The tolerance percentage for shedding load
+ * @param maxLoadToShed The maximum amount of load to shed in one call.
+ */
+ public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage,
+ HubLoad maxLoadToShed) {
+ // Make sure that all functions in this class have a consistent view
+ // of the load. So, we use the same topic list throughout.
+ this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed);
+ }
+
+ /**
+ * This constructor is package-private so that tests can supply their own topic list.
+ * @param tm The topic manager used to handle load shedding
+ * @param topicList The topic list representing topics owned by this hub.
+ * @param tolerancePercentage The tolerance percentage for shedding load
+ * @param maxLoadToShed The maximum amount of load to shed in one call.
+ */
+ TopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
+ double tolerancePercentage,
+ HubLoad maxLoadToShed) {
+ this.tolerancePercentage = tolerancePercentage;
+ this.maxLoadToShed = maxLoadToShed.getNumTopics();
+ this.tm = tm;
+ this.topicList = topicList;
+ }
+
+ /**
+ * Reduce the load on the current hub so that it reaches the target load.
+ * We reduce load by releasing topics using the {@link TopicManager} passed
+ * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)}
+ * to actually release topics.
+ *
+ * @param targetLoad
+ * @param callback
+ * a Callback<Long> that indicates how many topics we tried to release.
+ * @param ctx
+ */
+ public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
+ int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics();
+ int numTopicsToRelease = topicList.size() - targetTopics;
+
+ // The number of topics we own does not exceed the target topic count. We don't release
+ // any topics in this case.
+ if (numTopicsToRelease <= 0) {
+ callback.operationFinished(ctx, 0L);
+ return;
+ }
+ // Call releaseTopics() on the topic manager to do this. We let the manager handle the release
+ // policy.
+ tm.releaseTopics(numTopicsToRelease, callback, ctx);
+ }
+
+ /**
+ * Calculate the average number of topics on the currently active hubs and release topics
+ * if required.
+ * We shed topics if we currently hold more than ceil(average + average * tolerancePercentage / 100.0) topics.
+ * We shed a maximum of maxLoadToShed topics.
+ * We always hold on to at least one topic.
+ * @param loadMap
+ * @param callback
+ * A return value of true means we tried to rebalance. False means that there was
+ * no need to rebalance.
+ * @param ctx
+ */
+ public void shedLoad(final Map<HubInfo, HubLoad> loadMap, final Callback<Boolean> callback,
+ final Object ctx) {
+
+ long totalTopics = 0L;
+ long myTopics = topicList.size();
+ for (Map.Entry<HubInfo, HubLoad> entry : loadMap.entrySet()) {
+ if (null == entry.getKey() || null == entry.getValue()) {
+ continue;
+ }
+ totalTopics += entry.getValue().toHubLoadData().getNumTopics();
+ }
+
+ double averageTopics = (double)totalTopics/loadMap.size();
+ logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics);
+
+ // Handle the case when averageTopics == 0. We hold on to at least 1 topic.
+ long permissibleTopics =
+ Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0));
+ logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics);
+ if (myTopics <= permissibleTopics) {
+ // My owned topics are less than those permitted by the current tolerance level. No need to release
+ // any topics.
+ callback.operationFinished(ctx, false);
+ return;
+ }
+
+ // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load.
+ // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic.
+ long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed));
+
+ // Reduce the load on the current hub to the target load we calculated above.
+ logger.info("Reducing load on this hub to {} topics.", targetNumTopics);
+ reduceLoadTo(new HubLoad(targetNumTopics), new Callback<Long>() {
+ @Override
+ public void operationFinished(Object ctx, Long numReleased) {
+ logger.info("Released {} topics to shed load.", numReleased);
+ callback.operationFinished(ctx, true);
+ }
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException e) {
+ callback.operationFailed(ctx, e);
+ }
+ }, ctx);
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java Fri Mar 7 17:39:37 2014
@@ -23,6 +23,8 @@ import org.apache.hedwig.server.persiste
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;
+import java.util.List;
+
/**
* An implementor of this interface is basically responsible for ensuring that
* there is at most a single host responsible for a given topic at a given time.
@@ -81,6 +83,23 @@ public interface TopicManager {
public void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
/**
+ * Release numTopics topics. If you hold fewer, release all.
+ * @param numTopics
+ * Number of topics to release.
+ * @param callback
+ * The callback should be invoked with the number of topics the hub
+ * released successfully.
+ * @param ctx
+ */
+ public void releaseTopics(int numTopics, Callback<Long> callback, Object ctx);
+
+ /**
+ * Get the list of topics this hub believes it is responsible for.
+ * @return
+ */
+ public List<ByteString> getTopicList();
+
+ /**
* Stop topic manager
*/
public void stop();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java Fri Mar 7 17:39:37 2014
@@ -17,11 +17,16 @@
*/
package org.apache.hedwig.server.topics;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
-
-import static com.google.common.base.Charsets.UTF_8;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
@@ -38,7 +43,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +58,7 @@ class ZkHubServerManager implements HubS
private final ServerConfiguration conf;
private final ZooKeeper zk;
private final HedwigSocketAddress addr;
+ private final TopicManager tm;
private final String ephemeralNodePath;
private final String hubNodesPath;
@@ -61,6 +66,7 @@ class ZkHubServerManager implements HubS
protected HubInfo myHubInfo;
protected volatile boolean isSuspended = false;
protected ManagerListener listener = null;
+ protected final ScheduledExecutorService executor;
// upload hub server load to zookeeper
StatCallback loadReportingStatCallback = new StatCallback() {
@@ -100,25 +106,90 @@ class ZkHubServerManager implements HubS
if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
logger.error("ZK client connection to the ZK server has expired.!");
if (null != listener) {
+ // Shutdown our executor NOW!
+ executor.shutdownNow();
listener.onShutdown();
}
}
}
}
+    /**
+     * Periodic task that asks the cluster to rebalance and then reschedules itself.
+     *
+     * The next run is scheduled only from the completion callback of the current
+     * rebalance (or immediately when suspended), so two rebalances never overlap.
+     * Once the executor has been shut down the task stops rescheduling itself
+     * instead of throwing from the callback thread.
+     */
+    class RebalanceRunnable implements Runnable {
+        private final double tolerancePercentage;
+        private final HubLoad maxLoadToShed;
+        private final long delaySeconds;
+
+        /**
+         * @param tolerancePercentage passed through to rebalanceCluster
+         * @param maxLoadToShed passed through to rebalanceCluster
+         * @param delaySeconds delay between runs; start() does nothing unless this is > 0
+         */
+        public RebalanceRunnable(double tolerancePercentage,
+                                 HubLoad maxLoadToShed,
+                                 long delaySeconds) {
+            this.tolerancePercentage = tolerancePercentage;
+            this.maxLoadToShed = maxLoadToShed;
+            this.delaySeconds = delaySeconds;
+        }
+
+        /**
+         * Schedule the next run after delaySeconds.
+         *
+         * Does nothing if the executor has been shut down (it is shut down with
+         * shutdownNow() when the ZK session expires). Without this guard, a
+         * rebalance completing after shutdown would throw
+         * RejectedExecutionException on the thread delivering its callback.
+         */
+        private void scheduleNext() {
+            if (executor.isShutdown()) {
+                logger.info("Rebalancer executor has been shut down; not rescheduling.");
+                return;
+            }
+            try {
+                executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
+            } catch (java.util.concurrent.RejectedExecutionException ree) {
+                // Lost a race with shutdownNow() after the isShutdown() check above.
+                logger.info("Rebalancer executor rejected the next run; not rescheduling.");
+            }
+        }
+
+        @Override
+        public void run() {
+            // If we are in suspended state, don't attempt a rebalance; try again later.
+            if (isSuspended) {
+                scheduleNext();
+                return;
+            }
+            // We should attempt a rebalance. We reschedule the job at the tail so that
+            // two rebalances don't happen simultaneously.
+            rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback<Boolean>() {
+                @Override
+                public void operationFinished(Object ctx, Boolean didRebalance) {
+                    // Null-safe: a null result is treated as "no rebalance" rather than an NPE.
+                    if (Boolean.TRUE.equals(didRebalance)) {
+                        logger.info("The attempt to rebalance the cluster was successful");
+                    } else {
+                        logger.info("There was no need to rebalance.");
+                    }
+                    scheduleNext();
+                }
+
+                @Override
+                public void operationFailed(Object ctx, PubSubException e) {
+                    logger.error("The attempt to rebalance the cluster did not succeed.", e);
+                    // Reschedule the job
+                    scheduleNext();
+                }
+            }, this);
+        }
+
+        /**
+         * Schedule the first run, but only when delaySeconds is positive; a
+         * non-positive interval disables the rebalancer.
+         */
+        public void start() {
+            if (delaySeconds > 0) {
+                logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}",
+                            new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds });
+                scheduleNext();
+            }
+        }
+    }
+
+ /**
+ * Create a ZooKeeper-backed hub server manager.
+ *
+ * Creates the rebalancer's executor, registers the hub-list watcher on the
+ * ZooKeeper client, and starts the periodic rebalancer with the tolerance,
+ * max-shed and interval values read from {@code conf}.
+ *
+ * NOTE(review): the rebalancer is started from inside the constructor, so
+ * {@code this} is handed to the executor before construction completes, and
+ * {@code tm} may itself still be under construction (ZkTopicManager passes
+ * {@code this}). The first run is delayed by the rebalance interval, which
+ * presumably leaves construction time to finish — confirm.
+ *
+ * @param conf server configuration
+ * @param zk ZooKeeper client used for hub registration and load reporting
+ * @param addr address of this hub
+ * @param tm topic manager used when shedding load; may be null, in which case
+ *           rebalanceCluster reports that no rebalance happened
+ */
public ZkHubServerManager(ServerConfiguration conf,
ZooKeeper zk,
- HedwigSocketAddress addr) {
+ HedwigSocketAddress addr,
+ TopicManager tm) {
this.conf = conf;
this.zk = zk;
this.addr = addr;
-
+ this.tm = tm;
// znode path to store all available hub servers
this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
// the node's ephemeral node path
this.ephemeralNodePath = getHubZkNodePath(addr);
+ // NOTE(review): Executors' default thread factory creates a non-daemon thread.
+ // The only shutdownNow() visible in this diff is on ZK session expiry, and only
+ // when a listener is registered — confirm the executor is released otherwise.
+ this.executor = Executors.newSingleThreadScheduledExecutor();
// register available hub servers list watcher
zk.register(new ZkHubsWatcher());
+
+ // Start the rebalancer here.
+ new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(),
+ conf.getRebalanceInterval()).start();
}
@Override
@@ -157,7 +228,7 @@ class ZkHubServerManager implements HubS
return;
} else {
callback.operationFailed(ctx,
- new PubSubException.ServiceDownException(
+ new PubSubException.ServiceDownException(
"I can't state my hub node after I created it : "
+ ephemeralNodePath));
return;
@@ -167,7 +238,7 @@ class ZkHubServerManager implements HubS
return;
}
if (rc != Code.NODEEXISTS.intValue()) {
- KeeperException ke = ZkUtils .logErrorAndCreateZKException(
+ KeeperException ke = ZkUtils.logErrorAndCreateZKException(
"Could not create ephemeral node to register hub", ephemeralNodePath, rc);
callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
return;
@@ -283,7 +354,7 @@ class ZkHubServerManager implements HubS
if (numResponses == children.size()) {
if (leastLoaded == null) {
- callback.operationFailed(ctx,
+ callback.operationFailed(ctx,
new PubSubException.ServiceDownException("No hub available"));
return;
}
@@ -305,4 +376,95 @@ class ZkHubServerManager implements HubS
dataCallback, child);
}
}
+
+    /**
+     * Get a map of all currently active hubs with their advertised load.
+     *
+     * Lists the children of {@code hubNodesPath} and reads each child's data
+     * asynchronously. The callback is completed exactly once:
+     * <ul>
+     * <li>operationFinished with the hub -> load map when every listed child was
+     *     read and parsed successfully;</li>
+     * <li>operationFailed if listing the children fails, if no hub is registered,
+     *     or if any listed child could not be read or parsed.</li>
+     * </ul>
+     *
+     * @param callback receives the map of hubs to their load, or the failure
+     * @param originalCtx context handed back to {@code callback} unchanged
+     */
+    private void getActiveHubsInfoWithLoad(final Callback<Map<HubInfo, HubLoad>> callback,
+                                           final Object originalCtx) {
+        // Get the list of children and then for each child, get the data. All asynchronously.
+        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
+            @Override
+            public void safeProcessResult(int rc, String path, Object ctx, final List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
+                        "Could not get children for given path", path, rc);
+                    callback.operationFailed(originalCtx, new PubSubException.ServiceDownException(e));
+                    return;
+                }
+                // Completion is detected inside the per-child data callback, which never
+                // fires when there are no children. Without this guard the caller would
+                // never be notified (and a caller that reschedules from the callback
+                // would stop for good).
+                if (children.isEmpty()) {
+                    callback.operationFailed(originalCtx,
+                        new PubSubException.UnexpectedConditionException(
+                            "No active hubs found under " + hubNodesPath));
+                    return;
+                }
+
+                // One data callback shared by all children; its state is only touched
+                // while holding this callback's monitor.
+                SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
+                    final Map<HubInfo, HubLoad> loadMap = new HashMap<HubInfo, HubLoad>();
+                    int numResponse = 0;
+
+                    @Override
+                    public void safeProcessResult(int rc, String path, Object dataCtx,
+                                                  byte[] data, Stat stat) {
+                        synchronized (this) {
+                            if (rc == Code.OK.intValue()) {
+                                // dataCtx is the child name, i.e. the IP:PORT:SSL
+                                // representation of the hub.
+                                try {
+                                    HubInfo hubInfo =
+                                        new HubInfo(new HedwigSocketAddress((String) dataCtx), stat.getCzxid());
+                                    HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8));
+                                    this.loadMap.put(hubInfo, hubLoad);
+                                } catch (HubLoad.InvalidHubLoadException e) {
+                                    logger.warn("Corrupt data found for hub " + dataCtx + ". Ignoring.", e);
+                                }
+                            } else {
+                                logger.warn("Could not read load data for hub " + dataCtx + ", rc=" + rc);
+                            }
+                            numResponse++;
+                            if (numResponse == children.size()) {
+                                // Any hub we listed but could not read or parse is a failure.
+                                if (loadMap.size() != numResponse) {
+                                    callback.operationFailed(originalCtx,
+                                        new PubSubException.UnexpectedConditionException(
+                                            "Fewer OK responses than the number of active hubs seen previously."));
+                                    return;
+                                }
+                                // We've seen all responses. All OK.
+                                callback.operationFinished(originalCtx, loadMap);
+                            }
+                        }
+                    }
+                };
+
+                for (String child : children) {
+                    // hubNodesPath is conf.getZkHostsPrefix(...) computed once in the constructor.
+                    zk.getData(hubNodesPath + "/" + child, false, dataCallback, child);
+                }
+            }
+        }, originalCtx);
+    }
+
+    /**
+     * Rebalance the cluster by shedding this hub's load if it exceeds the
+     * cluster average by more than the tolerance.
+     *
+     * First fetches the advertised load of every active hub, then delegates the
+     * decision to a TopicBasedLoadShedder built on this manager's topic manager.
+     * With no topic manager the callback reports that no rebalance happened.
+     *
+     * @param tolerancePercentage allowed excess over the average before shedding
+     * @param maxLoadToShed upper bound on the load shed in one attempt
+     * @param callback told whether load was shed, or why the attempt failed
+     * @param ctx context handed back to {@code callback}
+     */
+    @Override
+    public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed,
+                                 final Callback<Boolean> callback, final Object ctx) {
+        final Callback<Map<HubInfo, HubLoad>> onLoadsFetched = new Callback<Map<HubInfo, HubLoad>>() {
+            @Override
+            public void operationFinished(Object loadCtx, Map<HubInfo, HubLoad> hubLoads) {
+                if (tm != null) {
+                    TopicBasedLoadShedder shedder =
+                        new TopicBasedLoadShedder(tm, tolerancePercentage, maxLoadToShed);
+                    shedder.shedLoad(hubLoads, callback, loadCtx);
+                } else {
+                    // Without a topic manager there are no topics we could release.
+                    callback.operationFinished(loadCtx, false);
+                }
+            }
+
+            @Override
+            public void operationFailed(Object loadCtx, PubSubException failure) {
+                // Without the cluster-wide load picture no rebalance can be attempted.
+                logger.error("Failed to get active hubs. Cannot attempt a rebalance.");
+                callback.operationFailed(loadCtx, failure);
+            }
+        };
+        getActiveHubsInfoWithLoad(onLoadsFetched, ctx);
+    }
+
+
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java?rev=1575338&r1=1575337&r2=1575338&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java Fri Mar 7 17:39:37 2014
@@ -77,7 +77,7 @@ public class ZkTopicManager extends Abst
super(cfg, scheduler);
this.zk = zk;
- this.hubManager = new ZkHubServerManager(cfg, zk, addr);
+ this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
myHubLoad = new HubLoad(topics.size());
this.hubManager.registerListener(new HubServerManager.ManagerListener() {
@@ -275,6 +275,10 @@ public class ZkTopicManager extends Abst
@Override
protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
+ // Reduce load. We've removed the topic from our topic set, so do this as well.
+ // When we reclaim the topic, we will increment the load again.
+ hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
+
zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java?rev=1575338&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/topics/TestTopicBasedLoadShedder.java Fri Mar 7 17:39:37 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.topics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.SynchronousQueue;
+
+import junit.framework.Assert;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.HedwigSocketAddress;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Unit tests for the load-shedding decision made by {@link TopicBasedLoadShedder}.
+ *
+ * reduceLoadTo is replaced by a mock that only records the requested target
+ * load. Each test checks whether shedLoad decided to release topics and, if
+ * so, which target load it asked for.
+ */
+public class TestTopicBasedLoadShedder {
+
+    // Each callback posts its verdict (true = outcome as expected) here for the test thread.
+    final protected SynchronousQueue<Boolean> statusQueue = new SynchronousQueue<Boolean>();
+    private int myTopics = 10;
+    private int numHubs = 10;
+    private List<ByteString> mockTopicList;
+    // Large enough that the max-shed limit never constrains the result.
+    private final HubLoad infiniteMaxLoad = new HubLoad(10000000);
+    // Hub -> advertised load; hub 0 carries this hub's own topic count.
+    Map<HubInfo, HubLoad> mockLoadMap = new HashMap<HubInfo, HubLoad>();
+
+    /**
+     * Shedder whose reduceLoadTo records the requested target load and reports
+     * that many topics as released, without touching any real topic.
+     */
+    class MockTopicBasedLoadShedder extends TopicBasedLoadShedder {
+        // This is set by the reduceLoadTo function.
+        public HubLoad targetLoad;
+        public MockTopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
+                                         Double tolerancePercentage, HubLoad maxLoadToShed) {
+            super(tm, topicList, tolerancePercentage, maxLoadToShed);
+        }
+        @Override
+        public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
+            this.targetLoad = targetLoad;
+            // Indicates that we released these many topics.
+            callback.operationFinished(ctx, targetLoad.toHubLoadData().getNumTopics());
+        }
+    }
+
+    /**
+     * Post a verdict to statusQueue from a fresh thread. SynchronousQueue.put
+     * blocks until a taker arrives, and the callback may be running on the test
+     * thread itself (the mock completes synchronously), so putting inline could
+     * deadlock.
+     */
+    private void putStatusAsync(final Boolean status) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                ConcurrencyUtils.put(statusQueue, status);
+            }
+        }).start();
+    }
+
+    /**
+     * Build a callback that checks the outcome of shedLoad and posts the verdict
+     * to statusQueue.
+     *
+     * @param ls the shedder under test, inspected for the recorded target load
+     * @param expected target load expected when topics are released; unused otherwise
+     * @param shouldRelease whether shedLoad is expected to report that it released topics
+     * @param shouldFail verdict posted if shedLoad reports a failure
+     */
+    public Callback<Boolean> getShedLoadCallback(final MockTopicBasedLoadShedder ls, final HubLoad expected,
+                                                 final Boolean shouldRelease, final Boolean shouldFail) {
+        return new Callback<Boolean>() {
+            @Override
+            public void operationFinished(Object o, Boolean released) {
+                // Compare by value, not by boxed reference.
+                boolean status = shouldRelease.equals(released);
+                if (shouldRelease) {
+                    // Short-circuit: if reduceLoadTo was never called, targetLoad is null and
+                    // the check must fail here rather than throw an NPE inside the callback
+                    // (which would post nothing and leave the test blocked until timeout).
+                    status = status
+                        && ls.targetLoad != null
+                        && expected.numTopics == ls.targetLoad.numTopics;
+                }
+                putStatusAsync(status);
+            }
+
+            @Override
+            public void operationFailed(Object o, PubSubException e) {
+                putStatusAsync(shouldFail);
+            }
+        };
+    }
+
+    private List<ByteString> getMockTopicList(int numTopics) {
+        List<ByteString> topics = new ArrayList<ByteString>();
+        for (int i = 0; i < numTopics; i++) {
+            topics.add(ByteString.copyFromUtf8("MyTopic_" + i));
+        }
+        return topics;
+    }
+
+    private HubInfo getHubInfo(int hubNum) {
+        return new HubInfo(new HedwigSocketAddress("myhub.testdomain.foo"+hubNum+":4080:4080"), 0);
+    }
+
+    /**
+     * Set up a cluster of numHubs hubs: hub 0 owns myTopics topics and hub i
+     * (i >= 1) advertises otherHubsLoad[i-1] topics.
+     *
+     * @param otherHubsLoad loads of the other hubs; may be null only for a single-hub cluster
+     */
+    private synchronized void initialize(int myTopics, int numHubs, int[] otherHubsLoad) {
+        if (null == otherHubsLoad) {
+            // Fail with a clear assertion rather than an NPE in the loop below.
+            Assert.assertTrue(numHubs <= 1);
+        } else {
+            Assert.assertTrue(otherHubsLoad.length == numHubs - 1);
+        }
+        this.myTopics = myTopics;
+        mockTopicList = getMockTopicList(this.myTopics);
+        this.numHubs = numHubs;
+        this.mockLoadMap.clear();
+        this.mockLoadMap.put(getHubInfo(0), new HubLoad(this.myTopics));
+        for (int i = 1; i < this.numHubs; i++) {
+            this.mockLoadMap.put(getHubInfo(i), new HubLoad(otherHubsLoad[i-1]));
+        }
+    }
+
+    /** @return an array of n copies of load, or null when n is 0 */
+    private int[] getEqualLoadDistributionArray(int n, int load) {
+        if (n == 0) {
+            return null;
+        }
+        int[] retLoad = new int[n];
+        Arrays.fill(retLoad, load);
+        return retLoad;
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testAllHubsSameTopics() throws Exception {
+        // All hubs have the same number of topics. We should not release any topics even with a
+        // tolerance of 0.0.
+        initialize(10, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testOneHubUnequalTopics() throws Exception {
+        // The hub has 20 topics while the average is 11. Should reduce the load to 11.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testOneHubUnequalTopicsWithTolerance() throws Exception {
+        // The hub has 20 topics and average is 11. Should still release as tolerance level of 50.0 is
+        // breached. Should get down to average.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 50.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+
+        // A tolerance level of 100.0 should result in the hub not releasing topics.
+        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 100.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testMaxLoadShed() throws Exception {
+        // The hub should not shed more than maxLoadShed topics.
+        initialize(20, 10, getEqualLoadDistributionArray(9, 10));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(5));
+        // Our load should reduce to 15.
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(15), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+
+        // We should reduce to 11 even when maxLoadShed and average result in the same
+        // values
+        tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, new HubLoad(9));
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(11), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testSingleHubLoadShed() throws Exception {
+        // If this is the only hub in the cluster, it should not release any topics.
+        initialize(20, 1, null);
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, null, false, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+
+    @Test(timeout = 60000)
+    public synchronized void testUnderloadedClusterLoadShed() throws Exception {
+        // Hold on to at least one topic while shedding load (if cluster is underloaded)
+        initialize(5, 10, getEqualLoadDistributionArray(9, 0));
+        MockTopicBasedLoadShedder tbls = new MockTopicBasedLoadShedder(null, mockTopicList, 0.0, infiniteMaxLoad);
+        tbls.shedLoad(mockLoadMap, getShedLoadCallback(tbls, new HubLoad(1), true, false), null);
+        Assert.assertTrue(statusQueue.take());
+    }
+}