You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/20 01:08:34 UTC
[bookkeeper] branch master updated: ISSUE #1495: Option to enforce
minNumRacksPerWriteQuorum
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 10c5103 ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum
10c5103 is described below
commit 10c510333d904debf54eb832524f6e09dcc0685d
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jun 19 18:08:26 2018 -0700
ISSUE #1495: Option to enforce minNumRacksPerWriteQuorum
Descriptions of the changes in this PR:
Provide an option to enforce the guarantee of minNumRacksPerWriteQuorum
if it is enabled. If it cannot find a bookie to enforce the guarantee
then the API in RackawareEnsemblePlacementPolicy should throw
BKNotEnoughBookiesException.
Master Issue: #1495
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Sijie Guo <si...@apache.org>
This closes #1496 from reddycharan/enforceminracks, closes #1495
---
.../ITopologyAwareEnsemblePlacementPolicy.java | 54 ++-
.../client/RackawareEnsemblePlacementPolicy.java | 68 ++-
.../RackawareEnsemblePlacementPolicyImpl.java | 87 ++--
.../client/RegionAwareEnsemblePlacementPolicy.java | 9 +-
.../bookkeeper/conf/AbstractConfiguration.java | 17 +
.../TestRackawareEnsemblePlacementPolicy.java | 524 ++++++++++++++++++++-
conf/bk_server.conf | 9 +
site/_data/config/bk_server.yaml | 6 +
8 files changed, 726 insertions(+), 48 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
index 4a05e18..87e6f18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -20,6 +20,10 @@ package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.Set;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -109,16 +113,64 @@ public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends E
* predicate to apply
* @param ensemble
* ensemble
+ * @param fallbackToRandom
+ * fallbackToRandom
* @return the selected bookie.
* @throws BKException.BKNotEnoughBookiesException
*/
T selectFromNetworkLocation(String networkLoc,
Set<Node> excludeBookies,
Predicate<T> predicate,
- Ensemble<T> ensemble)
+ Ensemble<T> ensemble,
+ boolean fallbackToRandom)
throws BKException.BKNotEnoughBookiesException;
/**
+ * Select a node from the cluster, excluding excludeBookies and the bookie
+ * nodes of excludeRacks. If no such BookieNode exists and fallbackToRandom
+ * is set to true, pick a random node from the cluster excluding only
+ * excludeBookies.
+ *
+ * @param excludeRacks
+ * @param excludeBookies
+ * @param predicate
+ * @param ensemble
+ * @param fallbackToRandom
+ * @return
+ * @throws BKException.BKNotEnoughBookiesException
+ */
+ T selectFromNetworkLocation(Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
+ throws BKException.BKNotEnoughBookiesException;
+
+ /**
+ * Select a node from networkLoc rack excluding excludeBookies. If there
+ * isn't any node in 'networkLoc', then it will try to get a node from
+ * cluster excluding excludeRacks and excludeBookies. If fallbackToRandom is
+ * set to true then it will get a random bookie from cluster excluding
+ * excludeBookies if it couldn't find a bookie
+ *
+ * @param networkLoc
+ * @param excludeRacks
+ * @param excludeBookies
+ * @param predicate
+ * @param ensemble
+ * @param fallbackToRandom
+ * @return
+ * @throws BKNotEnoughBookiesException
+ */
+ T selectFromNetworkLocation(String networkLoc,
+ Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
+ throws BKNotEnoughBookiesException;
+
+ /**
* Handle bookies that left.
*
* @param leftBookies
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index e3ce8ff..d41ca9f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -23,6 +23,10 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
+import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Predicate;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.Node;
@@ -54,18 +58,19 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
- StatsLogger statsLogger) {
+ boolean enforceMinNumRacksPerWriteQuorum,
+ StatsLogger statsLogger) {
if (stabilizePeriodSeconds > 0) {
super.initialize(dnsResolver, timer, reorderReadsRandom, 0, reorderThresholdPendingRequests, isWeighted,
- maxWeightMultiple, minNumRacksPerWriteQuorum, statsLogger);
+ maxWeightMultiple, minNumRacksPerWriteQuorum, enforceMinNumRacksPerWriteQuorum, statsLogger);
slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
- reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
- minNumRacksPerWriteQuorum, statsLogger);
+ reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+ enforceMinNumRacksPerWriteQuorum, statsLogger);
} else {
super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds,
- reorderThresholdPendingRequests, isWeighted, maxWeightMultiple,
- minNumRacksPerWriteQuorum, statsLogger);
+ reorderThresholdPendingRequests, isWeighted, maxWeightMultiple, minNumRacksPerWriteQuorum,
+ enforceMinNumRacksPerWriteQuorum, statsLogger);
slave = null;
}
return this;
@@ -171,15 +176,60 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
String networkLoc,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
- Ensemble<BookieNode> ensemble)
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
throws BKException.BKNotEnoughBookiesException {
try {
- return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+ return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+ fallbackToRandom);
} catch (BKException.BKNotEnoughBookiesException bnebe) {
if (slave == null) {
throw bnebe;
} else {
- return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+ return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble,
+ fallbackToRandom);
+ }
+ }
+ }
+
+ @Override
+ public BookieNode selectFromNetworkLocation(
+ Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
+ throws BKException.BKNotEnoughBookiesException {
+ try {
+ return super.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble,
+ fallbackToRandom);
+ }
+ }
+ }
+
+ @Override
+ public BookieNode selectFromNetworkLocation(
+ String networkLoc,
+ Set<String> excludeRacks,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
+ throws BKNotEnoughBookiesException {
+ try {
+ return super.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+ fallbackToRandom);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.selectFromNetworkLocation(networkLoc, excludeRacks, excludeBookies, predicate, ensemble,
+ fallbackToRandom);
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 23afaa9..ae640bb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -79,7 +79,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
int maxWeightMultiple;
private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
private WeightedRandomSelection<BookieNode> weightedSelection;
+
protected int minNumRacksPerWriteQuorum;
+ protected boolean enforceMinNumRacksPerWriteQuorum;
public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
@@ -238,6 +240,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
boolean isWeighted,
int maxWeightMultiple,
int minNumRacksPerWriteQuorum,
+ boolean enforceMinNumRacksPerWriteQuorum,
StatsLogger statsLogger) {
checkNotNull(statsLogger, "statsLogger should not be null, use NullStatsLogger instead.");
this.statsLogger = statsLogger;
@@ -250,6 +253,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
this.dnsResolver = new DNSResolverDecorator(dnsResolver, () -> this.getDefaultRack());
this.timer = timer;
this.minNumRacksPerWriteQuorum = minNumRacksPerWriteQuorum;
+ this.enforceMinNumRacksPerWriteQuorum = enforceMinNumRacksPerWriteQuorum;
// create the network topology
if (stabilizePeriodSeconds > 0) {
@@ -339,6 +343,7 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
conf.getDiskWeightBasedPlacementEnabled(),
conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
conf.getMinNumRacksPerWriteQuorum(),
+ conf.getEnforceMinNumRacksPerWriteQuorum(),
statsLogger);
}
@@ -536,6 +541,10 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
int numRacks = topology.getNumOfRacks();
// only one rack, use the random algorithm.
if (numRacks < 2) {
+ if (enforceMinNumRacksPerWriteQuorum && (minNumRacksPerWriteQuorumForThisEnsemble > 1)) {
+ LOG.error("Only one rack available and minNumRacksPerWriteQuorum is enforced, so giving up");
+ throw new BKNotEnoughBookiesException();
+ }
List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.INSTANCE,
ensemble);
ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
@@ -610,7 +619,9 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
}
curRack = sb.toString();
}
- prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
+ boolean firstBookieInTheEnsemble = (null == prevNode);
+ prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble,
+ !enforceMinNumRacksPerWriteQuorum || firstBookieInTheEnsemble);
racks[i] = prevNode.getNetworkLocation();
}
ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
@@ -657,7 +668,8 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
networkLocationsToBeExcluded,
excludeNodes,
TruePredicate.INSTANCE,
- EnsembleForReplacementWithNoConstraints.INSTANCE);
+ EnsembleForReplacementWithNoConstraints.INSTANCE,
+ !enforceMinNumRacksPerWriteQuorum);
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
}
@@ -698,12 +710,20 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
String networkLoc,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
- Ensemble<BookieNode> ensemble)
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
// select one from local rack
try {
return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
} catch (BKNotEnoughBookiesException e) {
+ if (!fallbackToRandom) {
+ LOG.error(
+ "Failed to choose a bookie from {} : "
+ + "excluded {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+ networkLoc, excludeBookies);
+ throw e;
+ }
LOG.warn("Failed to choose a bookie from {} : "
+ "excluded {}, fallback to choose bookie randomly from the cluster.",
networkLoc, excludeBookies);
@@ -712,28 +732,25 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
}
}
- protected BookieNode selectFromNetworkLocation(String networkLoc,
+ @Override
+ public BookieNode selectFromNetworkLocation(String networkLoc,
Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
- Ensemble<BookieNode> ensemble)
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
// first attempt to select one from local rack
try {
return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
} catch (BKNotEnoughBookiesException e) {
- if (isWeighted) {
- // if weight based selection is enabled, randomly select one from the whole cluster
- // based on weights and ignore the provided <tt>excludeRacks</tt>.
- // randomly choose one from whole cluster, ignore the provided predicate.
- return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
- } else {
- // if weight based selection is disabled, and there is no enough bookie from local rack,
- // select bookies from the whole cluster and exclude the racks specified at <tt>excludeRacks</tt>.
- return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble);
- }
+ /*
+ * there are not enough bookies in the local rack, so select a bookie
+ * from the whole cluster, excluding the racks specified in
+ * <tt>excludeRacks</tt>.
+ */
+ return selectFromNetworkLocation(excludeRacks, excludeBookies, predicate, ensemble, fallbackToRandom);
}
-
}
@@ -742,27 +759,39 @@ public class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsembleP
* <i>excludeBookies</i> set. If it fails to find one, it selects a random {@link BookieNode} from the whole
* cluster.
*/
- protected BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
+ @Override
+ public BookieNode selectFromNetworkLocation(Set<String> excludeRacks,
Set<Node> excludeBookies,
Predicate<BookieNode> predicate,
- Ensemble<BookieNode> ensemble)
+ Ensemble<BookieNode> ensemble,
+ boolean fallbackToRandom)
throws BKNotEnoughBookiesException {
- List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
- Collections.shuffle(knownNodes);
+ List<BookieNode> knownNodes = new ArrayList<>(knownBookies.values());
+ Set<Node> fullExclusionBookiesList = new HashSet<Node>(excludeBookies);
for (BookieNode knownNode : knownNodes) {
- if (excludeBookies.contains(knownNode)) {
- continue;
- }
if (excludeRacks.contains(knownNode.getNetworkLocation())) {
- continue;
+ fullExclusionBookiesList.add(knownNode);
+ }
+ }
+
+ try {
+ return selectRandomInternal(knownNodes, 1, fullExclusionBookiesList, TruePredicate.INSTANCE,
+ EnsembleForReplacementWithNoConstraints.INSTANCE).get(0);
+ } catch (BKNotEnoughBookiesException e) {
+ if (!fallbackToRandom) {
+ LOG.error(
+ "Failed to choose a bookie excluding Racks: {} "
+ + "Nodes: {}, enforceMinNumRacksPerWriteQuorum is enabled so giving up.",
+ excludeRacks, excludeBookies);
+ throw e;
}
- return knownNode;
+
+ LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
+ excludeBookies);
+ // randomly choose one from whole cluster
+ return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
}
- LOG.warn("Failed to choose a bookie: excluded {}, fallback to choose bookie randomly from the cluster.",
- excludeBookies);
- // randomly choose one from whole cluster
- return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
}
private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 218781b..dcd4b17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -131,7 +131,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
.initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
- this.minNumRacksPerWriteQuorum, statsLogger)
+ this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
@@ -176,11 +176,11 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
// Regions are specified as
// R1;R2;...
String[] regions = regionsString.split(";");
- for (String region: regions) {
+ for (String region : regions) {
perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
.initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
this.reorderThresholdPendingRequests, this.isWeighted, this.maxWeightMultiple,
- this.minNumRacksPerWriteQuorum, statsLogger)
+ this.minNumRacksPerWriteQuorum, this.enforceMinNumRacksPerWriteQuorum, statsLogger)
.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK));
}
minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY,
@@ -502,7 +502,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
bookieNodeToReplace.getNetworkLocation(),
excludeBookies,
TruePredicate.INSTANCE,
- EnsembleForReplacementWithNoConstraints.INSTANCE);
+ EnsembleForReplacementWithNoConstraints.INSTANCE,
+ true);
} catch (BKException.BKNotEnoughBookiesException e) {
LOG.warn("Failed to choose a bookie from {} : "
+ "excluded {}, fallback to choose bookie randomly from the cluster.",
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index c813d5f..2c80f43 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -144,6 +144,9 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
// minimum number of racks per write quorum
public static final String MIN_NUM_RACKS_PER_WRITE_QUORUM = "minNumRacksPerWriteQuorum";
+ // enforce minimum number of racks per write quorum
+ public static final String ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM = "enforceMinNumRacksPerWriteQuorum";
+
protected AbstractConfiguration() {
super();
if (READ_SYSTEM_PROPERTIES) {
@@ -796,6 +799,20 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
}
/**
+ * Set the flag to enforce minimum number of racks per write quorum.
+ */
+ public void setEnforceMinNumRacksPerWriteQuorum(boolean enforceMinNumRacksPerWriteQuorum) {
+ setProperty(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, enforceMinNumRacksPerWriteQuorum);
+ }
+
+ /**
+ * Get the flag which enforces the minimum number of racks per write quorum.
+ */
+ public boolean getEnforceMinNumRacksPerWriteQuorum() {
+ return getBoolean(ENFORCE_MIN_NUM_RACKS_PER_WRITE_QUORUM, false);
+ }
+
+ /**
* Trickery to allow inheritance with fluent style.
*/
protected abstract T getThis();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
index cdff679..e085006 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
@@ -17,8 +17,8 @@
*/
package org.apache.bookkeeper.client;
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS;
-import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.shuffleWithMask;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.shuffleWithMask;
import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
@@ -28,8 +28,10 @@ import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -39,10 +41,14 @@ import junit.framework.TestCase;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.BookieNode;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
+import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.junit.Test;
@@ -673,6 +679,504 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
}
@Test
+ public void testSingleRackWithEnforceMinNumRacks() throws Exception {
+ repp.uninitalize();
+ updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(2);
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr1);
+ addrs.add(addr2);
+ addrs.add(addr3);
+ addrs.add(addr4);
+ repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
+
+ ArrayList<BookieSocketAddress> ensemble;
+ try {
+ ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
+ fail("Should get not enough bookies exception since there is only one rack.");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ }
+
+ try {
+ ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(), EnsembleForReplacementWithNoConstraints.INSTANCE,
+ TruePredicate.INSTANCE);
+ fail("Should get not enough bookies exception since there is only one rack.");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ }
+ }
+
+ @Test
+ public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
+ repp.uninitalize();
+
+ int minNumRacksPerWriteQuorum = 4;
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ // set enforceMinNumRacksPerWriteQuorum
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ int numOfRacks = 3;
+ int numOfBookiesPerRack = 5;
+ BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+ for (int i = 0; i < numOfRacks; i++) {
+ for (int j = 0; j < numOfBookiesPerRack; j++) {
+ int index = i * numOfBookiesPerRack + j;
+ bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+ StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+ }
+ }
+
+ repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+ new HashSet<BookieSocketAddress>());
+
+ try {
+ repp.newEnsemble(8, 4, 4, null, new HashSet<>());
+ fail("Should get not enough bookies exception since there are only 3 racks");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ }
+
+ try {
+ repp.newEnsemble(8, 4, 4, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+ fail("Should get not enough bookies exception since there are only 3 racks");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ }
+
+ /*
+ * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+ * and there are enough bookies in 3 racks, these newEnsemble calls should
+ * succeed.
+ */
+ ArrayList<BookieSocketAddress> ensemble;
+ int ensembleSize = numOfRacks * numOfBookiesPerRack;
+ int writeQuorumSize = numOfRacks;
+ int ackQuorumSize = numOfRacks;
+
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+ }
+
+ @Test
+ public void testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exception {
+ repp.uninitalize();
+
+ int minNumRacksPerWriteQuorum = 4;
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ // set enforceMinNumRacksPerWriteQuorum
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ int writeQuorumSize = 3;
+ int ackQuorumSize = 3;
+ int effectiveMinNumRacksPerWriteQuorum = Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
+
+ int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
+ int numOfBookiesPerRack = 20;
+ BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
+
+ for (int i = 0; i < numOfRacks; i++) {
+ for (int j = 0; j < numOfBookiesPerRack; j++) {
+ int index = i * numOfBookiesPerRack + j;
+ bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
+ StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
+ }
+ }
+
+ Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
+ new HashSet<BookieSocketAddress>());
+
+ /*
+ * in this scenario we have enough number of racks (2 *
+ * effectiveMinNumRacksPerWriteQuorum - 1) and more number of bookies in
+ * each rack. So we should be able to create ensemble for all
+ * ensembleSizes (as long as there are enough number of bookies in each
+ * rack).
+ */
+ ArrayList<BookieSocketAddress> ensemble;
+ for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
+ EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
+ assertEquals("Number of writeQuorum sets covered", ensembleSize,
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
+ }
+ }
+
+ @Test
+ public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
+ repp.uninitalize();
+
+ int minNumRacksPerWriteQuorum = 4;
+ ClientConfiguration clientConf = new ClientConfiguration(conf);
+ clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+ // set enforceMinNumRacksPerWriteQuorum
+ clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+ repp = new RackawareEnsemblePlacementPolicy();
+ repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE);
+ repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+ int numOfRacks = 3;
+ int numOfBookiesPerRack = 5;
+ Set<BookieSocketAddress> bookieSocketAddresses = new HashSet<BookieSocketAddress>();
+ Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+ BookieSocketAddress bookieAddress;
+ String rack;
+ for (int i = 0; i < numOfRacks; i++) {
+ for (int j = 0; j < numOfBookiesPerRack; j++) {
+ int index = i * numOfBookiesPerRack + j;
+ bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+ rack = "/default-region/r" + i;
+ StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rack);
+ bookieSocketAddresses.add(bookieAddress);
+ bookieRackMap.put(bookieAddress, rack);
+ }
+ }
+
+ repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+
+ /*
+ * Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
+ * and there are enough bookies in 3 racks, this newEnsemble call should
+ * succeed.
+ */
+ ArrayList<BookieSocketAddress> ensemble;
+ int ensembleSize = numOfRacks * numOfBookiesPerRack;
+ int writeQuorumSize = numOfRacks;
+ int ackQuorumSize = numOfRacks;
+
+ ensemble = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
+
+ BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
+ // get rack of some other bookie
+ String rackOfOtherBookieInEnsemble = bookieRackMap.get(ensemble.get(8));
+ BookieSocketAddress newBookieAddress1 = new BookieSocketAddress("128.0.0.100", 3181);
+ /*
+ * add the newBookie to the rack of some other bookie in the current
+ * ensemble
+ */
+ StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(), rackOfOtherBookieInEnsemble);
+ bookieSocketAddresses.add(newBookieAddress1);
+ bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
+
+ repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+ try {
+ repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+ new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+ fail("Should get not enough bookies exception since there are no more bookies in rack"
+ + "of 'bookieInEnsembleToReplace'"
+ + "and new bookie added belongs to the rack of some other bookie in the ensemble");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ // this is expected
+ }
+
+ String newRack = "/default-region/r100";
+ BookieSocketAddress newBookieAddress2 = new BookieSocketAddress("128.0.0.101", 3181);
+ /*
+ * add the newBookie to a new rack.
+ */
+ StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(), newRack);
+ bookieSocketAddresses.add(newBookieAddress2);
+ bookieRackMap.put(newBookieAddress2, newRack);
+
+ repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+ /*
+ * this replaceBookie should succeed, because a new bookie is added to a
+ * new rack.
+ */
+ BookieSocketAddress replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ null, new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, new HashSet<>());
+ assertEquals("It should be newBookieAddress2", newBookieAddress2, replacedBookieAddress);
+
+ Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
+ bookiesToExclude.add(newBookieAddress2);
+ repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+ try {
+ repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+ new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+ fail("Should get not enough bookies exception since the only available bookie to replace"
+ + "is added to excludedBookies list");
+ } catch (BKNotEnoughBookiesException bnebe) {
+ // this is expected
+ }
+
+ // get rack of the bookie to be replaced
+ String rackOfBookieToBeReplaced = bookieRackMap.get(bookieInEnsembleToBeReplaced);
+ BookieSocketAddress newBookieAddress3 = new BookieSocketAddress("128.0.0.102", 3181);
+ /*
+ * add the newBookie to rack of the bookie to be replaced.
+ */
+ StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(), rackOfBookieToBeReplaced);
+ bookieSocketAddresses.add(newBookieAddress3);
+ bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
+
+ repp.onClusterChanged(bookieSocketAddresses, new HashSet<BookieSocketAddress>());
+ /*
+ * here we have added new bookie to the rack of the bookie to be
+ * replaced, so we should be able to replacebookie though
+ * newBookieAddress2 is added to excluded bookies list.
+ */
+ replacedBookieAddress = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
+ new HashSet<BookieSocketAddress>(ensemble), bookieInEnsembleToBeReplaced, bookiesToExclude);
+ assertEquals("It should be newBookieAddress3", newBookieAddress3, replacedBookieAddress);
+ }
+
+    /**
+     * Exercises selectFromNetworkLocation(networkLoc, excludeBookies, ...) on a
+     * cluster of 3 racks with 5 bookies each.
+     *
+     * <p>With fallback disabled the test expects BKNotEnoughBookiesException when
+     * the requested rack does not exist or when all of its bookies are excluded.
+     * With fallback enabled the same calls are expected to return a bookie from
+     * some other rack.
+     */
+    @Test
+    public void testSelectBookieFromNetworkLoc() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        // bookies are appended rack by rack, so indices 0 .. numOfBookiesPerRack - 1 belong to r0
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        String rack = bookieRackMap.get(bookieSocketAddresses.get(0));
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rack, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        String recRack = bookieNode.getNetworkLocation();
+        assertEquals("Rack of node", rack, recRack);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since there are no bookies in this rack");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // it should not fail, since fallback is set to true and it should pick
+        // some random one
+        repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+        try {
+            repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)),
+                    excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                    EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0 but got " + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    /**
+     * Exercises selectFromNetworkLocation(excludeRacks, excludeBookies, ...) on a
+     * cluster of 3 racks with 5 bookies each.
+     *
+     * <p>Racks r1 and r2 are excluded throughout. With fallback disabled the test
+     * expects BKNotEnoughBookiesException once every r0 bookie is excluded too,
+     * and an r0 bookie when none is excluded. With fallback enabled the test
+     * expects a bookie from r1 or r2 despite those racks being excluded.
+     */
+    @Test
+    public void testSelectBookieFromExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        // bookies are appended rack by rack, so indices 0 .. numOfBookiesPerRack - 1 belong to r0
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since racks R1 and R2 are "
+                    + "excluded and all the bookies in r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // only racks r1 and r2 are excluded, so the selected bookie has to come from r0
+        BookieNode bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, new HashSet<Node>(),
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertEquals("BookieNode should be from Rack /r0", rackLocationNames[0], bookieNode.getNetworkLocation());
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0,
+                TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, true);
+        assertTrue("BookieNode should not be from Rack /r0 but got " + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+    /**
+     * Exercises selectFromNetworkLocation(networkLoc, excludeRacks,
+     * excludeBookies, ...) on a cluster of 3 racks with 5 bookies each.
+     *
+     * <p>The test combines a preferred rack (existing or not), the excluded
+     * racks r1 and r2, and the excluded bookies of r0, and checks for each
+     * combination whether a bookie is returned and from which rack, or whether
+     * BKNotEnoughBookiesException is thrown.
+     */
+    @Test
+    public void testSelectBookieFromNetworkLocAndExcludingRacks() throws Exception {
+        repp.uninitalize();
+
+        int minNumRacksPerWriteQuorum = 4;
+        ClientConfiguration clientConf = new ClientConfiguration(conf);
+        clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
+        // set enforceMinNumRacksPerWriteQuorum
+        clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
+        repp = new RackawareEnsemblePlacementPolicy();
+        repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
+                NullStatsLogger.INSTANCE);
+        repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
+
+        int numOfRacks = 3;
+        int numOfBookiesPerRack = 5;
+        String[] rackLocationNames = new String[numOfRacks];
+        List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
+        Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
+        BookieSocketAddress bookieAddress;
+
+        // bookies are appended rack by rack, so indices 0 .. numOfBookiesPerRack - 1 belong to r0
+        for (int i = 0; i < numOfRacks; i++) {
+            rackLocationNames[i] = "/default-region/r" + i;
+            for (int j = 0; j < numOfBookiesPerRack; j++) {
+                int index = i * numOfBookiesPerRack + j;
+                bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
+                StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
+                bookieSocketAddresses.add(bookieAddress);
+                bookieRackMap.put(bookieAddress, rackLocationNames[i]);
+            }
+        }
+        String nonExistingRackLocation = "/default-region/r25";
+
+        repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
+                new HashSet<BookieSocketAddress>());
+
+        Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
+        for (int i = 0; i < numOfBookiesPerRack; i++) {
+            excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
+        }
+
+        Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
+
+        Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
+        excludeRacksRackR1AndR2.add(rackLocationNames[1]);
+        excludeRacksRackR1AndR2.add(rackLocationNames[2]);
+
+        try {
+            repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                    excludeBookieNodesOfRackR0,
+                    TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+            fail("Should get not enough bookies exception since racks R1 and R2 are excluded and all the bookies in "
+                    + "r0 are added to the exclusion list");
+        } catch (BKNotEnoughBookiesException bnebe) {
+            // this is expected
+        }
+
+        // preferred rack r0 has available bookies, so the selected bookie has to come from r0
+        BookieNode bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], excludeRacksRackR1AndR2,
+                new HashSet<Node>(), TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertEquals("BookieNode should be from Rack /r0", rackLocationNames[0], bookieNode.getNetworkLocation());
+
+        // no rack is excluded but every r0 bookie is, so a bookie from r1 or r2 is
+        // expected even though fallback is set to false
+        bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], new HashSet<String>(),
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
+                EnsembleForReplacementWithNoConstraints.INSTANCE, false);
+        assertTrue("BookieNode should not be from Rack /r0 but got " + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+
+        // not expected to get exception since fallback is set to true
+        bookieNode = repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
+                excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
+                true);
+        assertTrue("BookieNode should not be from Rack /r0 but got " + bookieNode.getNetworkLocation(),
+                rackLocationNames[1].equals(bookieNode.getNetworkLocation())
+                        || rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
+    }
+
+ @Test
public void testNewEnsembleWithMultipleRacks() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
@@ -904,12 +1408,15 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
@Test
public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception {
+ BookieSocketAddress addr0 = new BookieSocketAddress("126.0.0.1", 3181);
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.reset();
+ StaticDNSResolver.addNodeToRack(addr0.getSocketAddress().getAddress().getHostAddress(),
+ NetworkTopology.DEFAULT_REGION + "/r0");
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
@@ -920,6 +1427,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
NetworkTopology.DEFAULT_REGION + "/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
+ addrs.add(addr0);
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
@@ -933,13 +1441,15 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
+ bookieInfoMap.put(addr0, new BookieInfo(50L, 50L));
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr3, new BookieInfo(200L, 200L));
- bookieInfoMap.put(addr4, new BookieInfo(multiple * 100L, multiple * 100L));
+ bookieInfoMap.put(addr4, new BookieInfo(multiple * 50L, multiple * 50L));
repp.updateBookieInfo(bookieInfoMap);
Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
+ selectionCounts.put(addr0, 0L);
selectionCounts.put(addr1, 0L);
selectionCounts.put(addr2, 0L);
selectionCounts.put(addr3, 0L);
@@ -951,10 +1461,14 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
// will come from other racks. However, the weight should be honored in such
// selections as well
replacedBookie = repp.replaceBookie(1, 1, 1, null, new HashSet<>(), addr2, new HashSet<>());
- assertTrue(addr1.equals(replacedBookie) || addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
+ assertTrue(addr0.equals(replacedBookie) || addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
+ || addr4.equals(replacedBookie));
selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
}
-
+ /*
+ * Since addr2 is the bookie being replaced, the weights of the remaining bookies are 50, 100, 200 and 500 (10 * 50),
+ * so the median calculated by WeightedRandomSelection is (100 + 200) / 2 = 150.
+ */
double medianWeight = 150;
double medianSelectionCounts = (double) (medianWeight / bookieInfoMap.get(addr1).getWeight())
* selectionCounts.get(addr1);
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 1450def..7a0f361 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -860,6 +860,15 @@ zkEnableSecurity=false
# The max number of args used in the script provided at `networkTopologyScriptFileName`
# networkTopologyScriptNumberArgs=100
+# Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to
+# get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+# minNumRacksPerWriteQuorum=2
+
+# 'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick
+# bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it cannot find such a
+# bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
+# enforceMinNumRacksPerWriteQuorum=false
+
#############################################################################
## Auditor settings
#############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 39f2719..ac24fce 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -610,6 +610,12 @@ groups:
- param: networkTopologyScriptNumberArgs
description: |
The max number of args used in the script provided at `networkTopologyScriptFileName`.
+ - param: minNumRacksPerWriteQuorum
+ description: |
+ Minimum number of racks per write quorum. RackawareEnsemblePlacementPolicy will try to get bookies from at least 'minNumRacksPerWriteQuorum' racks for a writeQuorum.
+ - param: enforceMinNumRacksPerWriteQuorum
+ description: |
+ 'enforceMinNumRacksPerWriteQuorum' forces RackawareEnsemblePlacementPolicy to pick bookies from 'minNumRacksPerWriteQuorum' racks for a writeQuorum. If it cannot find such a bookie, it throws BKNotEnoughBookiesException instead of picking a random one.
- name: AutoRecovery auditor settings
params: