You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/12 18:41:48 UTC
svn commit: r1492274 [1/3] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ bookkee...
Author: ivank
Date: Wed Jun 12 16:41:47 2013
New Revision: 1492274
URL: http://svn.apache.org/r1492274
Log:
BOOKKEEPER-592: allow application to recommend ledger data locality (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRackawareEnsemblePlacementPolicy.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/StaticDNSResolver.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Jun 12 16:41:47 2013
@@ -86,6 +86,8 @@ Trunk (unreleased changes)
BOOKKEEPER-603: Support Boost 1.53 for Hedwig Cpp Client (jiannan via ivank)
+ BOOKKEEPER-592: allow application to recommend ledger data locality (sijie via ivank)
+
NEW FEATURE:
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Wed Jun 12 16:41:47 2013
@@ -37,8 +37,10 @@ import org.apache.bookkeeper.meta.Ledger
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -85,6 +87,9 @@ public class BookKeeper {
final LedgerManagerFactory ledgerManagerFactory;
final LedgerManager ledgerManager;
+ // Ensemble placement policy: chooses the bookies of new ensembles and the replacement for a
+ // failed bookie. Created from the client configuration at construction time and shared with
+ // the BookieWatcher.
+ final EnsemblePlacementPolicy placementPolicy;
+
final ClientConfiguration conf;
interface ZKConnectCallback {
@@ -131,10 +136,12 @@ public class BookKeeper {
this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ // initialize the ensemble placement
+ this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- bookieWatcher = new BookieWatcher(conf, scheduler, this);
+ bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
bookieWatcher.readBookiesBlocking();
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
@@ -195,16 +202,28 @@ public class BookKeeper {
this.zk = zk;
this.channelFactory = channelFactory;
this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ // initialize the ensemble placement
+ this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
mainWorkerPool = new OrderedSafeExecutor(conf.getNumWorkerThreads());
bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
- bookieWatcher = new BookieWatcher(conf, scheduler, this);
+ bookieWatcher = new BookieWatcher(conf, scheduler, placementPolicy, this);
bookieWatcher.readBookiesBlocking();
ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
ledgerManager = ledgerManagerFactory.newLedgerManager();
}
+    /**
+     * Create the {@link EnsemblePlacementPolicy} named by the client configuration and initialize
+     * it with that same configuration.
+     *
+     * @param conf client configuration naming the policy class
+     * @return the initialized placement policy
+     * @throws IOException wrapping any ConfigurationException raised while resolving the policy
+     */
+    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
+            throws IOException {
+        try {
+            Class<? extends EnsemblePlacementPolicy> policyClass = conf.getEnsemblePlacementPolicy();
+            EnsemblePlacementPolicy policy = ReflectionUtils.newInstance(policyClass);
+            return policy.initialize(conf);
+        } catch (ConfigurationException ce) {
+            throw new IOException("Failed to initialize ensemble placement policy : ", ce);
+        }
+    }
+
+ /**
+ * @return the LedgerManager created from this client's LedgerManagerFactory.
+ */
LedgerManager getLedgerManager() {
return ledgerManager;
}
@@ -661,6 +680,7 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
+ @Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
SyncCounter counter = (SyncCounter) ctx;
counter.setLh(lh);
@@ -680,12 +700,13 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
+ @Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
+ // ctx is expected to be a SyncCounter (it is cast unconditionally): record the handle and
+ // the return code on it, then count it down.
SyncCounter counter = (SyncCounter) ctx;
counter.setLh(lh);
-
+
LOG.debug("Open complete: {}", rc);
-
+
counter.setrc(rc);
counter.dec();
}
@@ -700,6 +721,7 @@ public class BookKeeper {
* @param ctx
* optional control object
*/
+ @Override
public void deleteComplete(int rc, Object ctx) {
SyncCounter counter = (SyncCounter) ctx;
counter.setrc(rc);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java Wed Jun 12 16:41:47 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -57,15 +56,15 @@ import org.apache.zookeeper.ZooDefs.Ids;
class BookieWatcher implements Watcher, ChildrenCallback {
static final Logger logger = LoggerFactory.getLogger(BookieWatcher.class);
+ public static int ZK_CONNECT_BACKOFF_SEC = 1;
+ // Empty exclusion set handed to the placement policy by newEnsemble(). It is one shared
+ // static instance, so neither this class nor the policy may modify it.
+ private static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+
// Bookie registration path in ZK
private final String bookieRegistrationPath;
- static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
- public static int ZK_CONNECT_BACKOFF_SEC = 1;
final BookKeeper bk;
-
- HashSet<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
final ScheduledExecutorService scheduler;
+ // Chooses and replaces ensemble members; it now also owns the view of writable bookies
+ // (updated through onClusterChanged) that the removed knownBookies set used to hold.
+ final EnsemblePlacementPolicy placementPolicy;
SafeRunnable reReadTask = new SafeRunnable() {
@Override
@@ -77,16 +76,24 @@ class BookieWatcher implements Watcher,
+ /**
+ * @param conf client configuration; supplies the ZooKeeper path where available bookies register.
+ * @param scheduler executor stored by this watcher.
+ * @param placementPolicy policy told about writable/readonly bookie changes and asked to choose
+ * and replace ensemble members.
+ * @param bk owning BookKeeper client, used to reach ZooKeeper and the bookie client.
+ */
public BookieWatcher(ClientConfiguration conf,
ScheduledExecutorService scheduler,
+ EnsemblePlacementPolicy placementPolicy,
BookKeeper bk) throws KeeperException, InterruptedException {
this.bk = bk;
// ZK bookie registration path
this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
this.scheduler = scheduler;
+ this.placementPolicy = placementPolicy;
readOnlyBookieWatcher = new ReadOnlyBookieWatcher(conf, bk);
}
- public synchronized Collection<InetSocketAddress> getBookies() {
- return new HashSet<InetSocketAddress>(knownBookies);
+ public Collection<InetSocketAddress> getBookies() {
+ try {
+ List<String> children = bk.getZkHandle().getChildren(this.bookieRegistrationPath, false);
+ return convertToBookieAddresses(children);
+ } catch (Exception e) {
+ logger.error("Failed to get bookies : ", e);
+ return new HashSet<InetSocketAddress>();
+ }
}
public void readBookies() {
@@ -118,13 +125,10 @@ class BookieWatcher implements Watcher,
HashSet<InetSocketAddress> newBookieAddrs = convertToBookieAddresses(children);
- final HashSet<InetSocketAddress> deadBookies;
+ final Set<InetSocketAddress> deadBookies;
synchronized (this) {
- deadBookies = new HashSet<InetSocketAddress>(knownBookies);
- deadBookies.removeAll(newBookieAddrs);
- // No need to close readonly bookie clients.
- deadBookies.removeAll(readOnlyBookieWatcher.getReadOnlyBookies());
- knownBookies = newBookieAddrs;
+ Set<InetSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
+ deadBookies = placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
}
if (bk.getBookieClient() != null) {
@@ -178,12 +182,16 @@ class BookieWatcher implements Watcher,
/**
- * Wrapper over the {@link #getAdditionalBookies(Set, int)} method when there is no exclusion list (or exisiting bookies)
- * @param numBookiesNeeded
- * @return
+ * Choose the bookies of a new ensemble by delegating to the configured
+ * {@link EnsemblePlacementPolicy}, with no bookies excluded.
+ *
+ * @param ensembleSize
+ * Ensemble Size
+ * @param writeQuorumSize
+ * Write Quorum Size
+ * @return list of bookies for new ensemble.
* @throws BKNotEnoughBookiesException
+ * if the placement policy cannot find enough bookies.
*/
- public ArrayList<InetSocketAddress> getNewBookies(int numBookiesNeeded) throws BKNotEnoughBookiesException {
- return getAdditionalBookies(EMPTY_SET, numBookiesNeeded);
+ public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
+ throws BKNotEnoughBookiesException {
+ return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
}
/**
@@ -192,49 +200,10 @@ class BookieWatcher implements Watcher,
* @return
* @throws BKNotEnoughBookiesException
*/
- public InetSocketAddress getAdditionalBookie(List<InetSocketAddress> existingBookies)
+    public InetSocketAddress replaceBookie(List<InetSocketAddress> existingBookies, int bookieIdx)
throws BKNotEnoughBookiesException {
- return getAdditionalBookies(new HashSet<InetSocketAddress>(existingBookies), 1).get(0);
- }
-
- /**
- * Returns additional bookies given an exclusion list and how many are needed
- * @param existingBookies
- * @param numAdditionalBookiesNeeded
- * @return
- * @throws BKNotEnoughBookiesException
- */
- public ArrayList<InetSocketAddress> getAdditionalBookies(Set<InetSocketAddress> existingBookies,
- int numAdditionalBookiesNeeded) throws BKNotEnoughBookiesException {
-
- ArrayList<InetSocketAddress> newBookies = new ArrayList<InetSocketAddress>();
-
- if (numAdditionalBookiesNeeded <= 0) {
- return newBookies;
- }
-
- List<InetSocketAddress> allBookies;
-
- synchronized (this) {
- allBookies = new ArrayList<InetSocketAddress>(knownBookies);
- }
-
- Collections.shuffle(allBookies);
-
- for (InetSocketAddress bookie : allBookies) {
- if (existingBookies.contains(bookie)) {
- continue;
- }
-
- newBookies.add(bookie);
- numAdditionalBookiesNeeded--;
-
- if (numAdditionalBookiesNeeded == 0) {
- return newBookies;
- }
- }
-
- throw new BKNotEnoughBookiesException();
+        InetSocketAddress bookieToReplace = existingBookies.get(bookieIdx);
+        // Every current ensemble member, including the one being replaced, is ruled out.
+        Set<InetSocketAddress> excluded = new HashSet<InetSocketAddress>(existingBookies);
+        return placementPolicy.replaceBookie(bookieToReplace, excluded);
}
/**
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Default Ensemble Placement Policy, which picks bookies randomly.
+ * <p>
+ * The candidate pool is the set of writable bookies last reported through
+ * {@link #onClusterChanged(Set, Set)}; the write quorum size is ignored.
+ */
+public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
+
+    static final Set<InetSocketAddress> EMPTY_SET = new HashSet<InetSocketAddress>();
+
+    // Writable bookies known to this policy. Guarded by "this"; replaced (never mutated in
+    // place) on every cluster change.
+    private Set<InetSocketAddress> knownBookies = new HashSet<InetSocketAddress>();
+
+    /**
+     * Pick {@code ensembleSize} distinct bookies at random from the known writable bookies.
+     *
+     * @param ensembleSize
+     *          number of bookies to choose; a non-positive value yields an empty list.
+     * @param quorumSize
+     *          write quorum size; ignored by this policy.
+     * @param excludeBookies
+     *          bookies that must not be chosen; {@code null} is treated as an empty set.
+     * @return the chosen bookies, in random order.
+     * @throws BKNotEnoughBookiesException
+     *          if fewer than {@code ensembleSize} eligible bookies are known.
+     */
+    @Override
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        // Check before allocating: new ArrayList(n) throws IllegalArgumentException for n < 0.
+        if (ensembleSize <= 0) {
+            return new ArrayList<InetSocketAddress>(0);
+        }
+        Set<InetSocketAddress> excluded = (null == excludeBookies) ? EMPTY_SET : excludeBookies;
+        List<InetSocketAddress> allBookies;
+        synchronized (this) {
+            allBookies = new ArrayList<InetSocketAddress>(knownBookies);
+        }
+        Collections.shuffle(allBookies);
+        // the capacity hint is capped by the pool size so an oversized request cannot over-allocate
+        ArrayList<InetSocketAddress> newBookies =
+                new ArrayList<InetSocketAddress>(Math.min(ensembleSize, allBookies.size()));
+        for (InetSocketAddress bookie : allBookies) {
+            if (excluded.contains(bookie)) {
+                continue;
+            }
+            newBookies.add(bookie);
+            if (newBookies.size() == ensembleSize) {
+                return newBookies;
+            }
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Pick one random known bookie that is not in {@code excludeBookies}. {@code bookieToReplace}
+     * is only avoided if the caller includes it in {@code excludeBookies}.
+     */
+    @Override
+    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        ArrayList<InetSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
+        return addresses.get(0);
+    }
+
+    /**
+     * Record the new set of writable bookies and report which previously known bookies vanished.
+     *
+     * @return bookies that were writable before this change and are now neither writable nor
+     *         read-only.
+     */
+    @Override
+    public synchronized Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies) {
+        Set<InetSocketAddress> deadBookies = new HashSet<InetSocketAddress>(knownBookies);
+        deadBookies.removeAll(writableBookies);
+        // readonly bookies should not be treated as dead bookies
+        deadBookies.removeAll(readOnlyBookies);
+        // Keep a private copy: retaining the caller's set would let later changes to it alter
+        // our view outside of this lock.
+        knownBookies = new HashSet<InetSocketAddress>(writableBookies);
+        return deadBookies;
+    }
+
+    /** This policy needs no configuration. */
+    @Override
+    public EnsemblePlacementPolicy initialize(Configuration conf) {
+        return this;
+    }
+
+    @Override
+    public void uninitalize() {
+        // do nothing
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Encapsulation of the algorithm that selects a number of bookies from the cluster as an ensemble for storing
+ * data, based on the data input as well as the node properties.
+ */
+public interface EnsemblePlacementPolicy {
+
+    /**
+     * Initialize the policy.
+     *
+     * @param conf
+     *          client configuration.
+     * @return initialized ensemble placement policy
+     */
+    EnsemblePlacementPolicy initialize(Configuration conf);
+
+    /**
+     * Uninitialize the policy.
+     * <p>
+     * NOTE: the method name is misspelled ("uninitalize"); renaming it would break implementations.
+     */
+    void uninitalize();
+
+    /**
+     * Update the policy's view of the cluster: which bookies are available as writable and which
+     * as readonly. Called whenever the cluster membership changes.
+     *
+     * @param writableBookies
+     *          All the bookies in the cluster available for write/read.
+     * @param readOnlyBookies
+     *          All the bookies in the cluster available for readonly.
+     * @return the dead bookies during this cluster change.
+     */
+    Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies);
+
+    /**
+     * Choose {@code ensembleSize} bookies for a new ensemble. If more bookies are requested than
+     * are available, {@link BKNotEnoughBookiesException} is thrown.
+     *
+     * @param ensembleSize
+     *          Ensemble Size
+     * @param writeQuorumSize
+     *          Write Quorum Size
+     * @param excludeBookies
+     *          Bookies that should not be considered as targets.
+     * @return list of bookies chosen as targets.
+     * @throws BKNotEnoughBookiesException if not enough bookies available.
+     */
+    ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+
+    /**
+     * Choose a new bookie to replace {@code bookieToReplace}. If no bookie is available in the
+     * cluster, {@link BKNotEnoughBookiesException} is thrown.
+     *
+     * @param bookieToReplace
+     *          bookie to replace
+     * @param excludeBookies
+     *          bookies that should not be considered as candidate.
+     * @return the bookie chosen as target.
+     * @throws BKNotEnoughBookiesException if no eligible bookie is available.
+     */
+    InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Wed Jun 12 16:41:47 2013
@@ -92,7 +92,8 @@ class LedgerCreateOp implements GenericC
ArrayList<InetSocketAddress> ensemble;
try {
- ensemble = bk.bookieWatcher.getNewBookies(metadata.getEnsembleSize());
+ ensemble = bk.bookieWatcher
+ .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
} catch (BKNotEnoughBookiesException e) {
LOG.error("Not enough bookies to create ledger");
cb.createComplete(e.getCode(), null, this.ctx);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Jun 12 16:41:47 2013
@@ -655,7 +655,7 @@ public class LedgerHandle {
// avoid parallel ensemble changes to same ensemble.
synchronized (metadata) {
- newBookie = bk.bookieWatcher.getAdditionalBookie(metadata.currentEnsemble);
+ newBookie = bk.bookieWatcher.replaceBookie(metadata.currentEnsemble, bookieIndex);
newEnsemble.addAll(metadata.currentEnsemble);
newEnsemble.set(bookieIndex, newBookie);
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.net.ScriptBasedMapping;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Simple rackware ensemble placement policy.
+ *
+ * Make most of the class and methods as protected, so it could be extended to implement other algorithms.
+ */
+public class RackawareEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
+
+ static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicy.class);
+
+ // Configuration key naming the DNSToSwitchMapping implementation used to map bookies to
+ // network locations; initialize() falls back to ScriptBasedMapping when the key is unset.
+ public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
+
+    /**
+     * Decides whether a candidate bookie may be appended to an ensemble under construction.
+     */
+    protected interface Predicate {
+        /**
+         * @param candidate bookie being considered.
+         * @param ensemble the ensemble the candidate would join.
+         * @return true if {@code candidate} is acceptable.
+         */
+        boolean apply(BookieNode candidate, Ensemble ensemble);
+    }
+
+    /**
+     * Accumulates the bookies selected for placement and exposes them as addresses.
+     */
+    protected interface Ensemble {
+
+        /**
+         * Add a newly accepted bookie to the end of this ensemble.
+         *
+         * @param node the accepted candidate bookie node.
+         */
+        void addBookie(BookieNode node);
+
+        /**
+         * @return the addresses representing this ensemble.
+         */
+        ArrayList<InetSocketAddress> toList();
+    }
+
+    /** A {@link Predicate} without constraints: every candidate bookie is accepted. */
+    protected static class TruePredicate implements Predicate {
+
+        public static final TruePredicate instance = new TruePredicate();
+
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble ensemble) {
+            return true; // unconditional acceptance
+        }
+    }
+
+    /**
+     * A no-op {@link Ensemble}: added bookies are ignored and {@link #toList()} is always empty.
+     */
+    protected static class EnsembleForReplacement implements Ensemble {
+
+        public static final EnsembleForReplacement instance = new EnsembleForReplacement();
+        // retained for source compatibility; toList() no longer hands this shared list out
+        static final ArrayList<InetSocketAddress> EMPTY_LIST = new ArrayList<InetSocketAddress>(0);
+
+        @Override
+        public void addBookie(BookieNode node) {
+            // do nothing
+        }
+
+        /**
+         * @return a new, empty, mutable list. A fresh instance is returned on every call so that a
+         *         caller appending to the result cannot corrupt what every other caller sees.
+         */
+        @Override
+        public ArrayList<InetSocketAddress> toList() {
+            return new ArrayList<InetSocketAddress>(0);
+        }
+    }
+
+ /**
+ * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
+ * which ensures that a write quorum should be covered by at least two racks.
+ */
+ protected static class RRRackCoverageEnsemble implements Predicate, Ensemble {
+
+ class QuorumCoverageSet {
+ Set<String> racks = new HashSet<String>();
+ int seenBookies = 0;
+
+ boolean apply(BookieNode candidate) {
+ if (seenBookies + 1 == writeQuorumSize) {
+ return racks.size() > (racks.contains(candidate.getNetworkLocation()) ? 1 : 0);
+ }
+ return true;
+ }
+
+ void addBookie(BookieNode candidate) {
+ ++seenBookies;
+ racks.add(candidate.getNetworkLocation());
+ }
+ }
+
+ final int ensembleSize;
+ final int writeQuorumSize;
+ final ArrayList<BookieNode> chosenNodes;
+ private final QuorumCoverageSet[] quorums;
+
+ protected RRRackCoverageEnsemble(int ensembleSize, int writeQuorumSize) {
+ this.ensembleSize = ensembleSize;
+ this.writeQuorumSize = writeQuorumSize;
+ this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
+ this.quorums = new QuorumCoverageSet[ensembleSize];
+ }
+
+ @Override
+ public boolean apply(BookieNode candidate, Ensemble ensemble) {
+ if (ensemble != this) {
+ return false;
+ }
+ // candidate position
+ int candidatePos = chosenNodes.size();
+ int startPos = candidatePos - writeQuorumSize + 1;
+ for (int i = startPos; i <= candidatePos; i++) {
+ int idx = (i + ensembleSize) % ensembleSize;
+ if (null == quorums[idx]) {
+ quorums[idx] = new QuorumCoverageSet();
+ }
+ if (!quorums[idx].apply(candidate)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void addBookie(BookieNode node) {
+ int candidatePos = chosenNodes.size();
+ int startPos = candidatePos - writeQuorumSize + 1;
+ for (int i = startPos; i <= candidatePos; i++) {
+ int idx = (i + ensembleSize) % ensembleSize;
+ if (null == quorums[idx]) {
+ quorums[idx] = new QuorumCoverageSet();
+ }
+ quorums[idx].addBookie(node);
+ }
+ chosenNodes.add(node);
+ }
+
+ @Override
+ public ArrayList<InetSocketAddress> toList() {
+ ArrayList<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(ensembleSize);
+ for (BookieNode bn : chosenNodes) {
+ addresses.add(bn.getAddr());
+ }
+ return addresses;
+ }
+
+ @Override
+ public String toString() {
+ return chosenNodes.toString();
+ }
+
+ }
+
+    /**
+     * A bookie placed in the {@link NetworkTopology}, identified by its address. Equality and
+     * hashing are based on the node name derived from that address.
+     */
+    protected static class BookieNode implements Node {
+
+        private final InetSocketAddress addr; // address identifying the bookie
+        private final String name;            // StringUtils.addrToString(addr)
+        private Node parent;                  // parent in the topology tree
+        private int level;                    // level in the topology tree
+        private String location = NetworkTopology.DEFAULT_RACK; // network location
+
+        BookieNode(InetSocketAddress address, String networkLocation) {
+            this.addr = address;
+            this.name = StringUtils.addrToString(address);
+            setNetworkLocation(networkLocation);
+        }
+
+        public InetSocketAddress getAddr() {
+            return this.addr;
+        }
+
+        @Override
+        public String getName() {
+            return this.name;
+        }
+
+        @Override
+        public int getLevel() {
+            return this.level;
+        }
+
+        @Override
+        public void setLevel(int newLevel) {
+            this.level = newLevel;
+        }
+
+        @Override
+        public Node getParent() {
+            return this.parent;
+        }
+
+        @Override
+        public void setParent(Node newParent) {
+            this.parent = newParent;
+        }
+
+        @Override
+        public String getNetworkLocation() {
+            return this.location;
+        }
+
+        @Override
+        public void setNetworkLocation(String newLocation) {
+            this.location = newLocation;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof BookieNode) {
+                return getName().equals(((BookieNode) obj).getName());
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return "<Bookie:" + name + ">";
+        }
+    }
+
+    /**
+     * Fallback resolver that places every host in {@link NetworkTopology#DEFAULT_RACK}.
+     */
+    static class DefaultResolver implements DNSToSwitchMapping {
+
+        @Override
+        public List<String> resolve(List<String> names) {
+            // one DEFAULT_RACK entry per input name, returned as a mutable list
+            return new ArrayList<String>(Collections.nCopies(names.size(), NetworkTopology.DEFAULT_RACK));
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            // nothing is cached, so there is nothing to reload
+        }
+    }
+
+    // Topology of the writable bookies only; onClusterChanged never adds read-only bookies.
+    private final NetworkTopology topology = new NetworkTopology();
+    // Resolver from host name / address to network location; assigned in initialize().
+    private DNSToSwitchMapping dnsResolver;
+    // Writable bookies currently known to this policy, keyed by address.
+    private final Map<InetSocketAddress, BookieNode> knownBookies = new HashMap<InetSocketAddress, BookieNode>();
+    // Node for the local host; null when the local address could not be determined.
+    private BookieNode localNode;
+    // Guards topology and knownBookies: write lock in onClusterChanged, read lock while choosing bookies.
+    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /**
+     * Create an uninitialized policy. {@link #initialize(Configuration)} must be
+     * called before bookies are resolved, since it installs the DNS resolver.
+     */
+    public RackawareEnsemblePlacementPolicy() {
+        // all state is set up by the field initializers above
+    }
+
+    /**
+     * Build a topology node for {@code addr}, resolving its network location
+     * with the configured DNS resolver.
+     */
+    private BookieNode createBookieNode(InetSocketAddress addr) {
+        String networkLocation = resolveNetworkLocation(addr);
+        return new BookieNode(addr, networkLocation);
+    }
+
+    /**
+     * Configure the policy.
+     * <p>
+     * Instantiates the DNS resolver class named by {@code REPP_DNS_RESOLVER_CLASS}
+     * (default {@code ScriptBasedMapping}) and passes it {@code conf} if it is
+     * {@link Configurable}. If that fails with a RuntimeException,
+     * {@link DefaultResolver} is used instead. Finally the local host's node is
+     * resolved.
+     *
+     * @param conf configuration to read settings from
+     * @return this policy
+     */
+    @Override
+    public EnsemblePlacementPolicy initialize(Configuration conf) {
+        String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+        try {
+            dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
+            if (dnsResolver instanceof Configurable) {
+                ((Configurable) dnsResolver).setConf(conf);
+            }
+        } catch (RuntimeException re) {
+            LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re);
+            dnsResolver = new DefaultResolver();
+        }
+
+        localNode = resolveLocalNode();
+        LOG.info("Initialize rackaware ensemble placement policy @ {} : {}.", localNode,
+                dnsResolver.getClass().getName());
+        return this;
+    }
+
+    /**
+     * Build the node describing the local host (port 0).
+     *
+     * @return the local node, or null if the local host address is unknown
+     */
+    private BookieNode resolveLocalNode() {
+        try {
+            String localAddress = InetAddress.getLocalHost().getHostAddress();
+            return createBookieNode(new InetSocketAddress(localAddress, 0));
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to get local host address : ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Tear down the policy. This implementation releases nothing.
+     */
+    @Override
+    public void uninitalize() {
+        // intentionally empty
+    }
+
+ private String resolveNetworkLocation(InetSocketAddress addr) {
+ List<String> names = new ArrayList<String>(1);
+ if (dnsResolver instanceof CachedDNSToSwitchMapping) {
+ names.add(addr.getAddress().getHostAddress());
+ } else {
+ names.add(addr.getHostName());
+ }
+ // resolve network addresses
+ List<String> rNames = dnsResolver.resolve(names);
+ String netLoc;
+ if (null == rNames) {
+ LOG.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
+ NetworkTopology.DEFAULT_RACK);
+ netLoc = NetworkTopology.DEFAULT_RACK;
+ } else {
+ netLoc = rNames.get(0);
+ }
+ return netLoc;
+ }
+
+    /**
+     * Bring the writable-bookie topology in line with the current cluster view.
+     *
+     * @param writableBookies bookies that are currently writable
+     * @param readOnlyBookies bookies that are currently read-only
+     * @return bookies previously known as writable that are now neither
+     *         writable nor read-only (a snapshot, not a live view)
+     */
+    @Override
+    public Set<InetSocketAddress> onClusterChanged(Set<InetSocketAddress> writableBookies,
+            Set<InetSocketAddress> readOnlyBookies) {
+        rwLock.writeLock().lock();
+        try {
+            Set<InetSocketAddress> joinedBookies, leftBookies, deadBookies;
+            Set<InetSocketAddress> oldBookieSet = knownBookies.keySet();
+            // Sets.difference returns live views, and oldBookieSet is itself a live
+            // view of knownBookies. Copy eagerly so the map updates below neither
+            // break iteration (ConcurrentModificationException) nor change the
+            // returned dead-bookie set after the fact.
+            // left bookies : bookies in known bookies, but not in new writable bookie cluster.
+            leftBookies = new HashSet<InetSocketAddress>(Sets.difference(oldBookieSet, writableBookies));
+            // joined bookies : bookies in new writable bookie cluster, but not in known bookies
+            joinedBookies = new HashSet<InetSocketAddress>(Sets.difference(writableBookies, oldBookieSet));
+            // dead bookies.
+            deadBookies = new HashSet<InetSocketAddress>(Sets.difference(leftBookies, readOnlyBookies));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
+                        new Object[] { leftBookies, joinedBookies, deadBookies });
+            }
+
+            // node left
+            for (InetSocketAddress addr : leftBookies) {
+                BookieNode node = knownBookies.remove(addr);
+                topology.remove(node);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
+                }
+            }
+
+            // node joined
+            for (InetSocketAddress addr : joinedBookies) {
+                BookieNode node = createBookieNode(addr);
+                topology.add(node);
+                knownBookies.put(addr, node);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
+                }
+            }
+
+            return deadBookies;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Map bookie addresses to topology nodes. Known bookies reuse their existing
+     * node; unknown ones get a freshly resolved node.
+     *
+     * @param excludeBookies addresses to convert
+     * @return a new mutable set of nodes (callers add chosen bookies to it)
+     */
+    private Set<Node> convertBookiesToNodes(Set<InetSocketAddress> excludeBookies) {
+        Set<Node> result = new HashSet<Node>();
+        for (InetSocketAddress bookie : excludeBookies) {
+            BookieNode known = knownBookies.get(bookie);
+            result.add(null != known ? known : createBookieNode(bookie));
+        }
+        return result;
+    }
+
+    /**
+     * Choose {@code ensembleSize} bookies for a new ensemble.
+     * <p>
+     * With fewer than two racks, bookies are picked at random.
+     * <p>
+     * Otherwise the first bookie is taken from the local node's rack, or from
+     * under {@code NodeBase.ROOT} when the local node is unknown. Each later
+     * bookie is taken from the scope returned by {@link #getRemoteRack} for the
+     * previous pick. {@link #selectFromRack} falls back to a random bookie when
+     * the scope yields none.
+     *
+     * @param ensembleSize number of bookies to choose
+     * @param writeQuorumSize write quorum size, passed to the rack-coverage ensemble
+     * @param excludeBookies bookies that must not be chosen
+     * @return addresses of the chosen bookies
+     * @throws BKNotEnoughBookiesException if not enough eligible bookies exist
+     */
+    @Override
+    public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            RRRackCoverageEnsemble ensemble = new RRRackCoverageEnsemble(ensembleSize, writeQuorumSize);
+            BookieNode prevNode = null;
+            int numRacks = topology.getNumOfRacks();
+            // only one rack, use the random algorithm.
+            if (numRacks < 2) {
+                List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes,
+                        EnsembleForReplacement.instance);
+                ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>(ensembleSize);
+                for (BookieNode bn : bns) {
+                    addrs.add(bn.addr);
+                }
+                return addrs;
+            }
+            // pick nodes by racks, to ensure there is at least two racks per write quorum.
+            for (int i = 0; i < ensembleSize; i++) {
+                String curRack;
+                if (null == prevNode) {
+                    curRack = (null == localNode) ? NodeBase.ROOT : localNode.getNetworkLocation();
+                } else {
+                    // go through the overridable hook rather than rebuilding the
+                    // "remote rack" scope inline, so subclasses stay consistent
+                    curRack = getRemoteRack(prevNode);
+                }
+                prevNode = selectFromRack(curRack, excludeNodes, ensemble, ensemble);
+            }
+            return ensemble.toList();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose a bookie to replace {@code bookieToReplace}.
+     * <p>
+     * The replaced bookie is excluded, and a candidate from its rack is
+     * preferred. {@link #selectFromRack} falls back to any bookie in the
+     * cluster.
+     *
+     * @param bookieToReplace bookie being replaced
+     * @param excludeBookies bookies that must not be chosen
+     * @return address of the replacement bookie
+     * @throws BKNotEnoughBookiesException if no eligible bookie exists
+     */
+    @Override
+    public InetSocketAddress replaceBookie(InetSocketAddress bookieToReplace,
+            Set<InetSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            BookieNode replaced = knownBookies.get(bookieToReplace);
+            if (replaced == null) {
+                replaced = createBookieNode(bookieToReplace);
+            }
+
+            Set<Node> excluded = convertBookiesToNodes(excludeBookies);
+            // never hand back the bookie we are replacing
+            excluded.add(replaced);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+                        excluded);
+            }
+
+            String sameRack = replaced.getNetworkLocation();
+            BookieNode replacement = selectFromRack(sameRack, excluded,
+                    TruePredicate.instance, EnsembleForReplacement.instance);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Bookie {} is chosen to replace bookie {}.", replacement, replaced);
+            }
+            return replacement.getAddr();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose one bookie under {@code networkLoc}.
+     * <p>
+     * If none qualifies, one is chosen at random from the whole cluster. That
+     * fallback ignores {@code predicate}.
+     *
+     * @param networkLoc network path to choose from
+     * @param excludeBookies bookies that must not be chosen; the chosen bookie is added
+     * @param predicate acceptance check for the rack-local attempt
+     * @param ensemble ensemble that receives the chosen bookie
+     * @return the chosen bookie
+     * @throws BKNotEnoughBookiesException if the cluster-wide fallback also fails
+     */
+    protected BookieNode selectFromRack(String networkLoc, Set<Node> excludeBookies, Predicate predicate,
+            Ensemble ensemble) throws BKNotEnoughBookiesException {
+        BookieNode chosen;
+        try {
+            chosen = selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKNotEnoughBookiesException e) {
+            LOG.warn("Failed to choose a bookie from {} : "
+                    + "excluded {}, fallback to choose bookie randomly from the cluster.",
+                    networkLoc, excludeBookies);
+            chosen = selectRandom(1, excludeBookies, ensemble).get(0);
+        }
+        return chosen;
+    }
+
+    /**
+     * Build the "~"-prefixed scope for {@code node}'s network location.
+     * NOTE(review): presumably NetworkTopology#getLeaves treats a leading "~" as
+     * "everything except this path" — confirm against NetworkTopology.
+     *
+     * @param node reference bookie
+     * @return "~" followed by the node's network location
+     */
+    protected String getRemoteRack(BookieNode node) {
+        StringBuilder scope = new StringBuilder("~");
+        scope.append(node.getNetworkLocation());
+        return scope.toString();
+    }
+
+    /**
+     * Choose a random bookie among the topology leaves under a network path.
+     * <p>
+     * Leaves are visited in shuffled order. The first leaf that is not excluded,
+     * is a {@link BookieNode} and satisfies {@code predicate} is added to
+     * {@code ensemble} and to {@code excludeBookies}.
+     *
+     * @param netPath
+     *          network path to choose under
+     * @param excludeBookies
+     *          bookies that must not be chosen; updated with the chosen bookie
+     * @param predicate
+     *          predicate to check whether the target is a good target
+     * @param ensemble
+     *          ensemble structure receiving the chosen bookie
+     * @return chosen bookie.
+     * @throws BKNotEnoughBookiesException if no leaf qualifies
+     */
+    protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate predicate,
+            Ensemble ensemble) throws BKNotEnoughBookiesException {
+        List<Node> candidates = new ArrayList<Node>(topology.getLeaves(netPath));
+        Collections.shuffle(candidates);
+        for (Node candidate : candidates) {
+            boolean eligible = !excludeBookies.contains(candidate)
+                    && candidate instanceof BookieNode
+                    && predicate.apply((BookieNode) candidate, ensemble);
+            if (eligible) {
+                BookieNode chosen = (BookieNode) candidate;
+                ensemble.addBookie(chosen);
+                excludeBookies.add(chosen);
+                return chosen;
+            }
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Choose random bookies from all known writable bookies.
+     * <p>
+     * Every chosen bookie is added to {@code ensemble} and to
+     * {@code excludeBookies}.
+     *
+     * @param numBookies
+     *          number of bookies to choose. A value of zero or less returns an
+     *          empty list without touching {@code ensemble} or {@code excludeBookies}.
+     * @param excludeBookies
+     *          bookies that must not be chosen; updated with the chosen bookies
+     * @param ensemble
+     *          ensemble to hold the bookies chosen
+     * @return the bookie nodes chosen, in selection order
+     * @throws BKNotEnoughBookiesException
+     *          if fewer than {@code numBookies} eligible bookies exist. Bookies
+     *          chosen before the failure stay in {@code ensemble} and
+     *          {@code excludeBookies}.
+     */
+    protected List<BookieNode> selectRandom(int numBookies, Set<Node> excludeBookies, Ensemble ensemble)
+            throws BKNotEnoughBookiesException {
+        if (numBookies <= 0) {
+            // Without this guard the countdown below never reaches zero: every
+            // bookie would be added to the ensemble before throwing.
+            return new ArrayList<BookieNode>(0);
+        }
+        List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values());
+        Collections.shuffle(allBookies);
+        List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
+        for (BookieNode bookie : allBookies) {
+            if (excludeBookies.contains(bookie)) {
+                continue;
+            }
+            ensemble.addBookie(bookie);
+            excludeBookies.add(bookie);
+            newBookies.add(bookie);
+            --numBookies;
+            if (numBookies == 0) {
+                return newBookies;
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            // numBookies now holds how many bookies are still missing
+            LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
+                    numBookies, excludeBookies, allBookies });
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Jun 12 16:41:47 2013
@@ -38,12 +38,13 @@ public abstract class AbstractConfigurat
static final Logger LOG = LoggerFactory.getLogger(AbstractConfiguration.class);
- private static ClassLoader defaultLoader;
+ protected static final ClassLoader defaultLoader;
static {
- defaultLoader = Thread.currentThread().getContextClassLoader();
- if (null == defaultLoader) {
- defaultLoader = AbstractConfiguration.class.getClassLoader();
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (null == loader) {
+ loader = AbstractConfiguration.class.getClassLoader();
}
+ defaultLoader = loader;
}
// Ledger Manager
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java Wed Jun 12 16:41:47 2013
@@ -20,7 +20,10 @@ package org.apache.bookkeeper.conf;
import java.util.List;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
-
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import static com.google.common.base.Charsets.UTF_8;
@@ -49,6 +52,9 @@ public class ClientConfiguration extends
// Number Woker Threads
protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
+ // Ensemble Placement Policy
+ protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
+
/**
* Construct a default client-side configuration
*/
@@ -312,4 +318,28 @@ public class ClientConfiguration extends
setProperty(SPECULATIVE_READ_TIMEOUT, timeout);
return this;
}
+
+    /**
+     * Get the ensemble placement policy class configured under
+     * {@code ensemblePlacementPolicy}. Defaults to
+     * {@link RackawareEnsemblePlacementPolicy}.
+     *
+     * @return ensemble placement policy class.
+     * @throws ConfigurationException if {@code ReflectionUtils.getClass} cannot
+     *         resolve the configured class
+     */
+    public Class<? extends EnsemblePlacementPolicy> getEnsemblePlacementPolicy()
+        throws ConfigurationException {
+        Class<? extends EnsemblePlacementPolicy> policyClass = ReflectionUtils.getClass(this,
+                ENSEMBLE_PLACEMENT_POLICY,
+                RackawareEnsemblePlacementPolicy.class,
+                EnsemblePlacementPolicy.class,
+                defaultLoader);
+        return policyClass;
+    }
+
+    /**
+     * Set the ensemble placement policy class. Only the class name is stored.
+     *
+     * @param policyClass
+     *          ensemble placement policy class
+     * @return this configuration, for chaining
+     */
+    public ClientConfiguration setEnsemblePlacementPolicy(Class<? extends EnsemblePlacementPolicy> policyClass) {
+        String className = policyClass.getName();
+        setProperty(ENSEMBLE_PLACEMENT_POLICY, className);
+        return this;
+    }
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/Configurable.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.conf;
+
+import org.apache.commons.configuration.Configuration;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * An object that can be configured with an Apache Commons {@link Configuration}.
+ */
+@Beta
+public interface Configurable {
+
+    /**
+     * Set the configuration this object should use.
+     *
+     * @param conf
+     *          configuration object to use
+     */
+    void setConf(Configuration conf);
+
+    /**
+     * Get the configuration this object uses.
+     *
+     * @return the configuration used by this object
+     */
+    Configuration getConf();
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.net;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Base class for {@link DNSToSwitchMapping} implementations.
+ * <p>
+ * Implementations are not required to extend it, but doing so is recommended:
+ * methods added here later are picked up automatically by every subclass.
+ * <p>
+ * This class deliberately does not extend a <code>Configured</code>-style base
+ * class whose constructor calls {@link #setConf(Configuration)}. Such a
+ * constructor would call into subclasses before they are fully constructed.
+ */
+public abstract class AbstractDNSToSwitchMapping implements DNSToSwitchMapping, Configurable {
+
+    private Configuration conf;
+
+    /**
+     * Create an unconfigured instance.
+     */
+    protected AbstractDNSToSwitchMapping() {
+    }
+
+    /**
+     * Create an instance that stores {@code conf}.
+     * <p>
+     * {@link #setConf(Configuration)} is NOT called. A subclass that extracts
+     * settings in that method must call it explicitly.
+     *
+     * @param conf the configuration
+     */
+    protected AbstractDNSToSwitchMapping(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Report whether this mapping is known to describe a single switch.
+     * <p>
+     * The base class returns false, i.e. it assumes a multi-rack topology.
+     * Subclasses that know more about their topology may override it. Callers
+     * use it to decide between single-rack and multi-rack placement policies.
+     *
+     * @return true if the mapping thinks that it is on a single switch
+     */
+    public boolean isSingleSwitch() {
+        return false;
+    }
+
+    /**
+     * Get a copy of the host-to-switch map, for diagnostics.
+     *
+     * @return a copy of the map, or null when the mapping keeps none (the base
+     *         class always returns null)
+     */
+    public Map<String, String> getSwitchMap() {
+        return null;
+    }
+
+    /**
+     * Describe the mapping for operators or debug logs.
+     * <p>
+     * The output lists the implementation, every known host-to-switch entry on
+     * its own line, and the number of nodes and distinct switches.
+     *
+     * @return the textual dump
+     */
+    public String dumpTopology() {
+        Map<String, String> switchMap = getSwitchMap();
+        StringBuilder out = new StringBuilder();
+        out.append("Mapping: ").append(toString()).append("\n");
+        if (switchMap == null) {
+            out.append("No topology information");
+            return out.toString();
+        }
+        out.append("Map:\n");
+        Set<String> distinctSwitches = new HashSet<String>();
+        for (Map.Entry<String, String> e : switchMap.entrySet()) {
+            out.append(" ").append(e.getKey()).append(" -> ").append(e.getValue()).append("\n");
+            distinctSwitches.add(e.getValue());
+        }
+        out.append("Nodes: ").append(switchMap.size()).append("\n");
+        out.append("Switches: ").append(distinctSwitches.size()).append("\n");
+        return out.toString();
+    }
+
+    /**
+     * @return true when a configuration is set but no topology script
+     *         ({@link CommonConfigurationKeys#NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY})
+     *         is configured
+     */
+    protected boolean isSingleSwitchByScriptPolicy() {
+        if (conf == null) {
+            return false;
+        }
+        return conf.getString(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null;
+    }
+
+    /**
+     * Query whether a {@link DNSToSwitchMapping} is on a single switch.
+     * <p>
+     * Mappings that do not derive from this class (and null) are treated as
+     * multi-switch.
+     *
+     * @param mapping the mapping to query
+     * @return true only if {@code mapping} is an AbstractDNSToSwitchMapping
+     *         whose {@link #isSingleSwitch()} returns true
+     */
+    public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) {
+        // instanceof is false for null, so no separate null check is needed
+        return mapping instanceof AbstractDNSToSwitchMapping
+                && ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch();
+    }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A cached implementation of DNSToSwitchMapping that takes an
+ * raw DNSToSwitchMapping and stores the resolved network location in
+ * a cache. The following calls to a resolved network location
+ * will get its location from the cache.
+ *
+ */
+public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
+ private Map<String, String> cache = new ConcurrentHashMap<String, String>();
+
+ /**
+ * The uncached mapping
+ */
+ protected final DNSToSwitchMapping rawMapping;
+
+ /**
+ * cache a raw DNS mapping
+ * @param rawMapping the raw mapping to cache
+ */
+ public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {
+ this.rawMapping = rawMapping;
+ }
+
+ /**
+ * @param names a list of hostnames to probe for being cached
+ * @return the hosts from 'names' that have not been cached previously
+ */
+ private List<String> getUncachedHosts(List<String> names) {
+ // find out all names without cached resolved location
+ List<String> unCachedHosts = new ArrayList<String>(names.size());
+ for (String name : names) {
+ if (cache.get(name) == null) {
+ unCachedHosts.add(name);
+ }
+ }
+ return unCachedHosts;
+ }
+
+ /**
+ * Caches the resolved host:rack mappings. The two list
+ * parameters must be of equal size.
+ *
+ * @param uncachedHosts a list of hosts that were uncached
+ * @param resolvedHosts a list of resolved host entries where the element
+ * at index(i) is the resolved value for the entry in uncachedHosts[i]
+ */
+ private void cacheResolvedHosts(List<String> uncachedHosts,
+ List<String> resolvedHosts) {
+ // Cache the result
+ if (resolvedHosts != null) {
+ for (int i=0; i<uncachedHosts.size(); i++) {
+ cache.put(uncachedHosts.get(i), resolvedHosts.get(i));
+ }
+ }
+ }
+
+ /**
+ * @param names a list of hostnames to look up (can be be empty)
+ * @return the cached resolution of the list of hostnames/addresses.
+ * or null if any of the names are not currently in the cache
+ */
+ private List<String> getCachedHosts(List<String> names) {
+ List<String> result = new ArrayList<String>(names.size());
+ // Construct the result
+ for (String name : names) {
+ String networkLocation = cache.get(name);
+ if (networkLocation != null) {
+ result.add(networkLocation);
+ } else {
+ return null;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public List<String> resolve(List<String> names) {
+ // normalize all input names to be in the form of IP addresses
+ names = NetUtils.normalizeHostNames(names);
+
+ List <String> result = new ArrayList<String>(names.size());
+ if (names.isEmpty()) {
+ return result;
+ }
+
+ List<String> uncachedHosts = getUncachedHosts(names);
+
+ // Resolve the uncached hosts
+ List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
+ //cache them
+ cacheResolvedHosts(uncachedHosts, resolvedHosts);
+ //now look up the entire list in the cache
+ return getCachedHosts(names);
+
+ }
+
+ /**
+ * Get the (host x switch) map.
+ * @return a copy of the cached map of hosts to rack
+ */
+ @Override
+ public Map<String, String> getSwitchMap() {
+ Map<String, String > switchMap = new HashMap<String, String>(cache);
+ return switchMap;
+ }
+
+
+ @Override
+ public String toString() {
+ return "cached switch mapping relaying to " + rawMapping;
+ }
+
+ /**
+ * Delegate the switch topology query to the raw mapping, via
+ * {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)}
+ * @return true iff the raw mapper is considered single-switch.
+ */
+ @Override
+ public boolean isSingleSwitch() {
+ return isMappingSingleSwitch(rawMapping);
+ }
+
+ @Override
+ public void reloadCachedMappings() {
+ cache.clear();
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CommonConfigurationKeys.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.net;
+
+/**
+ * Configuration keys used when resolving the network topology.
+ */
+public interface CommonConfigurationKeys {
+
+    /** Key naming the script used to resolve the network topology. */
+    String NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY = "networkTopologyScriptFileName";
+
+    /**
+     * Key for the number of arguments handed to the topology script.
+     * NOTE(review): presumably a per-invocation batch size — confirm against
+     * the script-based mapping.
+     */
+    String NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY = "networkTopologyScriptNumberArgs";
+
+    /** Default value of {@link #NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY}. */
+    int NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT = 100;
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.List;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Pluggable resolver from DNS names / IP addresses to rack identifiers
+ * (network paths).
+ */
+@Beta
+public interface DNSToSwitchMapping {
+
+    /**
+     * Resolve DNS names or IP addresses to switch information (network paths).
+     * The returned list must correspond one-to-one with {@code names}.
+     * <p>
+     * For an input such as {@code x.y.com} the result is a path of the form
+     * {@code /foo/rack}, where {@code /} is the root and {@code foo} is the
+     * switch that {@code rack} is connected to. The host name or IP address is
+     * not part of the path. The depth of the path depends on the cluster's
+     * network topology.
+     * <p>
+     * If a name cannot be resolved to a rack, implementations should return
+     * {@link NetworkTopology#DEFAULT_RACK}. The bundled implementations do so,
+     * though it is not a formal requirement.
+     * <p>
+     * Callers should also be prepared for {@code null}, which some
+     * implementations (e.g. the cached mapping) return when a name could not
+     * be resolved.
+     *
+     * @param names the hosts to resolve (may be empty)
+     * @return resolved network paths; empty if {@code names} is empty
+     */
+    List<String> resolve(List<String> names);
+
+    /**
+     * Reload all cached mappings.
+     * <p>
+     * Implementations with a cache clear it, so later lookups see fresh data.
+     */
+    void reloadCachedMappings();
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
/**
 * Static helpers for converting host names into their textual IP addresses.
 */
public class NetUtils {

    /**
     * Given a string representation of a host, returns its IP address in
     * textual presentation.
     *
     * <p>The conversion is best effort: if the name cannot be resolved, it is
     * returned unchanged. A {@code null} or empty name is also returned
     * unchanged. It is not handed to {@link InetAddress#getByName(String)},
     * which would map it to the loopback address.
     *
     * <p>Converting a host name (as opposed to an IP literal) may require a
     * name-service lookup.
     *
     * @param name a string representation of a host: either a textual
     *             representation of its IP address or its host name
     * @return its IP address in string format, or {@code name} itself if it
     *         is {@code null}, empty, or cannot be resolved
     */
    public static String normalizeHostName(String name) {
        // InetAddress.getByName(null or "") yields the loopback address, which
        // is a wrong answer for a missing host; treat such input like any other
        // unresolvable name instead.
        if (name == null || name.length() == 0) {
            return name;
        }
        try {
            return InetAddress.getByName(name).getHostAddress();
        } catch (UnknownHostException e) {
            // Best effort: fall back to the name exactly as given.
            return name;
        }
    }

    /**
     * Given a collection of string representations of hosts, returns a list of
     * the corresponding IP addresses in textual representation.
     *
     * <p>Each element is converted with {@link #normalizeHostName(String)}, so
     * names that cannot be resolved are kept as given. The returned list
     * follows the iteration order of {@code names}.
     *
     * @param names a collection of string representations of hosts
     * @return a new list of the corresponding IP addresses in string format
     * @throws NullPointerException if {@code names} is {@code null}
     * @see #normalizeHostName(String)
     */
    public static List<String> normalizeHostNames(Collection<String> names) {
        List<String> hostNames = new ArrayList<String>(names.size());
        for (String name : names) {
            hostNames.add(normalizeHostName(name));
        }
        return hostNames;
    }

}