You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/10/13 05:51:09 UTC
[5/5] bookkeeper git commit: BOOKKEEPER-612: Region aware placement
BOOKKEEPER-612: Region aware placement
- Introduce the concept of a two-level network topology, with region as the first level and rack as the second level
- NodeBase, Node and NetworkTopology manage this two-level hierarchy and the position of individual nodes within it
- Add RegionAwareEnsemblePlacementPolicy, an implementation that distributes nodes across regions and, within each region, uses rack-aware placement to place nodes
This is a stacked diff, opened now to start the review; the dependent pull request would still be merged first.
Author: Robin Dhamankar <ro...@Robins-MacBook-Air.local>
Reviewers: Sijie Guo <si...@apache.org>
Closes #56 from robindh/RegionAwarePlacement
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/bbd1eb8d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/bbd1eb8d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/bbd1eb8d
Branch: refs/heads/master
Commit: bbd1eb8d8560b03834175fbd996b85237df09f5c
Parents: 9dc05fc
Author: Robin Dhamankar <ro...@gmail.com>
Authored: Wed Oct 12 22:50:42 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Oct 12 22:50:42 2016 -0700
----------------------------------------------------------------------
bookkeeper-server/pom.xml | 5 +
.../LocalBookieEnsemblePlacementPolicy.java | 37 +-
.../apache/bookkeeper/client/BookKeeper.java | 72 +-
.../apache/bookkeeper/client/BookieWatcher.java | 17 +-
.../client/DefaultEnsemblePlacementPolicy.java | 45 +-
.../client/EnsemblePlacementPolicy.java | 65 +-
.../ITopologyAwareEnsemblePlacementPolicy.java | 127 ++
.../bookkeeper/client/LedgerCreateOp.java | 4 +-
.../apache/bookkeeper/client/LedgerHandle.java | 9 +-
.../RackawareEnsemblePlacementPolicy.java | 609 ++-------
.../RackawareEnsemblePlacementPolicyImpl.java | 554 ++++++++
.../RegionAwareEnsemblePlacementPolicy.java | 602 +++++++++
.../TopologyAwareEnsemblePlacementPolicy.java | 467 +++++++
.../bookkeeper/conf/ClientConfiguration.java | 72 +-
.../net/AbstractDNSToSwitchMapping.java | 2 +-
.../bookkeeper/net/BookieSocketAddress.java | 2 +-
.../net/CachedDNSToSwitchMapping.java | 1 -
.../java/org/apache/bookkeeper/net/DNS.java | 4 +
.../bookkeeper/net/DNSToSwitchMapping.java | 1 -
.../org/apache/bookkeeper/net/NetUtils.java | 26 +-
.../apache/bookkeeper/net/NetworkTopology.java | 861 +-----------
.../bookkeeper/net/NetworkTopologyImpl.java | 880 ++++++++++++
.../java/org/apache/bookkeeper/net/Node.java | 4 +-
.../org/apache/bookkeeper/net/NodeBase.java | 15 +-
.../bookkeeper/net/ScriptBasedMapping.java | 2 +-
.../net/StabilizeNetworkTopology.java | 154 +++
.../proto/PerChannelBookieClient.java | 2 +-
.../bookkeeper/util/BookKeeperConstants.java | 2 +
.../TestRackawareEnsemblePlacementPolicy.java | 388 ++++--
.../TestRegionAwareEnsemblePlacementPolicy.java | 1262 ++++++++++++++++++
.../bookkeeper/client/UpdateLedgerCmdTest.java | 2 +-
.../bookkeeper/client/UpdateLedgerOpTest.java | 6 +-
.../bookkeeper/util/StaticDNSResolver.java | 10 +
33 files changed, 4837 insertions(+), 1472 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index e47d0d3..bd143f1 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -84,6 +84,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 2b18029..508511b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -17,16 +17,21 @@
*/
package org.apache.bookkeeper.bookie;
+import com.google.common.base.Optional;
+
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Set;
+import java.util.*;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.jboss.netty.util.HashedWheelTimer;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +48,7 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
private BookieSocketAddress bookieAddress;
@Override
- public EnsemblePlacementPolicy initialize(Configuration conf) {
-
+ public EnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer hashedWheelTimer, FeatureProvider featureProvider, StatsLogger statsLogger) {
// Configuration will have already the bookie configuration inserted
ServerConfiguration serverConf = new ServerConfiguration();
serverConf.addConfiguration(conf);
@@ -70,7 +74,22 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ throw new BKNotEnoughBookiesException();
+ }
+
+ @Override
+ public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return null;
+ }
+
+ @Override
+ public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return null;
+ }
+
+ @Override
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
if (ensembleSize > 1) {
throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie");
@@ -79,10 +98,4 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
return Lists.newArrayList(bookieAddress);
}
- @Override
- public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- throw new BKNotEnoughBookiesException();
- }
-
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 75ab759..b683ca4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -36,11 +37,14 @@ import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.meta.CleanupLedgerManager;
import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -56,6 +60,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,6 +106,9 @@ public class BookKeeper implements AutoCloseable {
final OrderedSafeExecutor mainWorkerPool;
final ScheduledExecutorService scheduler;
+ final HashedWheelTimer requestTimer;
+ final boolean ownTimer;
+ final FeatureProvider featureProvider;
// Ledger manager responsible for how to store ledger meta data
final LedgerManagerFactory ledgerManagerFactory;
@@ -122,6 +130,9 @@ public class BookKeeper implements AutoCloseable {
ZooKeeper zk = null;
ClientSocketChannelFactory channelFactory = null;
StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+ DNSToSwitchMapping dnsResolver = null;
+ HashedWheelTimer requestTimer = null;
+ FeatureProvider featureProvider = null;
Builder(ClientConfiguration conf) {
this.conf = conf;
@@ -142,9 +153,25 @@ public class BookKeeper implements AutoCloseable {
return this;
}
+
+ public Builder dnsResolver(DNSToSwitchMapping dnsResolver) {
+ this.dnsResolver = dnsResolver;
+ return this;
+ }
+
+ public Builder requestTimer(HashedWheelTimer requestTimer) {
+ this.requestTimer = requestTimer;
+ return this;
+ }
+
+ public Builder featureProvider(FeatureProvider featureProvider) {
+ this.featureProvider = featureProvider;
+ return this;
+ }
+
public BookKeeper build() throws IOException, InterruptedException, KeeperException {
Preconditions.checkNotNull(statsLogger, "No stats logger provided");
- return new BookKeeper(conf, zk, channelFactory, statsLogger);
+ return new BookKeeper(conf, zk, channelFactory, statsLogger, dnsResolver, requestTimer, featureProvider);
}
}
@@ -183,7 +210,8 @@ public class BookKeeper implements AutoCloseable {
*/
public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, KeeperException {
- this(conf, null, null, NullStatsLogger.INSTANCE);
+ this(conf, null, null, NullStatsLogger.INSTANCE,
+ null, null, null);
}
private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException {
@@ -240,7 +268,8 @@ public class BookKeeper implements AutoCloseable {
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
throws IOException, InterruptedException, KeeperException {
- this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE);
+ this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE,
+ null, null, null);
}
/**
@@ -249,7 +278,10 @@ public class BookKeeper implements AutoCloseable {
private BookKeeper(ClientConfiguration conf,
ZooKeeper zkc,
ClientSocketChannelFactory channelFactory,
- StatsLogger statsLogger)
+ StatsLogger statsLogger,
+ DNSToSwitchMapping dnsResolver,
+ HashedWheelTimer requestTimer,
+ FeatureProvider featureProvider)
throws IOException, InterruptedException, KeeperException {
this.conf = conf;
@@ -286,6 +318,23 @@ public class BookKeeper implements AutoCloseable {
this.ownChannelFactory = false;
}
+ if (null == requestTimer) {
+ this.requestTimer = new HashedWheelTimer(
+ new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
+ conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+ conf.getTimeoutTimerNumTicks());
+ this.ownTimer = true;
+ } else {
+ this.requestTimer = requestTimer;
+ this.ownTimer = false;
+ }
+
+ if (null == featureProvider) {
+ this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
+ } else {
+ this.featureProvider = featureProvider;
+ }
+
// initialize scheduler
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
"BookKeeperClientScheduler-%d");
@@ -297,7 +346,8 @@ public class BookKeeper implements AutoCloseable {
initOpLoggers(this.statsLogger);
// initialize the ensemble placement
- this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
+ this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
+ dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
// initialize main worker pool
this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
@@ -321,11 +371,16 @@ public class BookKeeper implements AutoCloseable {
scheduleBookieHealthCheckIfEnabled();
}
- private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
+ private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+ DNSToSwitchMapping dnsResolver,
+ HashedWheelTimer timer,
+ FeatureProvider featureProvider,
+ StatsLogger statsLogger)
throws IOException {
try {
Class<? extends EnsemblePlacementPolicy> policyCls = conf.getEnsemblePlacementPolicy();
- return ReflectionUtils.newInstance(policyCls).initialize(conf);
+ return ReflectionUtils.newInstance(policyCls).initialize(conf, Optional.fromNullable(dnsResolver),
+ timer, featureProvider, statsLogger);
} catch (ConfigurationException e) {
throw new IOException("Failed to initialize ensemble placement policy : ", e);
}
@@ -1001,6 +1056,9 @@ public class BookKeeper implements AutoCloseable {
LOG.warn("The mainWorkerPool did not shutdown cleanly");
}
+ if (ownTimer) {
+ requestTimer.stop();
+ }
if (ownChannelFactory) {
channelFactory.releaseExternalResources();
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 1e49cf8..b8d8951 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -255,17 +255,18 @@ class BookieWatcher implements Watcher, ChildrenCallback {
* @return list of bookies for new ensemble.
* @throws BKNotEnoughBookiesException
*/
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize)
throws BKNotEnoughBookiesException {
try {
// we try to only get from the healthy bookies first
- return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, new HashSet<BookieSocketAddress>(
+ return placementPolicy.newEnsemble(ensembleSize,
+ writeQuorumSize, ackQuorumSize, new HashSet<BookieSocketAddress>(
quarantinedBookies.asMap().keySet()));
} catch (BKNotEnoughBookiesException e) {
if (logger.isDebugEnabled()) {
logger.debug("Not enough healthy bookies available, using quarantined bookies");
}
- return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
+ return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, EMPTY_SET);
}
}
@@ -278,19 +279,23 @@ class BookieWatcher implements Watcher, ChildrenCallback {
* @return the bookie to replace.
* @throws BKNotEnoughBookiesException
*/
- public BookieSocketAddress replaceBookie(List<BookieSocketAddress> existingBookies, int bookieIdx)
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ List<BookieSocketAddress> existingBookies, int bookieIdx,
+ Set<BookieSocketAddress> excludeBookies)
throws BKNotEnoughBookiesException {
BookieSocketAddress addr = existingBookies.get(bookieIdx);
try {
// we exclude the quarantined bookies also first
Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
- return placementPolicy.replaceBookie(addr, existingAndQuarantinedBookies);
+ return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ existingAndQuarantinedBookies, addr, excludeBookies);
} catch (BKNotEnoughBookiesException e) {
if (logger.isDebugEnabled()) {
logger.debug("Not enough healthy bookies available, using quarantined bookies");
}
- return placementPolicy.replaceBookie(addr, new HashSet<BookieSocketAddress>(existingBookies));
+ return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
}
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 5f2d2c3..640bdb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -18,14 +18,22 @@
package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.base.Optional;
+
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.commons.configuration.Configuration;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
/**
* Default Ensemble Placement Policy, which picks bookies randomly
@@ -37,7 +45,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
if (ensembleSize <= 0) {
@@ -62,9 +70,11 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
}
@Override
- public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+ excludeBookies.addAll(currentEnsemble);
+ ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, excludeBookies);
return addresses.get(0);
}
@@ -81,7 +91,29 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
}
@Override
- public EnsemblePlacementPolicy initialize(Configuration conf) {
+ public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return writeSet;
+ }
+
+ @Override
+ public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ List<Integer> retList = new ArrayList<Integer>(writeSet);
+ if (retList.size() < ensemble.size()) {
+ for (int i = 0; i < ensemble.size(); i++) {
+ if (!retList.contains(i)) {
+ retList.add(i);
+ }
+ }
+ }
+ return retList;
+ }
+
+ @Override
+ public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
+ Optional<DNSToSwitchMapping> optionalDnsResolver,
+ HashedWheelTimer timer,
+ FeatureProvider featureProvider,
+ StatsLogger statsLogger) {
return this;
}
@@ -89,5 +121,4 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
public void uninitalize() {
// do nothing
}
-
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index a1d8ce3..2af8108 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -18,11 +18,20 @@
package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.base.Optional;
+
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.commons.configuration.Configuration;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
/**
* Encapsulation of the algorithm that selects a number of bookies from the cluster as an ensemble for storing
@@ -33,11 +42,17 @@ public interface EnsemblePlacementPolicy {
/**
* Initialize the policy.
*
- * @param conf
- * client configuration.
- * @return initialized ensemble placement policy
+ * @param conf client configuration
+ * @param optionalDnsResolver dns resolver
+ * @param hashedWheelTimer timer
+ * @param featureProvider feature provider
+ * @param statsLogger stats logger
*/
- public EnsemblePlacementPolicy initialize(Configuration conf);
+ public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
+ Optional<DNSToSwitchMapping> optionalDnsResolver,
+ HashedWheelTimer hashedWheelTimer,
+ FeatureProvider featureProvider,
+ StatsLogger statsLogger);
/**
* Uninitialize the policy
@@ -55,7 +70,7 @@ public interface EnsemblePlacementPolicy {
* @return the dead bookies during this cluster change.
*/
public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
- Set<BookieSocketAddress> readOnlyBookies);
+ Set<BookieSocketAddress> readOnlyBookies);
/**
* Choose <i>numBookies</i> bookies for ensemble. If the count is more than the number of available
@@ -70,8 +85,8 @@ public interface EnsemblePlacementPolicy {
* @return list of bookies chosen as targets.
* @throws BKNotEnoughBookiesException if not enough bookies available.
*/
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
/**
* Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
@@ -84,6 +99,36 @@ public interface EnsemblePlacementPolicy {
* @return the bookie chosen as target.
* @throws BKNotEnoughBookiesException
*/
- public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+
+ /**
+ * Reorder the read sequence of a given write quorum <i>writeSet</i>.
+ *
+ * @param ensemble
+ * Ensemble to read entries.
+ * @param writeSet
+ * Write quorum to read entries.
+ * @param bookieFailureHistory
+ * Observed failures on the bookies
+ * @return read sequence of bookies
+ */
+ public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble,
+ List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory);
+
+
+ /**
+ * Reorder the read last add confirmed sequence of a given write quorum <i>writeSet</i>.
+ *
+ * @param ensemble
+ * Ensemble to read entries.
+ * @param writeSet
+ * Write quorum to read entries.
+ * @param bookieFailureHistory
+ * Observed failures on the bookies
+ * @return read sequence of bookies
+ */
+ public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble,
+ List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..535fffe
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.Node;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+/**
+ * Interface for topology aware ensemble placement policy.
+ */
+public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends EnsemblePlacementPolicy {
+ /**
+ * Predicate used when choosing an ensemble.
+ */
+ public static interface Predicate<T extends Node> {
+ boolean apply(T candidate, Ensemble<T> chosenBookies);
+ }
+
+ /**
+ * Ensemble used to hold the result of an ensemble selected for placement.
+ */
+ public static interface Ensemble<T extends Node> {
+
+ /**
+ * Append the new bookie node to the ensemble only if the ensemble doesnt
+ * already contain the same bookie
+ *
+ * @param node
+ * new candidate bookie node.
+ * @return
+ * true if the node was added
+ */
+ public boolean addNode(T node);
+
+ /**
+ * @return list of addresses representing the ensemble
+ */
+ public ArrayList<BookieSocketAddress> toList();
+
+ /**
+ * Validates if an ensemble is valid
+ *
+ * @return true if the ensemble is valid; false otherwise
+ */
+ public boolean validate();
+
+ }
+
+ /**
+ * Create an ensemble with parent ensemble.
+ *
+ * @param ensembleSize
+ * ensemble size
+ * @param writeQuorumSize
+ * write quorum size
+ * @param ackQuorumSize
+ * ack quorum size
+ * @param excludeBookies
+ * exclude bookies
+ * @param parentEnsemble
+ * parent ensemble
+ * @return list of bookies forming the ensemble
+ * @throws BKException.BKNotEnoughBookiesException
+ */
+ ArrayList<BookieSocketAddress> newEnsemble(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieSocketAddress> excludeBookies,
+ Ensemble<T> parentEnsemble,
+ Predicate<T> parentPredicate)
+ throws BKException.BKNotEnoughBookiesException;
+
+ /**
+ * Select a node from a given network location.
+ *
+ * @param networkLoc
+ * network location
+ * @param excludeBookies
+ * exclude bookies set
+ * @param predicate
+ * predicate to apply
+ * @param ensemble
+ * ensemble
+ * @return the selected bookie.
+ * @throws BKException.BKNotEnoughBookiesException
+ */
+ T selectFromNetworkLocation(String networkLoc,
+ Set<Node> excludeBookies,
+ Predicate<T> predicate,
+ Ensemble<T> ensemble)
+ throws BKException.BKNotEnoughBookiesException;
+
+ /**
+ * Handle bookies that left.
+ *
+ * @param leftBookies
+ * bookies that left
+ */
+ void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies);
+
+ /**
+ * Handle bookies that joined
+ *
+ * @param joinedBookies
+ * bookies that joined.
+ */
+ void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies);
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 3626ce0..e88df31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -105,7 +105,9 @@ class LedgerCreateOp implements GenericCallback<Void> {
ArrayList<BookieSocketAddress> ensemble;
try {
ensemble = bk.bookieWatcher
- .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
+ .newEnsemble(metadata.getEnsembleSize(),
+ metadata.getWriteQuorumSize(),
+ metadata.getAckQuorumSize());
} catch (BKNotEnoughBookiesException e) {
LOG.error("Not enough bookies to create ledger");
createComplete(e.getCode(), null);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3c8d475..06f84eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -26,6 +26,7 @@ import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -1005,9 +1006,13 @@ public class LedgerHandle implements AutoCloseable {
// avoid parallel ensemble changes to same ensemble.
synchronized (metadata) {
- newBookie = bk.bookieWatcher.replaceBookie(metadata.currentEnsemble, bookieIndex);
-
newEnsemble.addAll(metadata.currentEnsemble);
+ newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(),
+ metadata.getWriteQuorumSize(),
+ metadata.getAckQuorumSize(), newEnsemble,
+ bookieIndex, new HashSet<>(Arrays.asList(addr)));
+
+
newEnsemble.set(bookieIndex, newBookie);
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 7b15d9e..f42e42a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -17,556 +17,177 @@
*/
package org.apache.bookkeeper.client;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import org.apache.bookkeeper.conf.Configurable;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
-import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
-import org.apache.bookkeeper.net.NodeBase;
-import org.apache.bookkeeper.net.ScriptBasedMapping;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicyImpl
+ implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
-/**
- * Simple rackware ensemble placement policy.
- *
- * Make most of the class and methods as protected, so it could be extended to implement other algorithms.
- */
-public class RackawareEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
-
- static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicy.class);
-
- public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
-
- /**
- * Predicate used when choosing an ensemble.
- */
- protected static interface Predicate {
- boolean apply(BookieNode candidate, Ensemble chosenBookies);
- }
-
- /**
- * Ensemble used to hold the result of an ensemble selected for placement.
- */
- protected static interface Ensemble {
-
- /**
- * Append the new bookie node to the ensemble.
- *
- * @param node
- * new candidate bookie node.
- */
- public void addBookie(BookieNode node);
+ RackawareEnsemblePlacementPolicyImpl slave = null;
- /**
- * @return list of addresses representing the ensemble
- */
- public ArrayList<BookieSocketAddress> toList();
+ RackawareEnsemblePlacementPolicy() {
+ super();
}
- protected static class TruePredicate implements Predicate {
-
- public static final TruePredicate instance = new TruePredicate();
-
- @Override
- public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
- return true;
- }
-
+ RackawareEnsemblePlacementPolicy(boolean enforceDurability) {
+ super(enforceDurability);
}
- protected static class EnsembleForReplacement implements Ensemble {
-
- public static final EnsembleForReplacement instance = new EnsembleForReplacement();
- static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
-
- @Override
- public void addBookie(BookieNode node) {
- // do nothing
- }
-
- @Override
- public ArrayList<BookieSocketAddress> toList() {
- return EMPTY_LIST;
+ @Override
+ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsResolver,
+ HashedWheelTimer timer,
+ boolean reorderReadsRandom,
+ int stabilizePeriodSeconds,
+ StatsLogger statsLogger) {
+ if (stabilizePeriodSeconds > 0) {
+ super.initialize(dnsResolver, timer, reorderReadsRandom, 0, statsLogger);
+ slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+ slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+ } else {
+ super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+ slave = null;
}
-
+ return this;
}
- /**
- * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
- * which ensures that a write quorum should be covered by at least two racks.
- */
- protected static class RRRackCoverageEnsemble implements Predicate, Ensemble {
-
- class QuorumCoverageSet {
- Set<String> racks = new HashSet<String>();
- int seenBookies = 0;
-
- boolean apply(BookieNode candidate) {
- if (seenBookies + 1 == writeQuorumSize) {
- return racks.size() > (racks.contains(candidate.getNetworkLocation()) ? 1 : 0);
- }
- return true;
- }
-
- void addBookie(BookieNode candidate) {
- ++seenBookies;
- racks.add(candidate.getNetworkLocation());
- }
- }
-
- final int ensembleSize;
- final int writeQuorumSize;
- final ArrayList<BookieNode> chosenNodes;
- private final QuorumCoverageSet[] quorums;
-
- protected RRRackCoverageEnsemble(int ensembleSize, int writeQuorumSize) {
- this.ensembleSize = ensembleSize;
- this.writeQuorumSize = writeQuorumSize;
- this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
- this.quorums = new QuorumCoverageSet[ensembleSize];
- }
-
- @Override
- public boolean apply(BookieNode candidate, Ensemble ensemble) {
- if (ensemble != this) {
- return false;
- }
- // candidate position
- int candidatePos = chosenNodes.size();
- int startPos = candidatePos - writeQuorumSize + 1;
- for (int i = startPos; i <= candidatePos; i++) {
- int idx = (i + ensembleSize) % ensembleSize;
- if (null == quorums[idx]) {
- quorums[idx] = new QuorumCoverageSet();
- }
- if (!quorums[idx].apply(candidate)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public void addBookie(BookieNode node) {
- int candidatePos = chosenNodes.size();
- int startPos = candidatePos - writeQuorumSize + 1;
- for (int i = startPos; i <= candidatePos; i++) {
- int idx = (i + ensembleSize) % ensembleSize;
- if (null == quorums[idx]) {
- quorums[idx] = new QuorumCoverageSet();
- }
- quorums[idx].addBookie(node);
- }
- chosenNodes.add(node);
- }
-
- @Override
- public ArrayList<BookieSocketAddress> toList() {
- ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
- for (BookieNode bn : chosenNodes) {
- addresses.add(bn.getAddr());
- }
- return addresses;
- }
-
- @Override
- public String toString() {
- return chosenNodes.toString();
+ @Override
+ public void uninitalize() {
+ super.uninitalize();
+ if (null != slave) {
+ slave.uninitalize();
}
-
}
- protected static class BookieNode implements Node {
-
- private final BookieSocketAddress addr; // identifier of a bookie node.
-
- private int level; // the level in topology tree
- private Node parent; // its parent in topology tree
- private String location = NetworkTopology.DEFAULT_RACK; // its network location
- private final String name;
-
- BookieNode(BookieSocketAddress addr, String networkLoc) {
- this.addr = addr;
- this.name = addr.toString();
- setNetworkLocation(networkLoc);
- }
-
- public BookieSocketAddress getAddr() {
- return addr;
- }
-
- @Override
- public int getLevel() {
- return level;
- }
-
- @Override
- public void setLevel(int level) {
- this.level = level;
- }
-
- @Override
- public Node getParent() {
- return parent;
- }
-
- @Override
- public void setParent(Node parent) {
- this.parent = parent;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getNetworkLocation() {
- return location;
- }
-
- @Override
- public void setNetworkLocation(String location) {
- this.location = location;
- }
-
- @Override
- public int hashCode() {
- return name.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof BookieNode)) {
- return false;
- }
- BookieNode other = (BookieNode) obj;
- return getName().equals(other.getName());
- }
-
- @Override
- public String toString() {
- return String.format("<Bookie:%s>", name);
+ @Override
+ public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies, Set<BookieSocketAddress> readOnlyBookies) {
+ Set<BookieSocketAddress> deadBookies = super.onClusterChanged(writableBookies, readOnlyBookies);
+ if (null != slave) {
+ deadBookies = slave.onClusterChanged(writableBookies, readOnlyBookies);
}
-
+ return deadBookies;
}
- static class DefaultResolver implements DNSToSwitchMapping {
-
- @Override
- public List<String> resolve(List<String> names) {
- List<String> rNames = new ArrayList<String>(names.size());
- for (@SuppressWarnings("unused") String name : names) {
- rNames.add(NetworkTopology.DEFAULT_RACK);
+ @Override
+ public ArrayList<BookieSocketAddress> newEnsemble(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieSocketAddress> excludeBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ try {
+ return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
}
- return rNames;
}
-
- @Override
- public void reloadCachedMappings() {
- // nop
- }
-
- };
-
- // for now, we just maintain the writable bookies' topology
- private final NetworkTopology topology;
- private DNSToSwitchMapping dnsResolver;
- private final Map<BookieSocketAddress, BookieNode> knownBookies;
- private BookieNode localNode;
- private final ReentrantReadWriteLock rwLock;
-
- public RackawareEnsemblePlacementPolicy() {
- topology = new NetworkTopology();
- knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
-
- rwLock = new ReentrantReadWriteLock();
- }
-
- private BookieNode createBookieNode(BookieSocketAddress addr) {
- return new BookieNode(addr, resolveNetworkLocation(addr));
}
@Override
- public EnsemblePlacementPolicy initialize(Configuration conf) {
- String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+ public BookieSocketAddress replaceBookie(
+ int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Collection<BookieSocketAddress> currentEnsemble,
+ BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies)
+ throws BKException.BKNotEnoughBookiesException {
try {
- dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
- if (dnsResolver instanceof Configurable) {
- ((Configurable) dnsResolver).setConf(conf);
+ return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ currentEnsemble, bookieToReplace, excludeBookies);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+ currentEnsemble, bookieToReplace, excludeBookies);
}
- } catch (RuntimeException re) {
- LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re);
- dnsResolver = new DefaultResolver();
}
-
- BookieNode bn;
- try {
- bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0));
- } catch (UnknownHostException e) {
- LOG.error("Failed to get local host address : ", e);
- bn = null;
- }
- localNode = bn;
- LOG.info("Initialize rackaware ensemble placement policy @ {} : {}.", localNode,
- dnsResolver.getClass().getName());
- return this;
}
@Override
- public void uninitalize() {
- // do nothing
- }
-
- private String resolveNetworkLocation(BookieSocketAddress addr) {
- List<String> names = new ArrayList<String>(1);
- if (dnsResolver instanceof CachedDNSToSwitchMapping) {
- names.add(addr.getSocketAddress().getAddress().getHostAddress());
- } else {
- names.add(addr.getSocketAddress().getHostName());
- }
- // resolve network addresses
- List<String> rNames = dnsResolver.resolve(names);
- String netLoc;
- if (null == rNames) {
- LOG.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
- NetworkTopology.DEFAULT_RACK);
- netLoc = NetworkTopology.DEFAULT_RACK;
- } else {
- netLoc = rNames.get(0);
- }
- return netLoc;
+ public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble,
+ List<Integer> writeSet,
+ Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return super.reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
}
@Override
- public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
- Set<BookieSocketAddress> readOnlyBookies) {
- rwLock.writeLock().lock();
- try {
- ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, deadBookies;
- Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
- // left bookies : bookies in known bookies, but not in new writable bookie cluster.
- leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
- // joined bookies : bookies in new writable bookie cluster, but not in known bookies
- joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy();
- // dead bookies.
- deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy();
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
- new Object[] { leftBookies, joinedBookies, deadBookies });
- }
-
- // node left
- for (BookieSocketAddress addr : leftBookies) {
- BookieNode node = knownBookies.remove(addr);
- topology.remove(node);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
- }
- }
-
- // node joined
- for (BookieSocketAddress addr : joinedBookies) {
- BookieNode node = createBookieNode(addr);
- topology.add(node);
- knownBookies.put(addr, node);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
- }
- }
-
- return deadBookies;
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
- private Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
- Set<Node> nodes = new HashSet<Node>();
- for (BookieSocketAddress addr : excludeBookies) {
- BookieNode bn = knownBookies.get(addr);
- if (null == bn) {
- bn = createBookieNode(addr);
- }
- nodes.add(bn);
- }
- return nodes;
+ public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble,
+ List<Integer> writeSet,
+ Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return super.reorderReadLACSequence(ensemble, writeSet, bookieFailureHistory);
}
@Override
- public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- rwLock.readLock().lock();
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ Set<BookieSocketAddress> excludeBookies,
+ Ensemble<BookieNode> parentEnsemble,
+ Predicate<BookieNode> parentPredicate)
+ throws BKException.BKNotEnoughBookiesException {
try {
- Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
- RRRackCoverageEnsemble ensemble = new RRRackCoverageEnsemble(ensembleSize, writeQuorumSize);
- BookieNode prevNode = null;
- int numRacks = topology.getNumOfRacks();
- // only one rack, use the random algorithm.
- if (numRacks < 2) {
- List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes,
- EnsembleForReplacement.instance);
- ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
- for (BookieNode bn : bns) {
- addrs.add(bn.addr);
- }
- return addrs;
+ return super.newEnsemble(
+ ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ excludeBookies,
+ parentEnsemble,
+ parentPredicate);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+ excludeBookies, parentEnsemble, parentPredicate);
}
- // pick nodes by racks, to ensure there is at least two racks per write quorum.
- for (int i = 0; i < ensembleSize; i++) {
- String curRack;
- if (null == prevNode) {
- if (null == localNode) {
- curRack = NodeBase.ROOT;
- } else {
- curRack = localNode.getNetworkLocation();
- }
- } else {
- curRack = "~" + prevNode.getNetworkLocation();
- }
- prevNode = selectFromRack(curRack, excludeNodes, ensemble, ensemble);
- }
- return ensemble.toList();
- } finally {
- rwLock.readLock().unlock();
}
}
@Override
- public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
- Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
- rwLock.readLock().lock();
+ public BookieNode selectFromNetworkLocation(
+ String networkLoc,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble)
+ throws BKException.BKNotEnoughBookiesException {
try {
- BookieNode bn = knownBookies.get(bookieToReplace);
- if (null == bn) {
- bn = createBookieNode(bookieToReplace);
- }
-
- Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
- // add the bookie to replace in exclude set
- excludeNodes.add(bn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
- excludeNodes);
+ return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+ } catch (BKException.BKNotEnoughBookiesException bnebe) {
+ if (slave == null) {
+ throw bnebe;
+ } else {
+ return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
}
- // pick a candidate from same rack to replace
- BookieNode candidate = selectFromRack(bn.getNetworkLocation(), excludeNodes,
- TruePredicate.instance, EnsembleForReplacement.instance);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
- }
- return candidate.addr;
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- protected BookieNode selectFromRack(String networkLoc, Set<Node> excludeBookies, Predicate predicate,
- Ensemble ensemble) throws BKNotEnoughBookiesException {
- // select one from local rack
- try {
- return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
- } catch (BKNotEnoughBookiesException e) {
- LOG.warn("Failed to choose a bookie from {} : "
- + "excluded {}, fallback to choose bookie randomly from the cluster.",
- networkLoc, excludeBookies);
- // randomly choose one from whole cluster, ignore the provided predicate.
- return selectRandom(1, excludeBookies, ensemble).get(0);
}
}
- protected String getRemoteRack(BookieNode node) {
- return "~" + node.getNetworkLocation();
- }
-
- /**
- * Choose random node under a given network path.
- *
- * @param netPath
- * network path
- * @param excludeBookies
- * exclude bookies
- * @param predicate
- * predicate to check whether the target is a good target.
- * @param ensemble
- * ensemble structure
- * @return chosen bookie.
- */
- protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate predicate,
- Ensemble ensemble) throws BKNotEnoughBookiesException {
- List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath));
- Collections.shuffle(leaves);
- for (Node n : leaves) {
- if (excludeBookies.contains(n)) {
- continue;
- }
- if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
- continue;
- }
- BookieNode bn = (BookieNode) n;
- // got a good candidate
- ensemble.addBookie(bn);
- // add the candidate to exclude set
- excludeBookies.add(bn);
- return bn;
+ @Override
+ public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+ super.handleBookiesThatLeft(leftBookies);
+ if (null != slave) {
+ slave.handleBookiesThatLeft(leftBookies);
}
- throw new BKNotEnoughBookiesException();
}
- /**
- * Choose a random node from whole cluster.
- *
- * @param numBookies
- * number bookies to choose
- * @param excludeBookies
- * bookies set to exclude.
- * @param ensemble
- * ensemble to hold the bookie chosen.
- * @return the bookie node chosen.
- * @throws BKNotEnoughBookiesException
- */
- protected List<BookieNode> selectRandom(int numBookies, Set<Node> excludeBookies, Ensemble ensemble)
- throws BKNotEnoughBookiesException {
- List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values());
- Collections.shuffle(allBookies);
- List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
- for (BookieNode bookie : allBookies) {
- if (excludeBookies.contains(bookie)) {
- continue;
- }
- ensemble.addBookie(bookie);
- excludeBookies.add(bookie);
- newBookies.add(bookie);
- --numBookies;
- if (numBookies == 0) {
- return newBookies;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
- numBookies, excludeBookies, allBookies });
+ @Override
+ public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+ super.handleBookiesThatJoined(joinedBookies);
+ if (null != slave) {
+ slave.handleBookiesThatJoined(joinedBookies);
}
- throw new BKNotEnoughBookiesException();
}
-
}