You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/10/13 05:51:08 UTC
[4/5] bookkeeper git commit: BOOKKEEPER-612: Region aware placement
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
new file mode 100644
index 0000000..3c41a7c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetUtils;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.net.ScriptBasedMapping;
+import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Simple rack-aware ensemble placement policy.
+ *
+ * <p>Most fields and methods are protected so that the class can be extended to implement other
+ * placement algorithms.
+ *
+ * <p>Cluster membership ({@link #knownBookies}, {@link #topology}, {@link #readOnlyBookies}) is updated by
+ * {@link #onClusterChanged} under the write lock of {@link #rwLock}; ensemble creation, bookie replacement
+ * and read reordering consult it under the read lock.
+ */
+class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
+
+    static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
+
+    /** Configuration key naming the {@link DNSToSwitchMapping} class used to resolve network locations. */
+    public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
+    /** Configuration key enabling random shuffling within each read-sequence group (default false). */
+    public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
+
+    static final int RACKNAME_DISTANCE_FROM_LEAVES = 1;
+
+    /**
+     * Fallback resolver that maps every name to {@link NetworkTopology#DEFAULT_RACK}.
+     */
+    static class DefaultResolver implements DNSToSwitchMapping {
+
+        @Override
+        public List<String> resolve(List<String> names) {
+            List<String> rNames = new ArrayList<String>(names.size());
+            for (@SuppressWarnings("unused") String name : names) {
+                rNames.add(NetworkTopology.DEFAULT_RACK);
+            }
+            return rNames;
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            // nop
+        }
+
+    }
+
+    // for now, we just maintain the writable bookies' topology
+    protected NetworkTopology topology;
+    protected DNSToSwitchMapping dnsResolver;
+    protected HashedWheelTimer timer;
+    /** Writable bookies currently known to this policy, keyed by address. Guarded by {@link #rwLock}. */
+    protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+    /** Node describing the local host, or null if the local address could not be determined. */
+    protected BookieNode localNode;
+    protected final ReentrantReadWriteLock rwLock;
+    /** Last non-empty set of read-only bookies reported to {@link #onClusterChanged}. */
+    protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
+    protected boolean reorderReadsRandom = false;
+    /** When true, random fallback selection must still satisfy the placement predicate. */
+    protected boolean enforceDurability = false;
+    protected int stabilizePeriodSeconds = 0;
+    protected StatsLogger statsLogger = null;
+
+    RackawareEnsemblePlacementPolicyImpl() {
+        this(false);
+    }
+
+    RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) {
+        this.enforceDurability = enforceDurability;
+        topology = new NetworkTopologyImpl();
+        knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
+
+        rwLock = new ReentrantReadWriteLock();
+    }
+
+    /**
+     * Create a topology node for a bookie, resolving its network location with {@link #dnsResolver}.
+     */
+    protected BookieNode createBookieNode(BookieSocketAddress addr) {
+        return new BookieNode(addr, resolveNetworkLocation(addr));
+    }
+
+    /**
+     * Initialize the policy.
+     *
+     * @param dnsResolver the object used to resolve addresses to their network address
+     * @param timer timer handed to {@link StabilizeNetworkTopology} when stabilization is enabled
+     * @param reorderReadsRandom whether read sequences are shuffled within each group
+     * @param stabilizePeriodSeconds if positive, use a stabilizing topology with this period
+     * @param statsLogger stats logger
+     * @return initialized ensemble placement policy
+     */
+    protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver,
+                                                              HashedWheelTimer timer,
+                                                              boolean reorderReadsRandom,
+                                                              int stabilizePeriodSeconds,
+                                                              StatsLogger statsLogger) {
+        this.statsLogger = statsLogger;
+        this.reorderReadsRandom = reorderReadsRandom;
+        this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+        this.dnsResolver = dnsResolver;
+        this.timer = timer;
+
+        // create the network topology
+        if (stabilizePeriodSeconds > 0) {
+            this.topology = new StabilizeNetworkTopology(timer, stabilizePeriodSeconds);
+        } else {
+            this.topology = new NetworkTopologyImpl();
+        }
+
+        BookieNode bn;
+        try {
+            bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0));
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to get local host address : ", e);
+            bn = null;
+        }
+        localNode = bn;
+        LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.",
+                new Object[] { localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(),
+                        dnsResolver.getClass().getName() });
+        return this;
+    }
+
+    /**
+     * Initialize the policy from client configuration. If no resolver is supplied, the class named by
+     * {@link #REPP_DNS_RESOLVER_CLASS} is instantiated (default {@link ScriptBasedMapping}); if that fails,
+     * {@link DefaultResolver} is used instead.
+     */
+    @Override
+    public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
+                                                           Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                                           HashedWheelTimer timer,
+                                                           FeatureProvider featureProvider,
+                                                           StatsLogger statsLogger) {
+        DNSToSwitchMapping dnsResolver;
+        if (optionalDnsResolver.isPresent()) {
+            dnsResolver = optionalDnsResolver.get();
+        } else {
+            String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+            try {
+                dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
+                if (dnsResolver instanceof Configurable) {
+                    ((Configurable) dnsResolver).setConf(conf);
+                }
+            } catch (RuntimeException re) {
+                LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re);
+                dnsResolver = new DefaultResolver();
+            }
+        }
+        return initialize(
+                dnsResolver,
+                timer,
+                conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
+                conf.getNetworkTopologyStabilizePeriodSeconds(),
+                statsLogger);
+    }
+
+    @Override
+    public void uninitalize() {
+        // do nothing
+    }
+
+    protected String resolveNetworkLocation(BookieSocketAddress addr) {
+        return NetUtils.resolveNetworkLocation(dnsResolver, addr.getSocketAddress());
+    }
+
+    /**
+     * Apply a new cluster membership snapshot.
+     *
+     * @param writableBookies the current writable bookies
+     * @param readOnlyBookies the current read-only bookies; an empty set leaves the previously recorded
+     *                        read-only set untouched
+     * @return bookies that left the writable set and are not read-only (i.e. considered dead)
+     */
+    @Override
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+                                                     Set<BookieSocketAddress> readOnlyBookies) {
+        rwLock.writeLock().lock();
+        try {
+            ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, deadBookies;
+            Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
+            // left bookies : bookies in known bookies, but not in new writable bookie cluster.
+            leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
+            // joined bookies : bookies in new writable bookie cluster, but not in known bookies
+            joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy();
+            // dead bookies.
+            deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
+                        new Object[] { leftBookies, joinedBookies, deadBookies });
+            }
+            handleBookiesThatLeft(leftBookies);
+            handleBookiesThatJoined(joinedBookies);
+
+            if (!readOnlyBookies.isEmpty()) {
+                this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
+            }
+
+            return deadBookies;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Remove bookies from {@link #knownBookies} and the topology. Does not lock by itself;
+     * {@link #onClusterChanged} calls it with the write lock held.
+     */
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        for (BookieSocketAddress addr : leftBookies) {
+            BookieNode node = knownBookies.remove(addr);
+            if (null != node) {
+                topology.remove(node);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
+                }
+            }
+        }
+    }
+
+    /**
+     * Add bookies to {@link #knownBookies} and the topology. Does not lock by itself;
+     * {@link #onClusterChanged} calls it with the write lock held.
+     */
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+        // node joined
+        for (BookieSocketAddress addr : joinedBookies) {
+            BookieNode node = createBookieNode(addr);
+            topology.add(node);
+            knownBookies.put(addr, node);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
+            }
+        }
+    }
+
+    /**
+     * Map addresses to topology nodes, creating a fresh node for addresses not in {@link #knownBookies}.
+     *
+     * @return a new, mutable set of nodes
+     */
+    protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
+        Set<Node> nodes = new HashSet<Node>();
+        for (BookieSocketAddress addr : excludeBookies) {
+            BookieNode bn = knownBookies.get(addr);
+            if (null == bn) {
+                bn = createBookieNode(addr);
+            }
+            nodes.add(bn);
+        }
+        return nodes;
+    }
+
+    /**
+     * Create a new ensemble. Note: {@code ackQuorumSize} is not forwarded; the write quorum size is used
+     * as the ack quorum of the coverage ensemble.
+     */
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                                      Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
+    }
+
+    /**
+     * Convenience overload that uses {@code writeQuorumSize} as the ack quorum size.
+     */
+    protected ArrayList<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+                                                                 int writeQuorumSize,
+                                                                 Set<BookieSocketAddress> excludeBookies,
+                                                                 Ensemble<BookieNode> parentEnsemble,
+                                                                 Predicate<BookieNode> parentPredicate)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(
+                ensembleSize,
+                writeQuorumSize,
+                writeQuorumSize,
+                excludeBookies,
+                parentEnsemble,
+                parentPredicate);
+    }
+
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                      int writeQuorumSize,
+                                                      int ackQuorumSize,
+                                                      Set<BookieSocketAddress> excludeBookies,
+                                                      Ensemble<BookieNode> parentEnsemble,
+                                                      Predicate<BookieNode> parentPredicate)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(
+                ensembleSize,
+                writeQuorumSize,
+                ackQuorumSize,
+                excludeBookies,
+                parentEnsemble,
+                parentPredicate);
+    }
+
+    /**
+     * Build an ensemble under the read lock.
+     *
+     * <p>With fewer than two racks, bookies are picked at random. Otherwise the first bookie comes from the
+     * local rack (or the whole tree when the local rack is unknown/default), and each following bookie from
+     * {@code "~" + previousRack}, presumably the topology scope "anywhere except that rack".
+     *
+     * @throws BKNotEnoughBookiesException if fewer than {@code ensembleSize} bookies could be chosen
+     */
+    protected ArrayList<BookieSocketAddress> newEnsembleInternal(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieSocketAddress> excludeBookies,
+            Ensemble<BookieNode> parentEnsemble,
+            Predicate<BookieNode> parentPredicate) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            RRTopologyAwareCoverageEnsemble ensemble =
+                    new RRTopologyAwareCoverageEnsemble(
+                            ensembleSize,
+                            writeQuorumSize,
+                            ackQuorumSize,
+                            RACKNAME_DISTANCE_FROM_LEAVES,
+                            parentEnsemble,
+                            parentPredicate);
+            BookieNode prevNode = null;
+            int numRacks = topology.getNumOfRacks();
+            // only one rack, use the random algorithm.
+            if (numRacks < 2) {
+                List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance,
+                        ensemble);
+                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
+                for (BookieNode bn : bns) {
+                    addrs.add(bn.getAddr());
+                }
+                return addrs;
+            }
+            // pick nodes by racks, to ensure there are at least two racks per write quorum.
+            for (int i = 0; i < ensembleSize; i++) {
+                String curRack;
+                if (null == prevNode) {
+                    if ((null == localNode) ||
+                            localNode.getNetworkLocation().equals(NetworkTopology.DEFAULT_RACK)) {
+                        curRack = NodeBase.ROOT;
+                    } else {
+                        curRack = localNode.getNetworkLocation();
+                    }
+                } else {
+                    curRack = "~" + prevNode.getNetworkLocation();
+                }
+                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
+            }
+            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            if (ensembleSize != bookieList.size()) {
+                LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
+                        ensembleSize, bookieList);
+                throw new BKNotEnoughBookiesException();
+            }
+            return bookieList;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose a replacement for {@code bookieToReplace}, preferring its rack.
+     *
+     * <p>The current ensemble, the bookie being replaced and {@code excludeBookies} are all excluded.
+     * The caller's {@code excludeBookies} set is not modified.
+     *
+     * @throws BKNotEnoughBookiesException if no eligible bookie is available
+     */
+    @Override
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                             Collection<BookieSocketAddress> currentEnsemble,
+                                             BookieSocketAddress bookieToReplace,
+                                             Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            // Work on a private copy: the caller's set must not grow as a side effect of this call (and it
+            // may be unmodifiable).
+            Set<BookieSocketAddress> bookiesToExclude = new HashSet<BookieSocketAddress>(excludeBookies);
+            bookiesToExclude.addAll(currentEnsemble);
+            BookieNode bn = knownBookies.get(bookieToReplace);
+            if (null == bn) {
+                bn = createBookieNode(bookieToReplace);
+            }
+
+            Set<Node> excludeNodes = convertBookiesToNodes(bookiesToExclude);
+            // add the bookie to replace in exclude set
+            excludeNodes.add(bn);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+                        excludeNodes);
+            }
+            // pick a candidate from same rack to replace
+            BookieNode candidate = selectFromNetworkLocation(
+                    bn.getNetworkLocation(),
+                    excludeNodes,
+                    TruePredicate.instance,
+                    EnsembleForReplacementWithNoConstraints.instance);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
+            }
+            return candidate.getAddr();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Pick a bookie from {@code networkLoc}; if none qualifies, fall back to a random bookie from the whole
+     * cluster. The fallback still receives {@code predicate}, which {@link #selectRandomInternal} only
+     * enforces when {@link #enforceDurability} is set.
+     */
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            String networkLoc,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        // select one from local rack
+        try {
+            return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKNotEnoughBookiesException e) {
+            LOG.warn("Failed to choose a bookie from {} : "
+                    + "excluded {}, fallback to choose bookie randomly from the cluster.",
+                    networkLoc, excludeBookies);
+            // randomly choose one from the whole cluster; the predicate only applies if durability is enforced.
+            return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
+        }
+    }
+
+    protected String getRemoteRack(BookieNode node) {
+        return "~" + node.getNetworkLocation();
+    }
+
+    /**
+     * Choose random node under a given network path.
+     *
+     * @param netPath
+     *          network path
+     * @param excludeBookies
+     *          exclude bookies; the chosen bookie is added when the ensemble accepts it
+     * @param predicate
+     *          predicate to check whether the target is a good target.
+     * @param ensemble
+     *          ensemble structure
+     * @return chosen bookie.
+     * @throws BKNotEnoughBookiesException if no leaf under {@code netPath} qualifies
+     */
+    protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+                                              Ensemble<BookieNode> ensemble) throws BKNotEnoughBookiesException {
+        List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath));
+        Collections.shuffle(leaves);
+        for (Node n : leaves) {
+            if (excludeBookies.contains(n)) {
+                continue;
+            }
+            if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
+                continue;
+            }
+            BookieNode bn = (BookieNode) n;
+            // got a good candidate
+            if (ensemble.addNode(bn)) {
+                // add the candidate to exclude set
+                excludeBookies.add(bn);
+            }
+            return bn;
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Choose random nodes from the whole cluster.
+     *
+     * @param numBookies
+     *          number bookies to choose
+     * @param excludeBookies
+     *          bookies set to exclude.
+     * @param ensemble
+     *          ensemble to hold the bookie chosen.
+     * @return the bookie nodes chosen.
+     * @throws BKNotEnoughBookiesException
+     */
+    protected List<BookieNode> selectRandom(int numBookies,
+                                            Set<Node> excludeBookies,
+                                            Predicate<BookieNode> predicate,
+                                            Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        return selectRandomInternal(new ArrayList<BookieNode>(knownBookies.values()), numBookies, excludeBookies, predicate, ensemble);
+    }
+
+    /**
+     * Choose {@code numBookies} random bookies from {@code bookiesToSelectFrom}.
+     *
+     * <p>Each accepted bookie is added to {@code ensemble} and {@code excludeBookies}. The predicate is
+     * only consulted when {@link #enforceDurability} is true. {@code bookiesToSelectFrom} is shuffled in place.
+     *
+     * @return the chosen bookies; empty when {@code numBookies <= 0}
+     * @throws BKNotEnoughBookiesException if not enough eligible bookies exist
+     */
+    protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelectFrom,
+                                                    int numBookies,
+                                                    Set<Node> excludeBookies,
+                                                    Predicate<BookieNode> predicate,
+                                                    Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        if (numBookies <= 0) {
+            // Nothing requested: return without touching the ensemble or the exclusion set. Without this
+            // guard the countdown below would never hit zero and every candidate would be consumed.
+            return new ArrayList<BookieNode>(0);
+        }
+        Collections.shuffle(bookiesToSelectFrom);
+        List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
+        for (BookieNode bookie : bookiesToSelectFrom) {
+            if (excludeBookies.contains(bookie)) {
+                continue;
+            }
+
+            // When durability is being enforced; we must not violate the
+            // predicate even when selecting a random bookie; as durability
+            // guarantee is not best effort; correctness is implied by it
+            if (enforceDurability && !predicate.apply(bookie, ensemble)) {
+                continue;
+            }
+
+            if (ensemble.addNode(bookie)) {
+                excludeBookies.add(bookie);
+                newBookies.add(bookie);
+                --numBookies;
+            }
+
+            if (numBookies == 0) {
+                return newBookies;
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
+                    numBookies, excludeBookies, bookiesToSelectFrom });
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Order the write set for reading.
+     *
+     * <p>Resulting order: writable bookies with no recorded failure; then writable bookies with failures,
+     * ascending by last failed entry; then read-only bookies; then unavailable bookies. When
+     * {@link #reorderReadsRandom} is set, the failure-free, read-only and unavailable groups are shuffled.
+     */
+    @Override
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        int ensembleSize = ensemble.size();
+        List<Integer> finalList = new ArrayList<Integer>(writeSet.size());
+        List<Long> observedFailuresList = new ArrayList<Long>(writeSet.size());
+        List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size());
+        List<Integer> unAvailableList = new ArrayList<Integer>(writeSet.size());
+        // knownBookies (a HashMap) and readOnlyBookies are mutated/replaced by onClusterChanged under the
+        // write lock; read them under the read lock for a safe, consistent view.
+        rwLock.readLock().lock();
+        try {
+            for (Integer idx : writeSet) {
+                BookieSocketAddress address = ensemble.get(idx);
+                Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+                if (null == knownBookies.get(address)) {
+                    // there isn't too much differences between readonly bookies from unavailable bookies. since there
+                    // is no write requests to them, so we shouldn't try reading from readonly bookie in prior to writable
+                    // bookies.
+                    if ((null == readOnlyBookies) || !readOnlyBookies.contains(address)) {
+                        unAvailableList.add(idx);
+                    } else {
+                        readOnlyList.add(idx);
+                    }
+                } else {
+                    if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) {
+                        finalList.add(idx);
+                    } else {
+                        // Pack (lastFailedEntry, idx) into one long so that sorting orders by failed entry
+                        // first; idx (< ensembleSize) is recovered with a modulo below.
+                        observedFailuresList.add(lastFailedEntryOnBookie * ensembleSize + idx);
+                    }
+                }
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+
+        if (reorderReadsRandom) {
+            Collections.shuffle(finalList);
+            Collections.shuffle(readOnlyList);
+            Collections.shuffle(unAvailableList);
+        }
+
+        Collections.sort(observedFailuresList);
+
+        for (long value : observedFailuresList) {
+            finalList.add((int) (value % ensembleSize));
+        }
+
+        finalList.addAll(readOnlyList);
+        finalList.addAll(unAvailableList);
+        return finalList;
+    }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..abdcb61
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Optional;
+
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy {
+    static final Logger LOG = LoggerFactory.getLogger(RegionAwareEnsemblePlacementPolicy.class);
+
+    // ---- Configuration keys ----
+    /** Regions that writes are spread over, separated by {@code ;} (e.g. {@code R1;R2}). */
+    public static final String REPP_REGIONS_TO_WRITE = "reppRegionsToWrite";
+    /** Minimum number of regions required for durability; defaults to {@link #MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT}. */
+    public static final String REPP_MINIMUM_REGIONS_FOR_DURABILITY = "reppMinimumRegionsForDurability";
+    public static final String REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE = "reppEnableDurabilityEnforcementInReplace";
+    public static final String REPP_DISABLE_DURABILITY_FEATURE_NAME = "reppDisableDurabilityFeatureName";
+    public static final String REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME = "reppDisallowBookiePlacementInRegionFeatureName";
+    public static final String REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE = "reppDisableDurabilityEnforcementFeature";
+    public static final String REPP_ENABLE_VALIDATION = "reppEnableValidation";
+
+    // ---- Other constants ----
+    public static final String REGION_AWARE_ANOMALOUS_ENSEMBLE = "region_aware_anomalous_ensemble";
+    static final int MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT = 2;
+    static final int REGIONID_DISTANCE_FROM_LEAVES = 2;
+    /** Region assigned to bookies whose network location carries no region component. */
+    static final String UNKNOWN_REGION = "UnknownRegion";
+    static final int REMOTE_NODE_IN_REORDER_SEQUENCE = 2;
+
+    // ---- Per-region state ----
+    /** Placement policy used inside each region, keyed by region name. */
+    protected final Map<String, TopologyAwareEnsemblePlacementPolicy> perRegionPlacement;
+    /** Cache of the region resolved for each bookie address. */
+    protected final ConcurrentMap<BookieSocketAddress, String> address2Region;
+    protected String myRegion = null;
+
+    // ---- Settings populated by initialize ----
+    protected FeatureProvider featureProvider;
+    protected String disallowBookiePlacementInRegionFeatureName;
+    protected Feature disableDurabilityFeature;
+    protected int minRegionsForDurability = 0;
+    protected boolean enforceDurabilityInReplace = false;
+    protected boolean enableValidation = true;
+
+    /**
+     * Create an uninitialized policy with empty per-region placement and region caches.
+     */
+    RegionAwareEnsemblePlacementPolicy() {
+        super();
+        this.address2Region = new ConcurrentHashMap<BookieSocketAddress, String>();
+        this.perRegionPlacement = new HashMap<String, TopologyAwareEnsemblePlacementPolicy>();
+    }
+
+    /**
+     * Resolve the region a bookie belongs to.
+     *
+     * <p>The network location is split on {@link NodeBase#PATH_SEPARATOR_STR} and its second element is
+     * taken as the region (for a location like {@code /region/rack}, presumably {@code region}). Bookies on
+     * {@link NetworkTopology#DEFAULT_RACK}, or whose location has no second element, map to
+     * {@link #UNKNOWN_REGION}.
+     *
+     * <p>Results are cached in {@link #address2Region}. The first value cached for an address wins, and that
+     * cached value is what every caller receives, even under concurrent resolution.
+     *
+     * @param addr bookie address
+     * @return the region name for {@code addr}
+     */
+    protected String getRegion(BookieSocketAddress addr) {
+        String cached = address2Region.get(addr);
+        if (null != cached) {
+            return cached;
+        }
+        String region;
+        String networkLocation = resolveNetworkLocation(addr);
+        if (NetworkTopology.DEFAULT_RACK.equals(networkLocation)) {
+            region = UNKNOWN_REGION;
+        } else {
+            String[] parts = networkLocation.split(NodeBase.PATH_SEPARATOR_STR);
+            region = (parts.length <= 1) ? UNKNOWN_REGION : parts[1];
+        }
+        // Another thread may have cached a value in the meantime; return the winner so all callers agree.
+        String previous = address2Region.putIfAbsent(addr, region);
+        return (null == previous) ? region : previous;
+    }
+
+    /**
+     * Region of the given node, or {@link #UNKNOWN_REGION} if the node or its address is missing.
+     */
+    protected String getLocalRegion(BookieNode node) {
+        boolean hasAddress = (node != null) && (node.getAddr() != null);
+        return hasAddress ? getRegion(node.getAddr()) : UNKNOWN_REGION;
+    }
+
+    /**
+     * Remove departed bookies from the cluster-wide view, then from every per-region policy.
+     */
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        super.handleBookiesThatLeft(leftBookies);
+
+        Collection<TopologyAwareEnsemblePlacementPolicy> regionPolicies = perRegionPlacement.values();
+        for (TopologyAwareEnsemblePlacementPolicy regionPolicy : regionPolicies) {
+            regionPolicy.handleBookiesThatLeft(leftBookies);
+        }
+    }
+
+    /**
+     * Register newly joined bookies in the cluster-wide topology and in their region's policy.
+     *
+     * <p>A per-region {@link RackawareEnsemblePlacementPolicy} is created on demand for regions not yet
+     * present. Every region policy is then notified, receiving an empty set if it gained no bookies.
+     */
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+        // Joined bookies grouped by region.
+        Map<String, Set<BookieSocketAddress>> joinedByRegion = new HashMap<String, Set<BookieSocketAddress>>();
+
+        for (BookieSocketAddress addr : joinedBookies) {
+            BookieNode node = createBookieNode(addr);
+            topology.add(node);
+            knownBookies.put(addr, node);
+
+            String region = getLocalRegion(node);
+            TopologyAwareEnsemblePlacementPolicy regionPolicy = perRegionPlacement.get(region);
+            if (regionPolicy == null) {
+                perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+            }
+
+            Set<BookieSocketAddress> regionJoiners = joinedByRegion.get(region);
+            if (regionJoiners == null) {
+                regionJoiners = new HashSet<BookieSocketAddress>();
+                joinedByRegion.put(region, regionJoiners);
+            }
+            regionJoiners.add(addr);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
+            }
+        }
+
+        for (Map.Entry<String, TopologyAwareEnsemblePlacementPolicy> entry : perRegionPlacement.entrySet()) {
+            Set<BookieSocketAddress> regionJoiners = joinedByRegion.get(entry.getKey());
+            entry.getValue().handleBookiesThatJoined(
+                    (regionJoiners == null) ? new HashSet<BookieSocketAddress>() : regionJoiners);
+        }
+    }
+
+    /**
+     * Initialize the policy from client configuration.
+     *
+     * <p>If {@link #REPP_REGIONS_TO_WRITE} is set, a durability-enforcing per-region policy is created for
+     * each distinct, non-blank region it lists. Entries are separated by {@code ;}, and surrounding
+     * whitespace is ignored. The minimum number of regions for durability is read from
+     * {@link #REPP_MINIMUM_REGIONS_FOR_DURABILITY}.
+     *
+     * @throws IllegalArgumentException if fewer distinct regions are configured than the durability minimum
+     */
+    @Override
+    public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
+                                                         Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                                         HashedWheelTimer timer,
+                                                         FeatureProvider featureProvider,
+                                                         StatsLogger statsLogger) {
+        super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
+        myRegion = getLocalRegion(localNode);
+        enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true);
+
+        // We have to statically provide regions we want the writes to go through and how many regions
+        // are required for durability. This decision cannot be driven by the active bookies as the
+        // current topology will not be indicative of constraints that must be enforced for durability
+        String regionsString = conf.getString(REPP_REGIONS_TO_WRITE, null);
+        if (null != regionsString) {
+            // Regions are specified as R1;R2;... Blank entries are skipped, names are trimmed, and
+            // duplicates collapse, so only distinct regions count towards the durability check below.
+            Set<String> configuredRegions = new HashSet<String>();
+            for (String rawRegion : regionsString.split(";")) {
+                String region = rawRegion.trim();
+                if (!region.isEmpty()) {
+                    configuredRegions.add(region);
+                }
+            }
+            for (String region : configuredRegions) {
+                perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+            }
+            minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT);
+            if (minRegionsForDurability > 0) {
+                enforceDurability = true;
+                enforceDurabilityInReplace = conf.getBoolean(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, true);
+            }
+            if (configuredRegions.size() < minRegionsForDurability) {
+                throw new IllegalArgumentException("Regions provided are insufficient to meet the durability constraints");
+            }
+        }
+        this.featureProvider = featureProvider;
+        this.disallowBookiePlacementInRegionFeatureName = conf.getString(REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME);
+        this.disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, null);
+        if (null == disableDurabilityFeature) {
+            // Fall back to looking the feature up by name in the feature provider.
+            this.disableDurabilityFeature =
+                    featureProvider.getFeature(
+                            conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME,
+                                    BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT));
+        }
+        return this;
+    }
+
+    /**
+     * Pick {@code numBookies} random bookies, restricted to known bookies located in {@code availableRegions}.
+     *
+     * @throws BKException.BKNotEnoughBookiesException if not enough eligible bookies exist
+     */
+    protected List<BookieNode> selectRandomFromRegions(Set<String> availableRegions,
+                                                       int numBookies,
+                                                       Set<Node> excludeBookies,
+                                                       Predicate<BookieNode> predicate,
+                                                       Ensemble<BookieNode> ensemble)
+            throws BKException.BKNotEnoughBookiesException {
+        List<BookieNode> candidates = new ArrayList<BookieNode>();
+        for (BookieNode candidate : knownBookies.values()) {
+            String candidateRegion = getLocalRegion(candidate);
+            if (!availableRegions.contains(candidateRegion)) {
+                continue;
+            }
+            candidates.add(candidate);
+        }
+
+        return selectRandomInternal(candidates, numBookies, excludeBookies, predicate, ensemble);
+    }
+
+
+ @Override
+ public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+ Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+
+ int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+ // All of these conditions indicate bad configuration
+ if (ackQuorumSize < effectiveMinRegionsForDurability) {
+ throw new IllegalArgumentException("Ack Quorum size provided are insufficient to meet the durability constraints");
+ } else if (ensembleSize < writeQuorumSize) {
+ throw new IllegalArgumentException("write quorum (" + writeQuorumSize + ") cannot exceed ensemble size (" + ensembleSize + ")");
+ } else if (writeQuorumSize < ackQuorumSize) {
+ throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") cannot exceed write quorum size (" + writeQuorumSize + ")");
+ } else if (effectiveMinRegionsForDurability > 0) {
+ // We must survive the failure of numRegions - effectiveMinRegionsForDurability. When these
+ // regions have failed we would spread the replicas over the remaining
+ // effectiveMinRegionsForDurability regions; we have to make sure that the ack quorum is large
+ // enough such that there is a configuration for spreading the replicas across
+ // effectiveMinRegionsForDurability - 1 regions
+ if (ackQuorumSize <= (writeQuorumSize - (writeQuorumSize / effectiveMinRegionsForDurability))) {
+ throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") " +
+ "violates the requirement to satisfy durability constraints when running in degraded mode");
+ }
+ }
+
+ rwLock.readLock().lock();
+ try {
+ Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+ Set<String> availableRegions = new HashSet<String>();
+ for (String region: perRegionPlacement.keySet()) {
+ if ((null == disallowBookiePlacementInRegionFeatureName) ||
+ !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) {
+ availableRegions.add(region);
+ }
+ }
+ int numRegionsAvailable = availableRegions.size();
+
+ // If we were unable to get region information or all regions are disallowed which is
+ // an invalid configuration; default to random selection from the set of nodes
+ if (numRegionsAvailable < 1) {
+ // We cant disallow all regions; if we did, raise an alert to draw attention
+ if (perRegionPlacement.keySet().size() >= 1) {
+ LOG.error("No regions available, invalid configuration");
+ }
+ List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance,
+ EnsembleForReplacementWithNoConstraints.instance);
+ ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
+ for (BookieNode bn : bns) {
+ addrs.add(bn.getAddr());
+ }
+ return addrs;
+ }
+
+ // Single region, fall back to RackAwareEnsemblePlacement
+ if (numRegionsAvailable < 2) {
+ RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ REGIONID_DISTANCE_FROM_LEAVES,
+ effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
+ effectiveMinRegionsForDurability);
+ TopologyAwareEnsemblePlacementPolicy nextPolicy = perRegionPlacement.get(availableRegions.iterator().next());
+ return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, excludeBookies, ensemble, ensemble);
+ }
+
+ int remainingEnsemble = ensembleSize;
+ int remainingWriteQuorum = writeQuorumSize;
+
+ // Equally distribute the nodes across all regions to whatever extent possible
+ // with the hierarchy in mind
+ // Try and place as many nodes in a region as possible, the ones that cannot be
+ // accommodated are placed on other regions
+ // Within each region try and follow rack aware placement
+ Map<String, Pair<Integer,Integer>> regionsWiseAllocation = new HashMap<String, Pair<Integer,Integer>>();
+ for (String region: availableRegions) {
+ regionsWiseAllocation.put(region, Pair.of(0,0));
+ }
+ int remainingEnsembleBeforeIteration;
+ Set<String> regionsReachedMaxAllocation = new HashSet<String>();
+ RRTopologyAwareCoverageEnsemble ensemble;
+ int iteration = 0;
+ do {
+ LOG.info("RegionAwareEnsemblePlacementPolicy#newEnsemble Iteration {}", iteration++);
+ int numRemainingRegions = numRegionsAvailable - regionsReachedMaxAllocation.size();
+ ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ REGIONID_DISTANCE_FROM_LEAVES,
+ // We pass all regions we know off to the coverage ensemble as
+ // regardless of regions that are available; constraints are
+ // always applied based on all possible regions
+ effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
+ effectiveMinRegionsForDurability);
+ remainingEnsembleBeforeIteration = remainingEnsemble;
+ for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) {
+ String region = regionEntry.getKey();
+ final Pair<Integer, Integer> currentAllocation = regionEntry.getValue();
+ TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
+ if (!regionsReachedMaxAllocation.contains(region)) {
+ if (numRemainingRegions <= 0) {
+ LOG.error("Inconsistent State: This should never happen");
+ throw new BKException.BKNotEnoughBookiesException();
+ }
+
+ int addToEnsembleSize = Math.min(remainingEnsemble, (remainingEnsembleBeforeIteration + numRemainingRegions - 1) / numRemainingRegions);
+ boolean success = false;
+ while(addToEnsembleSize > 0) {
+ int addToWriteQuorum = Math.max(1, Math.min(remainingWriteQuorum, Math.round(1.0f * writeQuorumSize * addToEnsembleSize / ensembleSize)));
+
+ // Temp ensemble will be merged back into the ensemble only if we are able to successfully allocate
+ // the target number of bookies in this region; if we fail because we dont have enough bookies; then we
+ // retry the process with a smaller target
+ RRTopologyAwareCoverageEnsemble tempEnsemble = new RRTopologyAwareCoverageEnsemble(ensemble);
+ int newEnsembleSize = currentAllocation.getLeft() + addToEnsembleSize;
+ int newWriteQuorumSize = currentAllocation.getRight() + addToWriteQuorum;
+ try {
+ policyWithinRegion.newEnsemble(newEnsembleSize, newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble, tempEnsemble);
+ ensemble = tempEnsemble;
+ remainingEnsemble -= addToEnsembleSize;
+ remainingWriteQuorum -= writeQuorumSize;
+ regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize));
+ success = true;
+ LOG.info("Allocated {} bookies in region {} : {}",
+ new Object[]{newEnsembleSize, region, ensemble});
+ break;
+ } catch (BKException.BKNotEnoughBookiesException exc) {
+ LOG.warn("Could not allocate {} bookies in region {}, try allocating {} bookies",
+ new Object[] {newEnsembleSize, region, (newEnsembleSize - 1) });
+ addToEnsembleSize--;
+ }
+ }
+
+ // we couldn't allocate additional bookies from the region,
+ // it should have reached its max allocation.
+ if (!success) {
+ regionsReachedMaxAllocation.add(region);
+ }
+ }
+
+ if (regionsReachedMaxAllocation.contains(region)) {
+ if (currentAllocation.getLeft() > 0) {
+ LOG.info("Allocating {} bookies in region {} : ensemble {} exclude {}",
+ new Object[]{currentAllocation.getLeft(), region, excludeBookies, ensemble});
+ policyWithinRegion.newEnsemble(
+ currentAllocation.getLeft(),
+ currentAllocation.getRight(),
+ currentAllocation.getRight(),
+ excludeBookies,
+ ensemble,
+ ensemble);
+ LOG.info("Allocated {} bookies in region {} : {}",
+ new Object[]{currentAllocation.getLeft(), region, ensemble});
+ }
+ }
+ }
+
+ if (regionsReachedMaxAllocation.containsAll(regionsWiseAllocation.keySet())) {
+ break;
+ }
+ } while ((remainingEnsemble > 0) && (remainingEnsemble < remainingEnsembleBeforeIteration));
+
+ ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+ if (ensembleSize != bookieList.size()) {
+ LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
+ ensembleSize, bookieList);
+ throw new BKException.BKNotEnoughBookiesException();
+ }
+
+ if(enableValidation && !ensemble.validate()) {
+ LOG.error("Not enough {} bookies are available to form a valid ensemble : {}.",
+ ensembleSize, bookieList);
+ throw new BKException.BKNotEnoughBookiesException();
+ }
+
+ return ensemble.toList();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
+ Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+ rwLock.readLock().lock();
+ try {
+ boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable();
+ int effectiveMinRegionsForDurability = enforceDurability ? minRegionsForDurability : 1;
+ Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+ RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
+ writeQuorumSize,
+ ackQuorumSize,
+ REGIONID_DISTANCE_FROM_LEAVES,
+ effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
+ effectiveMinRegionsForDurability);
+
+ BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace);
+ if (null == bookieNodeToReplace) {
+ bookieNodeToReplace = createBookieNode(bookieToReplace);
+ }
+ excludeNodes.add(bookieNodeToReplace);
+
+ for(BookieSocketAddress bookieAddress: currentEnsemble) {
+ if (bookieAddress.equals(bookieToReplace)) {
+ continue;
+ }
+
+ BookieNode bn = knownBookies.get(bookieAddress);
+ if (null == bn) {
+ bn = createBookieNode(bookieAddress);
+ }
+
+ excludeNodes.add(bn);
+
+ if (!ensemble.apply(bn, ensemble)) {
+ LOG.warn("Anomalous ensemble detected");
+ if (null != statsLogger) {
+ statsLogger.getCounter(REGION_AWARE_ANOMALOUS_ENSEMBLE).inc();
+ }
+ enforceDurability = false;
+ }
+
+ ensemble.addNode(bn);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+ excludeNodes);
+ }
+ // pick a candidate from same rack to replace
+ BookieNode candidate = replaceFromRack(bookieNodeToReplace, excludeNodes,
+ ensemble, ensemble, enforceDurability);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bookieNodeToReplace);
+ }
+ return candidate.getAddr();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ protected BookieNode replaceFromRack(BookieNode bookieNodeToReplace,
+ Set<Node> excludeBookies,
+ Predicate<BookieNode> predicate,
+ Ensemble<BookieNode> ensemble,
+ boolean enforceDurability)
+ throws BKException.BKNotEnoughBookiesException {
+ Set<String> availableRegions = new HashSet<String>();
+ for (String region: perRegionPlacement.keySet()) {
+ if ((null == disallowBookiePlacementInRegionFeatureName) ||
+ !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) {
+ availableRegions.add(region);
+ }
+ }
+ String regionForBookieToReplace = getLocalRegion(bookieNodeToReplace);
+ if (availableRegions.contains(regionForBookieToReplace)) {
+ TopologyAwareEnsemblePlacementPolicy regionPolicy = perRegionPlacement.get(regionForBookieToReplace);
+ if (null != regionPolicy) {
+ try {
+ // select one from local rack => it falls back to selecting a node from the region
+ // if the rack does not have an available node, selecting from the same region
+ // should not violate durability constraints so we can simply not have to check
+ // for that.
+ return regionPolicy.selectFromNetworkLocation(
+ bookieNodeToReplace.getNetworkLocation(),
+ excludeBookies,
+ TruePredicate.instance,
+ EnsembleForReplacementWithNoConstraints.instance);
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ LOG.warn("Failed to choose a bookie from {} : "
+ + "excluded {}, fallback to choose bookie randomly from the cluster.",
+ bookieNodeToReplace.getNetworkLocation(), excludeBookies);
+ }
+ }
+ }
+
+ // randomly choose one from all the regions that are available, ignore the provided predicate if we are not
+ // enforcing durability.
+ return selectRandomFromRegions(availableRegions, 1,
+ excludeBookies,
+ enforceDurability ? predicate : TruePredicate.instance,
+ enforceDurability ? ensemble : EnsembleForReplacementWithNoConstraints.instance).get(0);
+ }
+
+ @Override
+ public final List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ if (UNKNOWN_REGION.equals(myRegion)) {
+ return super.reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
+ } else {
+ int ensembleSize = ensemble.size();
+ List<Integer> finalList = new ArrayList<Integer>(writeSet.size());
+ List<Integer> localList = new ArrayList<Integer>(writeSet.size());
+ List<Long> localFailures = new ArrayList<Long>(writeSet.size());
+ List<Integer> remoteList = new ArrayList<Integer>(writeSet.size());
+ List<Long> remoteFailures = new ArrayList<Long>(writeSet.size());
+ List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size());
+ List<Integer> unAvailableList = new ArrayList<Integer>(writeSet.size());
+ for (Integer idx : writeSet) {
+ BookieSocketAddress address = ensemble.get(idx);
+ String region = getRegion(address);
+ Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+ if (null == knownBookies.get(address)) {
+ // there isn't too much differences between readonly bookies from unavailable bookies. since there
+ // is no write requests to them, so we shouldn't try reading from readonly bookie in prior to writable
+ // bookies.
+ if ((null == readOnlyBookies) || !readOnlyBookies.contains(address)) {
+ unAvailableList.add(idx);
+ } else {
+ readOnlyList.add(idx);
+ }
+ } else if (region.equals(myRegion)) {
+ if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) {
+ localList.add(idx);
+ } else {
+ localFailures.add(lastFailedEntryOnBookie * ensembleSize + idx);
+ }
+ } else {
+ if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) {
+ remoteList.add(idx);
+ } else {
+ remoteFailures.add(lastFailedEntryOnBookie * ensembleSize + idx);
+ }
+ }
+ }
+
+ // Given that idx is less than ensemble size the order of the elements in these two lists
+ // is determined by the lastFailedEntryOnBookie
+ Collections.sort(localFailures);
+ Collections.sort(remoteFailures);
+
+ if (reorderReadsRandom) {
+ Collections.shuffle(localList);
+ Collections.shuffle(remoteList);
+ Collections.shuffle(readOnlyList);
+ Collections.shuffle(unAvailableList);
+ }
+
+ // nodes within a region are ordered as follows
+ // (Random?) list of nodes that have no history of failure
+ // Nodes with Failure history are ordered in the reverse
+ // order of the most recent entry that generated an error
+ for(long value: localFailures) {
+ localList.add((int)(value % ensembleSize));
+ }
+
+ for(long value: remoteFailures) {
+ remoteList.add((int)(value % ensembleSize));
+ }
+
+ // Insert a node from the remote region at the specified location so we
+ // try more than one region within the max allowed latency
+ for (int i = 0; i < REMOTE_NODE_IN_REORDER_SEQUENCE; i++) {
+ if (localList.size() > 0) {
+ finalList.add(localList.remove(0));
+ } else {
+ break;
+ }
+ }
+
+ if (remoteList.size() > 0) {
+ finalList.add(remoteList.remove(0));
+ }
+
+ // Add all the local nodes
+ finalList.addAll(localList);
+ finalList.addAll(remoteList);
+ finalList.addAll(readOnlyList);
+ finalList.addAll(unAvailableList);
+ return finalList;
+ }
+ }
+
+ @Override
+ public final List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ if (UNKNOWN_REGION.equals(myRegion)) {
+ return super.reorderReadLACSequence(ensemble, writeSet, bookieFailureHistory);
+ }
+ List<Integer> finalList = reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
+
+ if (finalList.size() < ensemble.size()) {
+ for (int i = 0; i < ensemble.size(); i++) {
+ if (!finalList.contains(i)) {
+ finalList.add(i);
+ }
+ }
+ }
+ return finalList;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..c222827
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NodeBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class TopologyAwareEnsemblePlacementPolicy implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
+ static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
+
+ protected static class TruePredicate implements Predicate<BookieNode> {
+
+ public static final TruePredicate instance = new TruePredicate();
+
+ @Override
+ public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
+ return true;
+ }
+
+ }
+
+ protected static class EnsembleForReplacementWithNoConstraints implements Ensemble<BookieNode> {
+
+ public static final EnsembleForReplacementWithNoConstraints instance = new EnsembleForReplacementWithNoConstraints();
+ static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
+
+ @Override
+ public boolean addNode(BookieNode node) {
+ // do nothing
+ return true;
+ }
+
+ @Override
+ public ArrayList<BookieSocketAddress> toList() {
+ return EMPTY_LIST;
+ }
+
+ /**
+ * Validates if an ensemble is valid
+ *
+ * @return true if the ensemble is valid; false otherwise
+ */
+ @Override
+ public boolean validate() {
+ return true;
+ }
+
+ }
+
+ protected static class BookieNode extends NodeBase {
+
+ private final BookieSocketAddress addr; // identifier of a bookie node.
+
+ BookieNode(BookieSocketAddress addr, String networkLoc) {
+ super(addr.toString(), networkLoc);
+ this.addr = addr;
+ }
+
+ public BookieSocketAddress getAddr() {
+ return addr;
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof BookieNode)) {
+ return false;
+ }
+ BookieNode other = (BookieNode) obj;
+ return getName().equals(other.getName());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("<Bookie:%s>", name);
+ }
+
+ }
+
+ /**
+ * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
+ * which ensures that a write quorum should be covered by at least two racks.
+ */
+ protected static class RRTopologyAwareCoverageEnsemble implements Predicate<BookieNode>, Ensemble<BookieNode> {
+
+ protected interface CoverageSet {
+ boolean apply(BookieNode candidate);
+ void addBookie(BookieNode candidate);
+ public CoverageSet duplicate();
+ }
+
+ protected class RackQuorumCoverageSet implements CoverageSet {
+ HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+ int seenBookies = 0;
+
+ @Override
+ public boolean apply(BookieNode candidate) {
+ // If we don't have sufficient members in the write quorum; then we cant enforce
+ // rack/region diversity
+ if (writeQuorumSize < 2) {
+ return true;
+ }
+
+ if (seenBookies + 1 == writeQuorumSize) {
+ return racksOrRegionsInQuorum.size() > (racksOrRegionsInQuorum.contains(candidate.getNetworkLocation(distanceFromLeaves)) ? 1 : 0);
+ }
+ return true;
+ }
+
+ @Override
+ public void addBookie(BookieNode candidate) {
+ ++seenBookies;
+ racksOrRegionsInQuorum.add(candidate.getNetworkLocation(distanceFromLeaves));
+ }
+
+ @Override
+ public RackQuorumCoverageSet duplicate() {
+ RackQuorumCoverageSet ret = new RackQuorumCoverageSet();
+ ret.racksOrRegionsInQuorum = Sets.newHashSet(this.racksOrRegionsInQuorum);
+ ret.seenBookies = this.seenBookies;
+ return ret;
+ }
+ }
+
+ protected class RackOrRegionDurabilityCoverageSet implements CoverageSet {
+ HashMap<String, Integer> allocationToRacksOrRegions = new HashMap<String, Integer>();
+
+ RackOrRegionDurabilityCoverageSet() {
+ for (String rackOrRegion: racksOrRegions) {
+ allocationToRacksOrRegions.put(rackOrRegion, 0);
+ }
+ }
+
+ @Override
+ public RackOrRegionDurabilityCoverageSet duplicate() {
+ RackOrRegionDurabilityCoverageSet ret = new RackOrRegionDurabilityCoverageSet();
+ ret.allocationToRacksOrRegions = Maps.newHashMap(this.allocationToRacksOrRegions);
+ return ret;
+ }
+
+ private boolean checkSumOfSubsetWithinLimit(final Set<String> includedRacksOrRegions,
+ final Set<String> remainingRacksOrRegions,
+ int subsetSize,
+ int maxAllowedSum) {
+ if (remainingRacksOrRegions.isEmpty() || (subsetSize <= 0)) {
+ if (maxAllowedSum < 0) {
+ LOG.trace("CHECK FAILED: RacksOrRegions Included {} Remaining {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+ includedRacksOrRegions, remainingRacksOrRegions, subsetSize, maxAllowedSum
+ });
+ }
+ return (maxAllowedSum >= 0);
+ }
+
+ for(String rackOrRegion: remainingRacksOrRegions) {
+ Integer currentAllocation = allocationToRacksOrRegions.get(rackOrRegion);
+ if (currentAllocation == null) {
+ allocationToRacksOrRegions.put(rackOrRegion, 0);
+ currentAllocation = 0;
+ }
+
+ if (currentAllocation > maxAllowedSum) {
+ LOG.trace("CHECK FAILED: RacksOrRegions Included {} Candidate {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+ includedRacksOrRegions, rackOrRegion, subsetSize, maxAllowedSum
+ });
+ return false;
+ } else {
+ Set<String> remainingElements = new HashSet<String>(remainingRacksOrRegions);
+ Set<String> includedElements = new HashSet<String>(includedRacksOrRegions);
+ includedElements.add(rackOrRegion);
+ remainingElements.remove(rackOrRegion);
+ if (!checkSumOfSubsetWithinLimit(includedElements,
+ remainingElements,
+ subsetSize - 1,
+ maxAllowedSum - currentAllocation)) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean apply(BookieNode candidate) {
+ if (minRacksOrRegionsForDurability <= 1) {
+ return true;
+ }
+
+ String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+ candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ? candidateRackOrRegion.substring(1) : candidateRackOrRegion;
+ final Set<String> remainingRacksOrRegions = new HashSet<String>(racksOrRegions);
+ remainingRacksOrRegions.remove(candidateRackOrRegion);
+ final Set<String> includedRacksOrRegions = new HashSet<String>();
+ includedRacksOrRegions.add(candidateRackOrRegion);
+
+ // If minRacksOrRegionsForDurability are required for durability; we must ensure that
+ // no subset of (minRacksOrRegionsForDurability - 1) regions have ackQuorumSize
+ // We are only modifying candidateRackOrRegion if we accept this bookie, so lets only
+ // find sets that contain this candidateRackOrRegion
+ Integer currentAllocation = allocationToRacksOrRegions.get(candidateRackOrRegion);
+ if (currentAllocation == null) {
+ LOG.info("Detected a region that was not initialized {}", candidateRackOrRegion);
+ if (candidateRackOrRegion.equals(NetworkTopology.DEFAULT_REGION)) {
+ LOG.error("Failed to resolve network location {}", candidate);
+ } else if (!racksOrRegions.contains(candidateRackOrRegion)) {
+ LOG.error("Unknown region detected {}", candidateRackOrRegion);
+ }
+ allocationToRacksOrRegions.put(candidateRackOrRegion, 0);
+ currentAllocation = 0;
+ }
+
+ int inclusiveLimit = (ackQuorumSize - 1) - (currentAllocation + 1);
+ return checkSumOfSubsetWithinLimit(includedRacksOrRegions,
+ remainingRacksOrRegions, minRacksOrRegionsForDurability - 2, inclusiveLimit);
+ }
+
+ @Override
+ public void addBookie(BookieNode candidate) {
+ String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+ candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ? candidateRackOrRegion.substring(1) : candidateRackOrRegion;
+ int oldCount = 0;
+ if (null != allocationToRacksOrRegions.get(candidateRackOrRegion)) {
+ oldCount = allocationToRacksOrRegions.get(candidateRackOrRegion);
+ }
+ allocationToRacksOrRegions.put(candidateRackOrRegion, oldCount + 1);
+ }
+ }
+
+
+
+ final int distanceFromLeaves;
+ final int ensembleSize;
+ final int writeQuorumSize;
+ final int ackQuorumSize;
+ final int minRacksOrRegionsForDurability;
+ final ArrayList<BookieNode> chosenNodes;
+ final Set<String> racksOrRegions;
+ private final CoverageSet[] quorums;
+ final Predicate<BookieNode> parentPredicate;
+ final Ensemble<BookieNode> parentEnsemble;
+
+ protected RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that) {
+ this.distanceFromLeaves = that.distanceFromLeaves;
+ this.ensembleSize = that.ensembleSize;
+ this.writeQuorumSize = that.writeQuorumSize;
+ this.ackQuorumSize = that.ackQuorumSize;
+ this.chosenNodes = Lists.newArrayList(that.chosenNodes);
+ this.quorums = new CoverageSet[that.quorums.length];
+ for (int i = 0; i < that.quorums.length; i++) {
+ if (null != that.quorums[i]) {
+ this.quorums[i] = that.quorums[i].duplicate();
+ } else {
+ this.quorums[i] = null;
+ }
+ }
+ this.parentPredicate = that.parentPredicate;
+ this.parentEnsemble = that.parentEnsemble;
+ if (null != that.racksOrRegions) {
+ this.racksOrRegions = new HashSet<String>(that.racksOrRegions);
+ } else {
+ this.racksOrRegions = null;
+ }
+ this.minRacksOrRegionsForDurability = that.minRacksOrRegionsForDurability;
+ }
+
+ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ int distanceFromLeaves,
+ Set<String> racksOrRegions,
+ int minRacksOrRegionsForDurability) {
+ this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null,
+ racksOrRegions, minRacksOrRegionsForDurability);
+ }
+
+ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ int distanceFromLeaves,
+ Ensemble<BookieNode> parentEnsemble,
+ Predicate<BookieNode> parentPredicate) {
+ this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, parentEnsemble, parentPredicate,
+ null, 0);
+ }
+
+ protected RRTopologyAwareCoverageEnsemble(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize,
+ int distanceFromLeaves,
+ Ensemble<BookieNode> parentEnsemble,
+ Predicate<BookieNode> parentPredicate,
+ Set<String> racksOrRegions,
+ int minRacksOrRegionsForDurability) {
+ this.ensembleSize = ensembleSize;
+ this.writeQuorumSize = writeQuorumSize;
+ this.ackQuorumSize = ackQuorumSize;
+ this.distanceFromLeaves = distanceFromLeaves;
+ this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
+ if (minRacksOrRegionsForDurability > 0) {
+ this.quorums = new RackOrRegionDurabilityCoverageSet[ensembleSize];
+ } else {
+ this.quorums = new RackQuorumCoverageSet[ensembleSize];
+ }
+ this.parentEnsemble = parentEnsemble;
+ this.parentPredicate = parentPredicate;
+ this.racksOrRegions = racksOrRegions;
+ this.minRacksOrRegionsForDurability = minRacksOrRegionsForDurability;
+ }
+
+ @Override
+ public boolean apply(BookieNode candidate, Ensemble<BookieNode> ensemble) {
+ if (ensemble != this) {
+ return false;
+ }
+
+ // An ensemble cannot contain the same node twice
+ if (chosenNodes.contains(candidate)) {
+ return false;
+ }
+
+ // candidate position
+ if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
+ if (null == quorums[0]) {
+ quorums[0] = new RackOrRegionDurabilityCoverageSet();
+ }
+ if (!quorums[0].apply(candidate)) {
+ return false;
+ }
+ } else {
+ int candidatePos = chosenNodes.size();
+ int startPos = candidatePos - writeQuorumSize + 1;
+ for (int i = startPos; i <= candidatePos; i++) {
+ int idx = (i + ensembleSize) % ensembleSize;
+ if (null == quorums[idx]) {
+ if (minRacksOrRegionsForDurability > 0) {
+ quorums[idx] = new RackOrRegionDurabilityCoverageSet();
+ } else {
+ quorums[idx] = new RackQuorumCoverageSet();
+ }
+ }
+ if (!quorums[idx].apply(candidate)) {
+ return false;
+ }
+ }
+ }
+
+ return ((null == parentPredicate) || parentPredicate.apply(candidate, parentEnsemble));
+ }
+
+ @Override
+ public boolean addNode(BookieNode node) {
+ // An ensemble cannot contain the same node twice
+ if (chosenNodes.contains(node)) {
+ return false;
+ }
+
+ if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
+ if (null == quorums[0]) {
+ quorums[0] = new RackOrRegionDurabilityCoverageSet();
+ }
+ quorums[0].addBookie(node);
+ } else {
+ int candidatePos = chosenNodes.size();
+ int startPos = candidatePos - writeQuorumSize + 1;
+ for (int i = startPos; i <= candidatePos; i++) {
+ int idx = (i + ensembleSize) % ensembleSize;
+ if (null == quorums[idx]) {
+ if (minRacksOrRegionsForDurability > 0) {
+ quorums[idx] = new RackOrRegionDurabilityCoverageSet();
+ } else {
+ quorums[idx] = new RackQuorumCoverageSet();
+ }
+ }
+ quorums[idx].addBookie(node);
+ }
+ }
+ chosenNodes.add(node);
+
+ return ((null == parentEnsemble) || parentEnsemble.addNode(node));
+ }
+
+ @Override
+ public ArrayList<BookieSocketAddress> toList() {
+ ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
+ for (BookieNode bn : chosenNodes) {
+ addresses.add(bn.getAddr());
+ }
+ return addresses;
+ }
+
+ /**
+ * Validates if an ensemble is valid
+ *
+ * @return true if the ensemble is valid; false otherwise
+ */
+ @Override
+ public boolean validate() {
+ HashSet<BookieSocketAddress> addresses = new HashSet<BookieSocketAddress>(ensembleSize);
+ HashSet<String> racksOrRegions = new HashSet<String>();
+ for (BookieNode bn : chosenNodes) {
+ if (addresses.contains(bn.getAddr())) {
+ return false;
+ }
+ addresses.add(bn.getAddr());
+ racksOrRegions.add(bn.getNetworkLocation(distanceFromLeaves));
+ }
+
+ return ((minRacksOrRegionsForDurability == 0) ||
+ (racksOrRegions.size() >= minRacksOrRegionsForDurability));
+ }
+
+ @Override
+ public String toString() {
+ return chosenNodes.toString();
+ }
+ }
+
+ @Override
+ public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ return writeSet;
+ }
+
+ @Override
+ public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+ List<Integer> retList = new ArrayList<Integer>(reorderReadSequence(ensemble, writeSet, bookieFailureHistory));
+ if (retList.size() < ensemble.size()) {
+ for (int i = 0; i < ensemble.size(); i++) {
+ if (!retList.contains(i)) {
+ retList.add(i);
+ }
+ }
+ }
+ return retList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 57c3790..8e76bb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -66,6 +66,8 @@ public class ClientConfiguration extends AbstractConfiguration {
protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis";
protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
+ protected final static String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
+ protected final static String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
// Bookie health check settings
protected final static String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
@@ -78,6 +80,7 @@ public class ClientConfiguration extends AbstractConfiguration {
// Ensemble Placement Policy
protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
+ protected final static String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
// Stats
protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
@@ -329,6 +332,48 @@ public class ClientConfiguration extends AbstractConfiguration {
}
/**
+ * Get the tick duration in milliseconds used by the timeout timer.
+ *
+ * @return tick duration in milliseconds
+ */
+ public long getTimeoutTimerTickDurationMs() {
+ return getLong(TIMEOUT_TIMER_TICK_DURATION_MS, 100);
+ }
+
+ /**
+ * Set the tick duration in milliseconds that used for timeout timer.
+ *
+ * @param tickDuration
+ * tick duration in milliseconds.
+ * @return client configuration.
+ */
+ public ClientConfiguration setTimeoutTimerTickDurationMs(long tickDuration) {
+ setProperty(TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
+ return this;
+ }
+
+ /**
+ * Get number of ticks that used for timeout timer.
+ *
+ * @return number of ticks that used for timeout timer.
+ */
+ public int getTimeoutTimerNumTicks() {
+ return getInt(TIMEOUT_TIMER_NUM_TICKS, 1024);
+ }
+
+ /**
+ * Set number of ticks that used for timeout timer.
+ *
+ * @param numTicks
+ * number of ticks that used for timeout timer.
+ * @return client configuration.
+ */
+ public ClientConfiguration setTimeoutTimerNumTicks(int numTicks) {
+ setProperty(TIMEOUT_TIMER_NUM_TICKS, numTicks);
+ return this;
+ }
+
+ /**
* Get client netty connect timeout in millis.
*
* @return client netty connect timeout in millis.
@@ -666,10 +711,10 @@ public class ClientConfiguration extends AbstractConfiguration {
* @return ensemble placement policy class.
*/
public Class<? extends EnsemblePlacementPolicy> getEnsemblePlacementPolicy()
- throws ConfigurationException {
+ throws ConfigurationException {
+ // NOTE(review): RackawareEnsemblePlacementPolicy appears to be the fallback used when
+ // ENSEMBLE_PLACEMENT_POLICY is unset, and EnsemblePlacementPolicy the supertype the
+ // loaded class must implement -- confirm against ReflectionUtils.getClass.
return ReflectionUtils.getClass(this, ENSEMBLE_PLACEMENT_POLICY,
- RackawareEnsemblePlacementPolicy.class,
- EnsemblePlacementPolicy.class,
+ RackawareEnsemblePlacementPolicy.class,
+ EnsemblePlacementPolicy.class,
defaultLoader);
}
@@ -685,6 +730,27 @@ public class ClientConfiguration extends AbstractConfiguration {
}
/**
+ * Returns the network topology stabilize period, in seconds. A value of zero
+ * (the default) turns this feature off.
+ *
+ * @return network topology stabilize period in seconds.
+ */
+ public int getNetworkTopologyStabilizePeriodSeconds() {
+ final int disabledPeriodSeconds = 0;
+ return getInt(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, disabledPeriodSeconds);
+ }
+
+ /**
+ * Sets the network topology stabilize period, in seconds; zero turns the feature off.
+ *
+ * @param seconds stabilize period in seconds
+ * @return this client configuration, to allow call chaining.
+ * @see #getNetworkTopologyStabilizePeriodSeconds()
+ */
+ public ClientConfiguration setNetworkTopologyStabilizePeriodSeconds(int seconds) {
+ this.setProperty(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, Integer.valueOf(seconds));
+ return this;
+ }
+
+ /**
* Whether to enable recording task execution stats.
*
* @return flag to enable/disable recording task execution stats.
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
index cba1f7e..99ed038 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
+
package org.apache.bookkeeper.net;
import java.util.HashSet;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
index eb0f6f3..8947abf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
@@ -63,7 +63,7 @@ public class BookieSocketAddress {
}
// Public getters
- public String getHostname() {
+ /**
+ * @return the {@code hostname} of this bookie address.
+ */
+ public String getHostName() {
return hostname;
}
+
+ /**
+ * Pre-rename accessor, kept so existing callers of {@code getHostname()} continue to
+ * compile and link after the rename to {@link #getHostName()}.
+ *
+ * @return the same value as {@link #getHostName()}.
+ * @deprecated use {@link #getHostName()} instead.
+ */
+ @Deprecated
+ public String getHostname() {
+ return getHostName();
+ }
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
index 96acbc2..d7ff251 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
index a5dce93..d09e422 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
@@ -69,6 +69,10 @@ public class DNS {
// This is formed by reversing the IP numbers and appending in-addr.arpa
//
String[] parts = hostIp.getHostAddress().split("\\.");
+ if(parts.length !=4) {
+ //Not proper address. May be IPv6
+ throw new NamingException("IPV6");
+ }
String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
+ parts[0] + ".in-addr.arpa";
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
index 35f9a36..6156993 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
import java.util.List;