You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2009/06/24 07:07:24 UTC
svn commit: r787907 [1/2] - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Author: breed
Date: Wed Jun 24 05:07:23 2009
New Revision: 787907
URL: http://svn.apache.org/viewvc?rev=787907&view=rev
Log:
ZOOKEEPER-356. Masking bookie failure during writes to a ledger
Added:
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Jun 24 05:07:23 2009
@@ -232,6 +232,8 @@
ZOOKEEPER-329. document how to integrate 3rd party authentication into ZK
server ACLs. (breed via mahadev)
+ ZOOKEEPER-356. Masking bookie failure during writes to a ledger (flavio via breed)
+
NEW FEATURES:
ZOOKEEPER-371. jdiff documentation included in build/release (giri via phunt)
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Wed Jun 24 05:07:23 2009
@@ -44,7 +44,7 @@
public class Bookie extends Thread {
HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
- Logger LOG = Logger.getLogger(Bookie.class);
+ static Logger LOG = Logger.getLogger(Bookie.class);
/**
* 4 byte signature followed by 2-byte major and 2-byte minor versions
*/
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Wed Jun 24 05:07:23 2009
@@ -78,5 +78,4 @@
*/
void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
}
-
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java Wed Jun 24 05:07:23 2009
@@ -26,12 +26,50 @@
* String used to construct znode paths. They are used in BookKeeper
* and LedgerManagementProcessor.
*/
+
+ /*
+ * Path to ledger metadata. ZooKeeper appends a sequence number to L.
+ */
static public final String prefix = "/ledgers/L";
+
+ /*
+ * Parent node to store ensemble composition. Each child corresponds to
+ * one bookie.
+ */
static public final String ensemble = "/ensemble";
+
+ /*
+ * Quorum size.
+ */
static public final String quorumSize = "/quorum";
+
+ /*
+ * Close node.
+ */
static public final String close = "/close";
+
+ /*
+ * Quorum mode: VERIFYING or GENERIC
+ */
static public final String quorumMode = "/mode";
+ /*
+ * Marks failure points during writes to the ledger.
+ */
+ static public final String quorumEvolution = "/quorum_evolution";
+
+ /*
+ * Ledger is in write mode
+ */
+
+ static public final int WRITE = 0;
+
+ /*
+ * Ledger is in read mode
+ */
+
+ static public final int READ = 1;
+
/**
* Status ok
*/
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java Wed Jun 24 05:07:23 2009
@@ -48,8 +48,12 @@
return new BKDigestNotInitializedException();
case Code.DigestMatchException:
return new BKDigestMatchException();
+ case Code.NotEnoughBookiesException:
+ return new BKNotEnoughBookiesException();
case Code.NoSuchLedgerExistsException:
return new BKNoSuchLedgerExistsException();
+ case Code.BookieHandleNotAvailableException:
+ return new BKBookieHandleNotAvailableException();
default:
return new BKIllegalOpException();
}
@@ -62,7 +66,9 @@
int NoBookieAvailableException = -3;
int DigestNotInitializedException = -4;
int DigestMatchException = -5;
- int NoSuchLedgerExistsException = -6;
+ int NotEnoughBookiesException = -6;
+ int NoSuchLedgerExistsException = -7;
+ int BookieHandleNotAvailableException = -8;
int IllegalOpException = -100;
}
@@ -89,8 +95,12 @@
return "Digest engine not initialized";
case Code.DigestMatchException:
return "Entry digest does not match";
+ case Code.NotEnoughBookiesException:
+ return "Not enough non-faulty bookies available";
case Code.NoSuchLedgerExistsException:
return "No such ledger exists";
+ case Code.BookieHandleNotAvailableException:
+ return "Bookie handle is not available";
default:
return "Invalid operation";
}
@@ -132,10 +142,22 @@
}
}
+ public static class BKNotEnoughBookiesException extends BKException {
+ public BKNotEnoughBookiesException(){
+ super(Code.NotEnoughBookiesException);
+ }
+ }
+
public static class BKNoSuchLedgerExistsException extends BKException {
public BKNoSuchLedgerExistsException(){
super(Code.NoSuchLedgerExistsException);
}
}
+
+ public static class BKBookieHandleNotAvailableException extends BKException {
+ public BKBookieHandleNotAvailableException(){
+ super(Code.BookieHandleNotAvailableException);
+ }
+ }
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.ConnectException;
import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +59,8 @@
* There are three possible operations: start a new ledger,
* write to a ledger, and read from a ledger.
*
+ * For the ZooKeeper layout, please refer to BKDefs.java.
+ *
*/
public class BookKeeper
@@ -142,6 +145,8 @@
IOException, BKException {
// Check that quorum size follows the minimum
long t;
+ LedgerHandle lh = null;
+
switch(mode){
case VERIFIABLE:
t = java.lang.Math.round(java.lang.Math.floor((ensSize - 1)/2));
@@ -171,71 +176,77 @@
*/
String parts[] = path.split("/");
String subparts[] = parts[2].split("L");
- long lId = Long.parseLong(subparts[1]);
- /*
- * Get children from "/ledgers/available" on zk
- */
- List<String> list =
- zk.getChildren("/ledgers/available", false);
- ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
- /*
- * Select ensSize servers to form the ensemble
- */
- path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0],
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- /*
- * Add quorum size to ZK metadata
- */
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(qSize);
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- /*
- * Quorum mode
- */
- bb = ByteBuffer.allocate(4);
- bb.putInt(mode.ordinal());
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- /*
- * Create QuorumEngine
- */
- LedgerHandle lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
- //qeMap.put(lId, queue);
- /*
- * Adding bookies to ledger handle
- */
- Random r = new Random();
-
- for(int i = 0; i < ensSize; i++){
- int index = 0;
- if(list.size() > 1)
- index = r.nextInt(list.size() - 1);
- else if(list.size() == 1)
- index = 0;
- else {
- LOG.error("Not enough bookies available");
+ try{
+ long lId = Long.parseLong(subparts[1]);
+
+ /*
+ * Get children from "/ledgers/available" on zk
+ */
+ List<String> list =
+ zk.getChildren("/ledgers/available", false);
+ ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
+ /*
+ * Select ensSize servers to form the ensemble
+ */
+ path = zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, new byte[0],
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ /*
+ * Add quorum size to ZK metadata
+ */
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(qSize);
+ zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumSize, bb.array(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ /*
+ * Quorum mode
+ */
+ bb = ByteBuffer.allocate(4);
+ bb.putInt(mode.ordinal());
+ zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, bb.array(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ /*
+ * Create QuorumEngine
+ */
+ lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
+
+ /*
+ * Adding bookies to ledger handle
+ */
+ Random r = new Random();
+
+ for(int i = 0; i < ensSize; i++){
+ int index = 0;
+ if(list.size() > 1)
+ index = r.nextInt(list.size() - 1);
+ else if(list.size() == 1)
+ index = 0;
+ else {
+ LOG.error("Not enough bookies available");
- return null;
- }
+ return null;
+ }
- try{
- String bookie = list.remove(index);
- LOG.info("Bookie: " + bookie);
- InetSocketAddress tAddr = parseAddr(bookie);
- int bindex = lh.addBookie(tAddr);
- ByteBuffer bindexBuf = ByteBuffer.allocate(4);
- bindexBuf.putInt(bindex);
+ try{
+ String bookie = list.remove(index);
+ LOG.info("Bookie: " + bookie);
+ InetSocketAddress tAddr = parseAddr(bookie);
+ int bindex = lh.addBookieForWriting(tAddr);
+ ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+ bindexBuf.putInt(bindex);
- String pBookie = "/" + bookie;
- zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (IOException e) {
- LOG.error(e);
- i--;
- }
+ String pBookie = "/" + bookie;
+ zk.create(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + pBookie, bindexBuf.array(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (IOException e) {
+ LOG.error(e);
+ i--;
+ }
+ }
+ LOG.debug("Created new ledger");
+ } catch (NumberFormatException e) {
+ LOG.error("Error when parsing the ledger identifier", e);
}
- LOG.debug("Created new ledger");
// Return ledger handler
return lh;
}
@@ -333,7 +344,6 @@
*/
data = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.quorumMode, false, stat);
buf = ByteBuffer.wrap(data);
- //int ordinal = buf.getInt();
QMode qMode;
switch(buf.getInt()){
@@ -361,22 +371,62 @@
List<String> list =
zk.getChildren(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble, false);
- LOG.info("Length of list of bookies: " + list.size());
+ LOG.debug("Length of list of bookies: " + list.size());
for(int i = 0 ; i < list.size() ; i++){
for(String s : list){
+ LOG.debug("Extracting bookie: " + s);
byte[] bindex = zk.getData(BKDefs.prefix + getZKStringId(lId) + BKDefs.ensemble + "/" + s, false, stat);
ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
if(bindexBuf.getInt() == i){
try{
- lh.addBookie(parseAddr(s));
+ lh.addBookieForReading(parseAddr(s));
} catch (IOException e){
LOG.error(e);
}
}
}
}
+
+ /*
+ * Read changes to the quorum over time. To determine whether there have been changes during
+ * writes to the ledger, check whether the BKDefs.quorumEvolution znode exists.
+ */
+ if(zk.exists(BKDefs.prefix +
+ getZKStringId(lh.getId()) +
+ BKDefs.quorumEvolution, false) != null){
+ String path = BKDefs.prefix +
+ getZKStringId(lh.getId()) +
+ BKDefs.quorumEvolution;
+
+ List<String> faultList = zk.getChildren(path, false);
+ try{
+ for(String s : faultList){
+ LOG.debug("Faulty list child: " + s);
+ long entry = Long.parseLong(s);
+ String addresses = new String(zk.getData(path + "/" + s, false, stat));
+ String parts[] = addresses.split(" ");
+
+ ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+ for(int i = 0 ; i < parts.length ; i++){
+ LOG.debug("Address: " + parts[i]);
+ InetSocketAddress faultyBookie =
+ parseAddr(parts[i].substring(1));
+
+ newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+ }
+ lh.setNewBookieConfig(entry, newBookieSet);
+ LOG.debug("NewBookieSet size: " + newBookieSet.size());
+ }
+
+ lh.prepareEntryChange();
+ } catch (NumberFormatException e) {
+ LOG.error("Error when parsing the ledger identifier", e);
+ }
+ }
- // Return ledger handler
+ /*
+ * Return ledger handler
+ */
return lh;
}
@@ -518,12 +568,14 @@
* @param a InetSocketAddress
*/
- synchronized BookieHandle getBookieHandle(InetSocketAddress a)
+ synchronized BookieHandle getBookieHandle(LedgerHandle lh, InetSocketAddress a)
throws ConnectException, IOException {
if(!bhMap.containsKey(a)){
- bhMap.put(a, new BookieHandle(a));
+ BookieHandle bh = new BookieHandle(a, true);
+ bhMap.put(a, bh);
+ bh.start();
}
- bhMap.get(a).incRefCount();
+ bhMap.get(a).incRefCount(lh);
return bhMap.get(a);
}
@@ -533,9 +585,10 @@
* remove it from the list.
*/
- synchronized void haltBookieHandles(ArrayList<BookieHandle> bookies){
- for(BookieHandle bh : bookies){
- if(bh.halt() <= 0)
+ synchronized void haltBookieHandles(LedgerHandle lh, ArrayList<BookieHandle> bookies){
+ while(bookies.size() > 0){
+ BookieHandle bh = bookies.remove(0);
+ if(bh.halt(lh) <= 0)
bhMap.remove(bh.addr);
}
}
@@ -549,5 +602,15 @@
bookieBlackList.add(addr);
}
-
+ /**
+ * Halts all bookie handles
+ *
+ */
+ public void halt() throws InterruptedException{
+
+ for(BookieHandle bh: bhMap.values()){
+ bh.shutdown();
+ }
+ zk.close();
+ }
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.security.NoSuchAlgorithmException;
@@ -31,12 +33,15 @@
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.LedgerHandle.QMode;
import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp;
import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubStopOp;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.log4j.Logger;
@@ -47,15 +52,17 @@
*
*/
-class BookieHandle extends Thread{
- Logger LOG = Logger.getLogger(BookieClient.class);
+public class BookieHandle extends Thread {
+ static Logger LOG = Logger.getLogger(BookieClient.class);
- boolean stop = false;
+ volatile boolean stop = false;
+ boolean noreception = false;
private BookieClient client;
InetSocketAddress addr;
static int recvTimeout = 2000;
private ArrayBlockingQueue<ToSend> incomingQueue;
private int refCount = 0;
+ HashSet<LedgerHandle> ledgers;
/**
* Objects of this class are queued waiting to be
@@ -79,13 +86,17 @@
* @param addr address of the bookkeeper server that this
* handle should connect to.
*/
- BookieHandle(InetSocketAddress addr) throws IOException {
- this.client = new BookieClient(addr, recvTimeout);
+ BookieHandle(InetSocketAddress addr, boolean enabled) throws IOException {
+ this.stop = !enabled;
+ this.noreception = !enabled;
+ if(!stop)
+ this.client = new BookieClient(addr, recvTimeout);
+ else
+ this.client = null;
+
this.addr = addr;
this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
-
- //genSecurePadding();
- start();
+ this.ledgers = new HashSet<LedgerHandle>();
}
@@ -100,22 +111,39 @@
}
/**
- * Sending add operation to bookie
+ * Sending add operation to bookie. We have to synchronize the send to guarantee
+ * that requests will either get a response or throw an exception.
*
* @param r
* @param cb
* @param ctx
* @throws IOException
*/
- public void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
- throws IOException {
+ public synchronized void sendAdd(LedgerHandle lh, SubAddOp r, long entry)
+ throws IOException, BKException {
try{
- incomingQueue.put(new ToSend(lh, r, entry));
+ if(!noreception){
+ ToSend ts = new ToSend(lh, r, entry);
+ if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+ throw BKException.create(Code.BookieHandleNotAvailableException);
+ } else {
+ throw BKException.create(Code.BookieHandleNotAvailableException);
+ }
} catch(InterruptedException e){
LOG.warn("Interrupted while waiting for room in the incoming queue");
}
}
+ private synchronized void sendStop(){
+ try{
+ noreception = true;
+ LOG.debug("Sending stop signal");
+ incomingQueue.put(new ToSend(null, new SubStopOp(new StopOp()), -1));
+ LOG.debug("Sent stop signal");
+ } catch(InterruptedException e) {
+ LOG.fatal("Interrupted while sending stop signal to bookie handle");
+ }
+ }
/**
* MAC instance
*
@@ -142,29 +170,41 @@
* @throws IOException
*/
- public void sendRead(LedgerHandle lh, SubReadOp r, long entry)
- throws IOException {
+ public synchronized void sendRead(LedgerHandle lh, SubReadOp r, long entry)
+ throws IOException, BKException {
try{
- incomingQueue.put(new ToSend(lh, r, entry));
+ if(!noreception){
+ ToSend ts = new ToSend(lh, r, entry);
+ if(!incomingQueue.offer(ts, 1000, TimeUnit.MILLISECONDS))
+ throw BKException.create(Code.BookieHandleNotAvailableException);
+ } else {
+ throw BKException.create(Code.BookieHandleNotAvailableException);
+ }
} catch(InterruptedException e){
LOG.warn("Interrupted while waiting for room in the incoming queue");
}
}
public void run(){
- while(!stop){
- try{
- ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+ ToSend ts;
+
+ try{
+ while(!stop){
+ ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+
if(ts != null){
LedgerHandle self = ts.lh;
switch(ts.type){
+ case Operation.STOP:
+ LOG.info("Stopping BookieHandle: " + addr);
+ client.errorOut();
+ cleanQueue();
+ LOG.debug("Stopped");
+ break;
case Operation.ADD:
SubAddOp aOp = (SubAddOp) ts.ctx;
AddOp op = ((AddOp) aOp.op);
- /*
- * TODO: Really add the confirmed add to the op
- */
long confirmed = self.getAddConfirmed();
ByteBuffer extendedData;
@@ -179,7 +219,6 @@
extendedData.rewind();
byte[] toProcess = new byte[op.data.length + 24];
extendedData.get(toProcess, 0, op.data.length + 24);
- //extendedData.limit(extendedData.capacity() - 20);
extendedData.position(extendedData.capacity() - 20);
if(mac == null)
getMac(self.getMacKey(), "HmacSHA1");
@@ -200,47 +239,133 @@
ts.ctx);
break;
case Operation.READ:
- client.readEntry(self.getId(),
- ts.entry,
- ((SubReadOp) ts.ctx).rcb,
- ts.ctx);
+ if(client != null)
+ client.readEntry(self.getId(),
+ ts.entry,
+ ((SubReadOp) ts.ctx).rcb,
+ ts.ctx);
+ else ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
break;
}
- }
- } catch (InterruptedException e){
- LOG.error(e);
- } catch (IOException e){
- LOG.error(e);
- } catch (NoSuchAlgorithmException e){
- LOG.error(e);
- } catch (InvalidKeyException e) {
- LOG.error(e);
+ } else LOG.warn("Empty queue: " + addr);
}
- }
+ } catch (Exception e){
+ LOG.error("Handling exception before halting BookieHandle", e);
+ for(LedgerHandle lh : ledgers)
+ lh.removeBookie(this);
+
+ /*
+ * We only need to synchronize when setting noreception, to prevent
+ * a client thread from adding another request to the incomingQueue after we
+ * have cleaned it.
+ */
+ synchronized(this){
+ noreception = true;
+ }
+ client.halt();
+ client.errorOut();
+ cleanQueue();
+ }
+
+ LOG.info("Exiting bookie handle thread: " + addr);
}
+
/**
* Multiple ledgers may use the same BookieHandle object, so we keep
* a count on the number of references.
*/
- int incRefCount(){
+ int incRefCount(LedgerHandle lh){
+ ledgers.add(lh);
return ++refCount;
}
/**
* Halts if there is no ledger using this object.
+ *
+ * @return int reference counter
*/
- int halt(){
+ synchronized int halt(LedgerHandle lh){
+ LOG.info("Calling halt");
+ ledgers.remove(lh);
int currentCount = --refCount;
if(currentCount <= 0){
- stop = true;
+ shutdown();
}
if(currentCount < 0)
LOG.warn("Miscalculated the number of reference counts: " + addr);
-
+
return currentCount;
}
+
+ /**
+ * Halt this bookie handle independent of the number of ledgers using it. Called upon a
+ * failure to write. This method cannot be called by this thread because it may cause a
+ * deadlock as shutdown invokes sendStop. The deadlock comes from sendAdd blocking on
+ * incomingQueue when the queue is full and the thread also blocking on it when
+ * trying to send the stop marker. Because this thread is actually the consumer, if it
+ * does not make progress, then we have a deadlock.
+ *
+ * @return int reference counter
+ */
+ synchronized public int halt(){
+ if(!stop){
+ LOG.info("Calling halt");
+ for(LedgerHandle lh : ledgers)
+ lh.removeBookie(this);
+ refCount = 0;
+ shutdown();
+ }
+
+ return refCount;
+ }
+
+ /**
+ * Stop this bookie handle completely.
+ *
+ */
+ public void shutdown(){
+ if(!stop){
+ LOG.info("Calling shutdown");
+ LOG.debug("Halting client");
+ client.halt();
+ LOG.debug("Cleaning queue");
+ sendStop();
+ LOG.debug("Finished shutdown");
+ }
+ }
+
+ /**
+ * Invokes the callback method for pending requests in the queue
+ * of this BookieHandle.
+ */
+ private void cleanQueue(){
+ stop = true;
+ ToSend ts = incomingQueue.poll();
+ while(ts != null){
+ switch(ts.type){
+ case Operation.ADD:
+ SubAddOp aOp = (SubAddOp) ts.ctx;
+ aOp.wcb.writeComplete(-1, ts.lh.getId(), ts.entry, ts.ctx);
+
+ break;
+ case Operation.READ:
+ ((SubReadOp) ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
+ break;
+ }
+ ts = incomingQueue.poll();
+ }
+ }
+
+ /**
+ * Returns the negated value of stop, which gives the status of the
+ * BookieHandle.
+ */
+
+ boolean isEnabled(){
+ return !stop;
+ }
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Wed Jun 24 05:07:23 2009
@@ -38,17 +38,17 @@
*/
class ClientCBWorker extends Thread{
- Logger LOG = Logger.getLogger(ClientCBWorker.class);
+ static Logger LOG = Logger.getLogger(ClientCBWorker.class);
static ClientCBWorker instance = null;
- private boolean stop = false;
+ private volatile boolean stop;
private static int instanceCounter= 0;
ArrayBlockingQueue<Operation> pendingOps;
QuorumOpMonitor monitor;
- static synchronized ClientCBWorker getInstance(){
+ static ClientCBWorker getInstance(){
if(instance == null){
instance = new ClientCBWorker();
}
@@ -63,9 +63,10 @@
*
*/
ClientCBWorker(){
- pendingOps = new ArrayBlockingQueue<Operation>(4000);
+ pendingOps = new ArrayBlockingQueue<Operation>(6000);
+ stop = false;
start();
- LOG.debug("Have started cbWorker");
+ LOG.info("Have started cbWorker");
}
@@ -84,11 +85,11 @@
* Gets thread out of its main loop.
*
*/
- synchronized void shutdown(){
+ void shutdown(){
if((--instanceCounter) == 0){
stop = true;
instance = null;
- LOG.debug("Shutting down");
+ LOG.info("Shutting down CBWorker");
}
}
@@ -105,14 +106,14 @@
if(op != null){
synchronized(op){
while(!op.isReady()){
- op.wait();
+ op.wait(1000);
}
}
-
+
switch(op.type){
case Operation.ADD:
AddOp aOp = (AddOp) op;
-
+
aOp.getLedger().setAddConfirmed(aOp.entry);
aOp.cb.addComplete(aOp.getErrorCode(),
aOp.getLedger(),
@@ -122,14 +123,13 @@
break;
case Operation.READ:
ReadOp rOp = (ReadOp) op;
- //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
rOp.cb.readComplete(rOp.getErrorCode(),
rOp.getLedger(),
new LedgerSequence(rOp.seq),
rOp.ctx);
break;
}
- }
+ }
}
} catch (InterruptedException e){
LOG.error("Exception while waiting on queue or operation");
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Jun 24 05:07:23 2009
@@ -28,7 +28,11 @@
import java.security.NoSuchAlgorithmException;
import java.security.MessageDigest;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.TreeMap;
+import org.apache.bookkeeper.client.BKDefs;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieHandle;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -56,7 +60,7 @@
* ledgerhandle->write->bookeeper->quorumengine->bookiehandle
* ->bookieclient
*/
- Logger LOG = Logger.getLogger(LedgerHandle.class);
+ static Logger LOG = Logger.getLogger(LedgerHandle.class);
public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
@@ -64,12 +68,16 @@
private long ledger;
private volatile long last;
private volatile long lastAddConfirmed = 0;
- private ArrayList<BookieHandle> bookies;
+ private HashMap<Integer, Long> lastRecvCorrectly;
+ private volatile ArrayList<BookieHandle> bookies;
private ArrayList<InetSocketAddress> bookieAddrList;
+ private TreeMap<Long, ArrayList<BookieHandle> > bookieConfigMap;
+ private long[] entryChange;
private BookKeeper bk;
private QuorumEngine qe;
private int qSize;
private QMode qMode = QMode.VERIFIABLE;
+ private int lMode;
private int threshold;
private String digestAlg = "SHA1";
@@ -94,6 +102,7 @@
this.ledger = ledger;
this.last = last;
this.bookies = new ArrayList<BookieHandle>();
+ this.lastRecvCorrectly = new HashMap<Integer, Long>();
this.passwd = passwd;
genLedgerKey(passwd);
genMacKey(passwd);
@@ -122,6 +131,8 @@
this.ledger = ledger;
this.last = last;
this.bookies = new ArrayList<BookieHandle>();
+ this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
this.qSize = qSize;
this.qMode = mode;
@@ -150,6 +161,8 @@
this.ledger = ledger;
this.last = last;
this.bookies = new ArrayList<BookieHandle>();
+ this.lastRecvCorrectly = new HashMap<Integer, Long>();
+
this.qSize = qSize;
this.passwd = passwd;
@@ -165,7 +178,7 @@
LOG.debug("Opening bookieHandle: " + a);
//BookieHandle bh = new BookieHandle(this, a);
- this.bookies.add(bk.getBookieHandle(a));
+ this.bookies.add(bk.getBookieHandle(this, a));
}
} catch(ConnectException e){
LOG.error(e);
@@ -198,15 +211,37 @@
*
* @param addr socket address
*/
- int addBookie(InetSocketAddress addr)
+ int addBookieForWriting(InetSocketAddress addr)
throws IOException {
LOG.debug("Bookie address: " + addr);
+ lMode = BKDefs.WRITE;
//BookieHandle bh = new BookieHandle(this, addr);
- this.bookies.add(bk.getBookieHandle(addr));
+ this.bookies.add(bk.getBookieHandle(this, addr));
if(bookies.size() > qSize) setThreshold();
return (this.bookies.size() - 1);
}
+ /**
+ * Create bookie handle and add it to the list
+ *
+ * @param addr socket address
+ */
+ int addBookieForReading(InetSocketAddress addr)
+ throws IOException {
+ LOG.debug("Bookie address: " + addr);
+ lMode = BKDefs.READ;
+ //BookieHandle bh = new BookieHandle(this, addr);
+ try{
+ this.bookies.add(bk.getBookieHandle(this, addr));
+ } catch (IOException e){
+ LOG.info("Inserting a decoy bookie handle");
+ this.bookies.add(new BookieHandle(addr, false));
+ }
+ if(bookies.size() > qSize) setThreshold();
+ return (this.bookies.size() - 1);
+ }
+
+
private void setThreshold() {
switch(qMode){
case GENERIC:
@@ -225,6 +260,47 @@
return threshold;
}
+
+ /**
+ * Records a change of ensemble in ZooKeeper: creates the quorumEvolution
+ * node if absent, then a child named by entry (%010d) holding the
+ * space-separated addresses of the current bookies. Errors are only logged.
+ * @param entry Last entry written before change of ensemble.
+ */
+
+ void changeEnsemble(long entry){
+ String path = BKDefs.prefix +
+ bk.getZKStringId(getId()) +
+ BKDefs.quorumEvolution + "/" +
+ String.format("%010d", entry);
+
+ LOG.info("Report failure: " + String.format("%010d", entry));
+ try{
+ if(bk.getZooKeeper().exists(BKDefs.prefix +
+ bk.getZKStringId(getId()) +
+ BKDefs.quorumEvolution, false) == null)
+ bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(getId()) +
+ BKDefs.quorumEvolution, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ boolean first = true;
+ String addresses = "";
+ for(BookieHandle bh : bookies){
+ if(first){
+ addresses = bh.addr.toString();
+ first = false;
+ }
+ else
+ addresses = addresses + " " + bh.addr.toString();
+ }
+ // NOTE(review): getBytes() without a charset uses the platform default.
+ bk.getZooKeeper() .create(path, addresses.getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(Exception e){
+ LOG.error("Could not write to ZooKeeper: " + path + ", " + e);
+ }
+ }
+
/**
* Replace bookie in the case of a failure
*/
@@ -250,7 +326,7 @@
/*
* If successful in writing to new bookie, add it to the set
*/
- this.bookies.set(index, bk.getBookieHandle(addr));
+ this.bookies.set(index, bk.getBookieHandle(this, addr));
} catch(ConnectException e){
bk.blackListBookie(addr);
LOG.error(e);
@@ -266,16 +342,23 @@
* to replace the current faulty one. In such cases,
* we simply remove the bookie.
*
- * @param index
+ *
+ * @param bh handle to remove (ignored unless lMode is BKDefs.WRITE)
 */
- void removeBookie(int index){
- bookies.remove(index);
- }
-
- void closeUp(){
- ledger = -1;
- last = -1;
- bk.haltBookieHandles(bookies);
+ synchronized void removeBookie(BookieHandle bh){
+ if(lMode == BKDefs.WRITE){
+ LOG.info("Removing bookie: " + bh.addr);
+ int index = bookies.indexOf(bh);
+ if(index >= 0){
+ Long tmpLastRecv = lastRecvCorrectly.get(index);
+ bookies.remove(index);
+ // NOTE(review): lastRecvCorrectly is keyed by index; later indices shift now.
+ if(tmpLastRecv == null)
+ changeEnsemble(0);
+ else
+ changeEnsemble(tmpLastRecv);
+ }
+ }
 }
@@ -328,6 +411,11 @@
return lastAddConfirmed;
}
+ void setLastRecvCorrectly(int sId, long entry){
+ // sId is used as a bookie index (removeBookie reads it back by index).
+ lastRecvCorrectly.put(sId, entry);
+ }
+
/**
* Returns the list of bookies
* @return ArrayList<BookieHandle>
@@ -337,6 +425,73 @@
}
 /**
+ * Returns the bookie ensemble to use for a given entry, as chosen by
+ * getConfig (the current list if setNewBookieConfig was never called).
+ * @param entry entry identifier
+ * @return ArrayList<BookieHandle> list of bookies for that entry
+ */
+ ArrayList<BookieHandle> getBookies(long entry){
+ return getConfig(entry);
+ }
+
+ /**
+ * Returns the handle in the current bookie list whose address equals addr.
+ *
+ * @param addr bookie address to look up
+ * @return the first matching handle, or null if there is none
+ */
+ BookieHandle getBookieHandleDup(InetSocketAddress addr){
+ for(BookieHandle bh : bookies){
+ if(bh.addr.equals(addr))
+ return bh;
+ }
+ // NOTE(review): the open path adds this result to a config without a null check.
+ return null;
+ }
+
+ /**
+ * Sets a new bookie configuration corresponding to a failure during
+ * writes to the ledger. We have one configuration for every failure.
+ * Call prepareEntryChange() once all configurations have been added.
+ * @param entry last entry written with the previous configuration
+ * @param list bookies that form the new configuration
+ */
+
+ void setNewBookieConfig(long entry, ArrayList<BookieHandle> list){
+ if(bookieConfigMap == null)
+ bookieConfigMap = new TreeMap<Long, ArrayList<BookieHandle> >();
+
+ /*
+ * Seed key 0 with the initial ensemble if absent. NOTE(review): if entry
+ * is 0, the put below replaces that initial ensemble. */
+ if(!bookieConfigMap.containsKey(new Long(0))){
+ bookieConfigMap.put(new Long(0), bookies);
+ }
+
+ LOG.info("Adding new entry: " + entry + ", " + bookies.size() + ", " + list.size());
+ bookieConfigMap.put(entry, list);
+ }
+
+ /**
+ * Once we read all changes to the bookie configuration, we
+ * have to call this method to generate an array that we use
+ * to determine the bookie configuration for an entry.
+ *
+ * Note: getConfig binary-searches this array whenever bookieConfigMap is
+ * non-null, so it must be called before entries are read (bookieConfigMap
+ * itself must already be non-null here).
+ */
+
+ void prepareEntryChange(){
+ entryChange = new long[bookieConfigMap.size()];
+ // TreeMap keys iterate in ascending order, as Arrays.binarySearch needs.
+ int counter = 0;
+ for(Long l : bookieConfigMap.keySet()){
+ entryChange[counter++] = l;
+ }
+ }
+
+ /**
* Return the quorum size. By default, the size of a quorum is (n+1)/2,
* where n is the size of the set of bookies.
* @return int
@@ -345,6 +500,36 @@
return qSize;
}
+
+ /**
+ * Returns the bookie configuration in effect for the given entry.
+ * Requires prepareEntryChange() once bookieConfigMap is non-null.
+ * @param entry entry identifier
+ * @return current bookies if bookieConfigMap is null, else the mapped list
+ */
+ private ArrayList<BookieHandle> getConfig(long entry){
+ if(bookieConfigMap == null)
+ return bookies;
+
+ int index = Arrays.binarySearch(entryChange, entry);
+
+ /*
+ * Negative result = -(insertion point) - 1. Either way, use the config
+ * with the largest key strictly below entry, else the first one. */
+ int before = index;
+ index = index >= 0? index : ((-1) - index);
+
+ if(index == 0){
+ if((entry % 10) == 0){
+ LOG.info("Index: " + index + ", " + before + ", " + entry + ", " + bookieConfigMap.get(entryChange[index]).size());
+ }
+ return bookieConfigMap.get(entryChange[index]);
+ } else{
+ // Here entryChange[index - 1] < entry <= entryChange[index] (if any).
+ return bookieConfigMap.get(entryChange[index - 1]);
+ }
+ }
+
/**
* Returns the quorum mode for this ledger: Verifiable or Generic
*/
@@ -440,12 +625,18 @@
return ledgerKey;
}
+ void closeUp(){
+ ledger = -1;
+ last = -1;
+ bk.haltBookieHandles(this, bookies);
+ }
+
/**
* Close ledger.
*
*/
public void close()
- throws KeeperException, InterruptedException {
+ throws KeeperException, InterruptedException, BKException {
//Set data on zookeeper
ByteBuffer last = ByteBuffer.allocate(8);
last.putLong(lastAddConfirmed);
@@ -456,13 +647,12 @@
last.array(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- } else {
- bk.getZooKeeper().setData(closePath,
- last.array(), -1);
- }
+ }
+
closeUp();
StopOp sOp = new StopOp();
qe.sendOp(sOp);
+ LOG.info("##### CB worker queue size: " + qe.cbWorker.pendingOps.size());
}
/**
@@ -515,7 +705,7 @@
RetCounter counter = new RetCounter();
counter.inc();
-
+
Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
qe.sendOp(r);
@@ -523,7 +713,10 @@
counter.block(0);
LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
- if(counter.getSequence() == null) throw BKException.create(Code.ReadException);
+ if(counter.getSequence() == null){
+ LOG.error("Failed to read entries: " + firstEntry + ", " + lastEntry);
+ throw BKException.create(Code.ReadException);
+ }
return counter.getSequence();
}
@@ -535,7 +728,7 @@
* @param ctx some control object
*/
public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
- throws InterruptedException {
+ throws InterruptedException, BKException {
AddOp r = new AddOp(this, data, cb, ctx);
qe.sendOp(r);
}
@@ -548,7 +741,7 @@
*/
public long addEntry(byte[] data)
- throws InterruptedException{
+ throws InterruptedException, BKException{
LOG.debug("Adding entry " + data);
RetCounter counter = new RetCounter();
counter.inc();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java Wed Jun 24 05:07:23 2009
@@ -95,7 +95,7 @@
/**
* Set value of action
*
- * @return
+ * @return int return action identifier
*/
int setAction(int action){
return this.action = action;
@@ -104,7 +104,7 @@
/**
* Return value of action
*
- * @return
+ * @return int return action identifier
*/
int getAction(){
return action;
@@ -122,7 +122,7 @@
/**
* Return return code
*
- * @return
+ * @return int return code
*/
int getRC(){
return rc;
@@ -365,7 +365,11 @@
private int qSize;
private long last;
private QMode qMode;
- private List<String> bookieIds;
+ private List<String> children;
+
+ private String dataString;
+ private String item;
+ private AtomicInteger counter;
/**
* Constructor of request to open a ledger.
@@ -468,8 +472,8 @@
*
* @param list list of bbokie identifiers
*/
- void addBookieIds(List<String> list){
- this.bookieIds = list;
+ void addChildren(List<String> list){
+ this.children = list;
}
/**
@@ -477,8 +481,55 @@
*
* @return List<String> list of bookie identifiers
*/
- List<String> getBookieIds(){
- return bookieIds;
+ List<String> getChildren(){
+ return children;
+ }
+
+ /**
+ * Returns the size of the children list. Used in processOpen.
+ *
+ * @return int
+ */
+ int getListSize(){
+ return children.size();
+ }
+
+ /**
+ * Sets the value of item. This is used in processOpen to
+ * keep the item value of the list of ensemble changes.
+ * NOTE(review): one field shared by all async getData calls for the children.
+ * @param item name of an ensemble-change child node
+ */
+ void setItem(String item){
+ this.item = item;
+ }
+
+ /**
+ * Returns the value of item
+ *
+ * @return String
+ */
+
+ String getItem(){
+ return item;
+ }
+
+ /**
+ * Sets the value of dataString
+ *
+ * @param data value to set
+ */
+ void setStringData(String data){
+ this.dataString = data;
+ }
+
+ /**
+ * Returns the value of dataString
+ *
+ * @return String
+ */
+ String getStringData(){
+ return dataString;
}
}
@@ -731,7 +782,7 @@
String bookie = children.remove(index);
LOG.info("Bookie: " + bookie);
InetSocketAddress tAddr = bk.parseAddr(bookie);
- int bindex = cop.getLh().addBookie(tAddr);
+ int bindex = cop.getLh().addBookieForWriting(tAddr);
ByteBuffer bindexBuf = ByteBuffer.allocate(4);
bindexBuf.putInt(bindex);
@@ -773,6 +824,9 @@
if(oop.getRC() != BKDefs.EOK)
oop.getCb().openComplete(oop.getRC(), null, oop.getCtx());
+ String path;
+ LedgerHandle lh;
+
switch(oop.getAction()){
case 0:
/*
@@ -833,7 +887,7 @@
/*
* Create ledger handle
*/
- LedgerHandle lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
+ lh = new LedgerHandle(bk, oop.getLid(), oop.getLast(), oop.getQSize(), oop.getQMode(), oop.getPasswd());
/*
* Get children of "/ledgers/id/ensemble"
@@ -846,7 +900,7 @@
break;
case 7:
- List<String> list = oop.getBookieIds();
+ List<String> list = oop.getChildren();
LOG.info("Length of list of bookies: " + list.size());
try{
for(int i = 0 ; i < list.size() ; i++){
@@ -855,19 +909,81 @@
false, new Stat());
ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
if(bindexBuf.getInt() == i){
- oop.getLh().addBookie(bk.parseAddr(s));
+ oop.getLh().addBookieForReading(bk.parseAddr(s));
}
}
}
+
+ /*
+ * Check if there has been any change to the ensemble of bookies
+ * due to failures.
+ */
+ bk.getZooKeeper().exists(BKDefs.prefix +
+ bk.getZKStringId(oop.getLid()) +
+ BKDefs.quorumEvolution,
+ false,
+ this,
+ oop);
+
} catch(KeeperException e){
LOG.error("Exception while adding bookies", e);
oop.setRC(BKDefs.EZK);
+ oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
} catch(IOException e){
LOG.error("Exception while trying to connect to bookie");
oop.setRC(BKDefs.EIO);
- } finally {
+ oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+ }
+
+ break;
+
+ case 8:
+ path = BKDefs.prefix +
+ bk.getZKStringId(oop.getLid()) +
+ BKDefs.quorumEvolution;
+
+ bk.getZooKeeper().getChildren(path,
+ false,
+ this,
+ oop);
+ case 9:
+ oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
+ break;
+ case 10:
+ path = BKDefs.prefix +
+ bk.getZKStringId(oop.getLid()) +
+ BKDefs.quorumEvolution;
+
+ for(String s : oop.getChildren()){
+ oop.setItem(s);
+ bk.getZooKeeper().getData(path + "/" + s,
+ false,
+ this,
+ oop);
+ }
+
+ break;
+ case 11:
+ lh = oop.getLh();
+
+ String parts[] = oop.getStringData().split(" ");
+
+ ArrayList<BookieHandle> newBookieSet = new ArrayList<BookieHandle>();
+ for(int i = 0 ; i < parts.length ; i++){
+ LOG.info("Address: " + parts[i]);
+ InetSocketAddress faultyBookie =
+ bk.parseAddr(parts[i].substring(1));
+
+ newBookieSet.add(lh.getBookieHandleDup(faultyBookie));
+ }
+ lh.setNewBookieConfig(Long.parseLong(oop.getItem()), newBookieSet);
+
+ if(oop.counter.incrementAndGet() == oop.getListSize()){
+ lh.prepareEntryChange();
oop.getCb().openComplete(oop.getRC(), oop.getLh(), oop.getCtx());
}
+
+ break;
}
}
@@ -956,6 +1072,12 @@
else
op.setAction(4);
break;
+ case 8:
+ if(stat == null)
+ op.setAction(9);
+ else
+ op.setAction(10);
+ break;
}
case CLOSE:
CloseLedgerOp clop = (CloseLedgerOp) op;
@@ -1064,7 +1186,7 @@
break;
case OPEN:
OpenLedgerOp oop = (OpenLedgerOp) op;
- oop.addBookieIds(children);
+ oop.addChildren(children);
break;
}
@@ -1119,6 +1241,12 @@
oop.setQMode(QMode.VERIFIABLE);
LOG.info("Verifiable ledger");
}
+ break;
+ case 10:
+ String addr = new String(data);
+ oop.setStringData(addr);
+ oop.setAction(11);
+ break;
}
break;
default:
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Wed Jun 24 05:07:23 2009
@@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.TreeMap;
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerSequence;
@@ -48,7 +49,7 @@
*
*/
-class LedgerRecoveryMonitor implements ReadEntryCallback{
+class LedgerRecoveryMonitor implements ReadEntryCallback {
Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
BookKeeper self;
@@ -132,11 +133,10 @@
/*
* Obtain largest hint
- */
-
+ */
LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
for(InetSocketAddress addr : bookies){
- lh.addBookie(addr);
+ lh.addBookieForReading(addr);
}
boolean notLegitimate = true;
@@ -241,4 +241,5 @@
return hint;
}
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Wed Jun 24 05:07:23 2009
@@ -22,9 +22,12 @@
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.ClientCBWorker;
import org.apache.bookkeeper.client.QuorumOpMonitor;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -43,7 +46,7 @@
*/
public class QuorumEngine {
- Logger LOG = Logger.getLogger(QuorumEngine.class);
+ static Logger LOG = Logger.getLogger(QuorumEngine.class);
QuorumOpMonitor opMonitor;
ClientCBWorker cbWorker;
@@ -56,6 +59,11 @@
* ADD, STOP.
*/
+ static long idCounter;
+ static synchronized long getOpId(){
+ return idCounter++;
+ }
+
public static class Operation {
public static final int READ = 0;
public static final int ADD = 1;
@@ -64,9 +72,19 @@
int type;
LedgerHandle ledger;
+ long id;
int rc = 0;
boolean ready = false;
+ public Operation(){
+ this.id = getOpId();
+ }
+
+ long getId(){
+ return id;
+ }
+
+
public static class AddOp extends Operation {
AddCallback cb;
Object ctx;
@@ -178,11 +196,18 @@
this.rcb = rcb;
}
}
+
+ public static class SubStopOp extends SubOp{
+ SubStopOp(Operation op){
+ this.op = op;
+ }
+ }
}
public QuorumEngine(LedgerHandle lh){
this.lh = lh;
- this.opMonitor = QuorumOpMonitor.getInstance(lh);
+ this.opMonitor = new QuorumOpMonitor(lh);
+ QuorumEngine.idCounter = 0;
LOG.debug("Creating cbWorker");
this.cbWorker = ClientCBWorker.getInstance();
LOG.debug("Created cbWorker");
@@ -195,11 +220,12 @@
* @param r Operation descriptor
*/
void sendOp(Operation r)
- throws InterruptedException {
+ throws InterruptedException, BKException {
+ int n;
- int n = lh.getBookies().size();
switch(r.type){
case Operation.READ:
+
Operation.ReadOp rOp = (Operation.ReadOp) r;
LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
@@ -211,6 +237,10 @@
long counter = 0;
PendingReadOp pROp = new PendingReadOp(lh);
+ n = lh.getBookies(entry).size();
+ if(n < lh.getQuorumSize())
+ throw BKException.create(Code.NotEnoughBookiesException);
+
//Send requests to bookies
while(counter < lh.getQuorumSize()){
int index = (int)((entry + counter++) % n);
@@ -219,7 +249,9 @@
pROp,
index,
opMonitor);
- lh.getBookies().get((index) % n).sendRead(lh, sRead, entry);
+
+ BookieHandle bh = lh.getBookies(entry).get((index) % n);
+ if(bh.isEnabled()) bh.sendRead(lh, sRead, entry);
} catch(IOException e){
LOG.error(e);
}
@@ -228,11 +260,18 @@
break;
case Operation.ADD:
+ n = lh.getBookies().size();
+
+ if(n < lh.getQuorumSize())
+ throw BKException.create(Code.NotEnoughBookiesException);
+
long counter = 0;
cbWorker.addOperation(r);
Operation.AddOp aOp = (Operation.AddOp) r;
PendingOp pOp = new PendingOp();
+ ArrayList<BookieHandle> bookies;
+
while(counter < lh.getQuorumSize() ){
int index = (int)((aOp.entry + counter++) % n);
@@ -242,20 +281,14 @@
pOp,
index,
opMonitor);
+
lh.getBookies().get((index) % n).sendAdd(lh, sAdd, aOp.entry);
- } catch (IOException io) {
- LOG.error(io);
- try{
- /*
- * Before getting a new bookie, try to reconnect
- */
- lh.getBookies().get((index) % n).restart();
- } catch (IOException nio){
- lh.removeBookie(index);
- }
+ } catch (Exception io) {
+ LOG.error("Error when sending entry: " + aOp.entry + ", " + index + ", " + io);
+ counter--;
+ n = lh.getBookies().size();
}
}
- //qRef = (qRef + 1) % n;
break;
case Operation.STOP:
cbWorker.shutdown();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Wed Jun 24 05:07:23 2009
@@ -55,21 +55,12 @@
*
*/
public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
- Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
+ static Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
LedgerHandle lh;
static final int MAXRETRIES = 2;
- static HashMap<Long, QuorumOpMonitor> instances =
- new HashMap<Long, QuorumOpMonitor>();
- public static QuorumOpMonitor getInstance(LedgerHandle lh){
- if(instances.get(lh.getId()) == null) {
- instances.put(lh.getId(), new QuorumOpMonitor(lh));
- }
-
- return instances.get(lh.getId());
- }
/**
* Message disgest instance
@@ -160,17 +151,22 @@
if(rc == 0){
// Everything went ok with this op
synchronized(pOp){
- //pOp.bookieIdSent.add(sId);
pOp.bookieIdRecv.add(sId);
- if(pOp.bookieIdRecv.size() == lh.getQuorumSize()){
- //pendingAdds.remove(entryId);
- //sAdd.op.cb.addComplete(sAdd.op.getErrorCode(),
- // ledgerId, entryId, sAdd.op.ctx);
+ lh.setLastRecvCorrectly(sId, entryId);
+ if(pOp.bookieIdRecv.size() >= lh.getQuorumSize()){
sAdd.op.setReady();
}
}
} else {
- LOG.error("Error sending write request: " + rc + " : " + ledgerId);
+ //LOG.warn("Error sending write request: " + rc + " : " + ledgerId + ": " + lh.getBookies().size());
+ /*
+ * If ledger is closed already, then simply return
+ */
+ if(lh.getId() == -1){
+ LOG.warn("Ledger identifier is not valid");
+ return;
+ }
+
HashSet<Integer> ids;
synchronized(pOp){
@@ -180,8 +176,7 @@
if(ids.size() == lh.getBookies().size()){
if(pOp.retries++ >= MAXRETRIES){
//Call back with error code
- //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
- // ledgerId, entryId, sAdd.op.ctx);
+
sAdd.op.setErrorCode(BKDefs.ENR);
sAdd.op.setReady();
return;
@@ -190,25 +185,38 @@
ids.clear();
}
// Select another bookie that we haven't contacted yet
- for(int i = 0; i < lh.getBookies().size(); i++){
- if(!ids.contains(Integer.valueOf(i))){
- // and send it to new bookie
- try{
- list.get(i).sendAdd(lh, new SubAddOp(sAdd.op,
- pOp,
- i,
- this), ((AddOp) sAdd.op).entry);
- pOp.bookieIdRecv.add(sId.intValue());
-
- break;
- } catch(IOException e){
- LOG.error(e);
- }
+ try{
+ //LOG.info("Selecting another bookie " + entryId);
+ int bCounter;
+ if(sId >= (entryId % (lh.getBookies().size() + 1))){
+ bCounter = sId - (((int) entryId) % (lh.getBookies().size() + 1));
+ } else {
+ bCounter = (lh.getBookies().size() + 1) - (((int) entryId) % (lh.getBookies().size() + 1)) - sId;
}
- }
+
+ int tmpId = (((int) entryId) + lh.getQuorumSize()) % (lh.getBookies().size() + 1);
+ int newId = tmpId % lh.getBookies().size();
+ //LOG.info("Sending a new add operation to bookie: " + newId + ", " + lh.getBookies().get(newId).addr);
+
+ BookieHandle bh = lh.getBookies().get(newId);
+
+ //LOG.info("Got handle for " + newId);
+
+ bh.sendAdd(lh, new SubAddOp(sAdd.op,
+ pOp,
+ newId,
+ this), entryId);
+
+ //LOG.info("Ended " + entryId + ", " + newId);
+ } catch(IOException e){
+ LOG.error(e);
+ } catch(BKException e){
+ LOG.error(e);
+ }
}
- }
+ }
}
+
/**
* Callback method for read operations. There is one callback for
@@ -256,6 +264,7 @@
byte[] data = new byte[voted.capacity() - dLength - 24];
voted.position(24);
voted.get(data, 0, data.length);
+ //LOG.warn("Data length (" + entryId + "): " + data.length);
counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
}
}
@@ -338,6 +347,7 @@
if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
int counter = -1;
+ //LOG.warn("Giving up on " + entryId + "(" + lh.getThreshold() + ")");
counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
@@ -450,6 +460,8 @@
private int addNewEntry(LedgerEntry le, ReadOp op){
long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
if(op.seq[(int) index] == null){
+ if(le.getEntry() == null) LOG.warn("Ledger entry is null (" + le.getEntryId() + ")");
+ //if(le.getEntryId() % 100 == 0) LOG.info("New entry: " + le.getEntryId() + ")");
op.seq[(int) index] = le;
return op.counter.incrementAndGet();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Wed Jun 24 05:07:23 2009
@@ -28,12 +28,16 @@
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Enumeration;
import java.security.NoSuchAlgorithmException;
import java.security.InvalidKeyException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
+//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
+import org.apache.bookkeeper.client.BookieHandle;
import org.apache.bookkeeper.proto.ReadEntryCallback;
import org.apache.bookkeeper.proto.WriteCallback;
import org.apache.log4j.Logger;
@@ -50,13 +54,8 @@
int myCounter = 0;
public BookieClient(InetSocketAddress addr, int recvTimeout)
- throws IOException, ConnectException {
- sock = SocketChannel.open(addr);
- setDaemon(true);
-
- sock.socket().setSoTimeout(recvTimeout);
- sock.socket().setTcpNoDelay(true);
- start();
+ throws IOException, ConnectException {
+ startConnection(addr, recvTimeout);
}
public BookieClient(String host, int port, int recvTimeout)
@@ -64,6 +63,16 @@
this(new InetSocketAddress(host, port), recvTimeout);
}
+ public void startConnection(InetSocketAddress addr, int recvTimeout)
+ throws IOException, ConnectException {
+ sock = SocketChannel.open(addr);
+ setDaemon(true);
+ //sock.configureBlocking(false);
+ sock.socket().setSoTimeout(recvTimeout);
+ sock.socket().setTcpNoDelay(true);
+ start();
+ }
+
private static class Completion<T> {
Completion(T cb, Object ctx) {
this.cb = cb;
@@ -105,13 +114,12 @@
ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
-
/*
* Use this semaphore to control the number of completion key in both addCompletions
* and readCompletions. This is more of a problem for readCompletions because one
* readEntries opertion is expanded into individual operations to read entries.
*/
- Semaphore completionSemaphore = new Semaphore(1000);
+ Semaphore completionSemaphore = new Semaphore(3000);
/**
@@ -150,7 +158,10 @@
}
/**
- * Send addEntry operation to bookie.
+ * Send addEntry operation to bookie. It throws an IOException
+ * if either the write to the socket fails or it takes too long
+ * to obtain a permit to send another request, which possibly
+ * implies that the corresponding bookie is down.
*
* @param ledgerId ledger identifier
* @param entryId entry identifier
@@ -163,39 +174,37 @@
ByteBuffer entry, WriteCallback cb, Object ctx)
throws IOException, InterruptedException {
- //LOG.info("Data length: " + entry.capacity());
- completionSemaphore.acquire();
+ if(cb == null)
+ LOG.error("WriteCallback object is null: " + entryId);
addCompletions.put(new CompletionKey(ledgerId, entryId),
new Completion<WriteCallback>(cb, ctx));
- //entry = entry.duplicate();
- //entry.position(0);
-
+
ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
tmpEntry.position(4);
tmpEntry.putInt(BookieProtocol.ADDENTRY);
tmpEntry.put(masterKey);
- //LOG.debug("Master key: " + new String(masterKey));
tmpEntry.putLong(ledgerId);
tmpEntry.putLong(entryId);
tmpEntry.put(entry);
tmpEntry.position(0);
- //ByteBuffer len = ByteBuffer.allocate(4);
// 4 bytes for the message type
tmpEntry.putInt(tmpEntry.remaining() - 4);
tmpEntry.position(0);
- //sock.write(len);
- //len.clear();
- //len.putInt(BookieProtocol.ADDENTRY);
- //len.flip();
- //sock.write(len);
- sock.write(tmpEntry);
- //LOG.debug("addEntry:finished");
+
+
+ if(!sock.isConnected() ||
+ !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){
+ throw new IOException();
+ } else sock.write(tmpEntry);
}
/**
- * Send readEntry operation to bookie.
+ * Send readEntry operation to bookie. It throws an IOException
+ * if either the write to the socket fails or it takes too long
+ * to obtain a permit to send another request, which possibly
+ * implies that the corresponding bookie is down.
*
* @param ledgerId ledger identifier
* @param entryId entry identifier
@@ -206,28 +215,22 @@
synchronized public void readEntry(long ledgerId, long entryId,
ReadEntryCallback cb, Object ctx)
throws IOException, InterruptedException {
-
- completionSemaphore.acquire();
+ //LOG.info("Entry id: " + entryId);
+ //completionSemaphore.acquire();
readCompletions.put(new CompletionKey(ledgerId, entryId),
new Completion<ReadEntryCallback>(cb, ctx));
+
ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
tmpEntry.putInt(20);
tmpEntry.putInt(BookieProtocol.READENTRY);
tmpEntry.putLong(ledgerId);
tmpEntry.putLong(entryId);
tmpEntry.position(0);
-
- //ByteBuffer len = ByteBuffer.allocate(4);
- //len.putInt(tmpEntry.remaining() + 4);
- //len.flip();
- //LOG.debug("readEntry: Writing to socket");
- //sock.write(len);
- //len.clear();
- //len.putInt(BookieProtocol.READENTRY);
- //len.flip();
- //sock.write(len);
- sock.write(tmpEntry);
- //LOG.error("Size of readCompletions: " + readCompletions.size());
+
+ if(!sock.isConnected() ||
+ !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){
+ throw new IOException();
+ } else sock.write(tmpEntry);
}
private void readFully(ByteBuffer bb) throws IOException {
@@ -236,6 +239,7 @@
}
}
+ Semaphore running = new Semaphore(0);
public void run() {
int len = -1;
ByteBuffer lenBuffer = ByteBuffer.allocate(4);
@@ -254,47 +258,44 @@
switch(type) {
case BookieProtocol.ADDENTRY:
- {
+ {
long ledgerId = bb.getLong();
long entryId = bb.getLong();
- Completion<WriteCallback> ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+
+ Completion<WriteCallback> ac;
+ ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
completionSemaphore.release();
-
if (ac != null) {
ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
} else {
LOG.error("Callback object null: " + ledgerId + " : " + entryId);
}
+
break;
}
case BookieProtocol.READENTRY:
{
- //ByteBuffer entryData = bb.slice();
long ledgerId = bb.getLong();
long entryId = bb.getLong();
bb.position(24);
byte[] data = new byte[bb.capacity() - 24];
bb.get(data);
- ByteBuffer entryData = ByteBuffer.wrap(data);
- //ByteBuffer entryData = bb;
- //LOG.info("Received entry: " + ledgerId + ", " + entryId
- // + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
+ ByteBuffer entryData = ByteBuffer.wrap(data);
CompletionKey key = new CompletionKey(ledgerId, entryId);
Completion<ReadEntryCallback> c;
if(readCompletions.containsKey(key)){
- c = readCompletions.remove(key);
- //LOG.error("Found key");
+ c = readCompletions.remove(key);
}
else{
- /*
- * This is a special case. When recovering a ledger, a client submits
- * a read request with id -1, and receives a response with a different
- * entry id.
- */
- c = readCompletions.remove(new CompletionKey(ledgerId, -1));
+ /*
+ * This is a special case. When recovering a ledger, a client submits
+ * a read request with id -1, and receives a response with a different
+ * entry id.
+ */
+ c = readCompletions.remove(new CompletionKey(ledgerId, -1));
}
completionSemaphore.release();
@@ -311,9 +312,72 @@
System.err.println("Got error " + rc + " for type " + type);
}
}
+
} catch(Exception e) {
- LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc, e);
+ LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
}
+ running.release();
+
+ }
+
+ /**
+ * Fails every pending add/read with rc -1 (null data for reads), releasing
+ * one completionSemaphore permit per removed completion. Meant to be called
+ * from BookieHandle so QuorumOpMonitor callbacks run from one thread.
+ */
+
+ public void errorOut(){
+ LOG.info("Erroring out pending entries");
+ // NOTE(review): entries put before a failed tryAcquire hold no permit.
+ for (Enumeration<CompletionKey> e = addCompletions.keys() ; e.hasMoreElements() ;) {
+ CompletionKey key = e.nextElement();
+ Completion<WriteCallback> ac = addCompletions.remove(key);
+ if(ac != null){
+ completionSemaphore.release();
+ ac.cb.writeComplete(-1, key.ledgerId, key.entryId, ac.ctx);
+ }
+ }
+
+ LOG.info("Finished erroring out pending add entries");
+
+ for (Enumeration<CompletionKey> e = readCompletions.keys() ; e.hasMoreElements() ;) {
+ CompletionKey key = e.nextElement();
+ Completion<ReadEntryCallback> ac = readCompletions.remove(key);
+
+ if(ac != null){
+ completionSemaphore.release();
+ ac.cb.readEntryComplete(-1, key.ledgerId, key.entryId, null, ac.ctx);
+ }
+ }
+
+ LOG.info("Finished erroring out pending read entries");
+ }
+
+ /**
+ * Halts client: closes the socket, then blocks until run() exits.
+ */
+
+ public void halt() {
+ try{
+ sock.close();
+ } catch(IOException e) {
+ LOG.warn("Exception while closing socket");
+ }
+ // run() releases 'running' on exit; interrupt status is not restored below.
+ try{
+ running.acquire();
+ } catch(InterruptedException e){
+ LOG.error("Interrupted while waiting for running semaphore to acquire lock");
+ }
+ }
+
+ /**
+ * Returns the status of the socket of this bookie client.
+ *
+ * @return boolean
+ */
+ public boolean isConnected(){
+ return sock.isConnected();
}
private static class Counter {
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Wed Jun 24 05:07:23 2009
@@ -39,6 +39,7 @@
public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
 int port;
 NIOServerFactory nioServerFactory;
+ // Set to true by shutdown() and exposed via isDown(); volatile so a write
+ // from the shutting-down thread is visible to other threads reading it.
+ volatile boolean down = false;
 Bookie bookie;
 static Logger LOG = Logger.getLogger(BookieServer.class);
@@ -50,9 +51,13 @@
nioServerFactory = new NIOServerFactory(port, this);
}
+ /**
+ * Stops this server. The down flag is set first, so isDown() returns true
+ * while the NIO server factory and the bookie are still being shut down
+ * (and also if either shutdown call throws).
+ */
 public void shutdown() throws InterruptedException {
+ down = true;
 nioServerFactory.shutdown();
 bookie.shutdown();
 }
+ /**
+ * Tells whether shutdown() has been invoked on this server.
+ *
+ * @return the current value of the volatile {@code down} flag, which
+ * shutdown() sets to true
+ */
+ public boolean isDown(){
+ final boolean isShutDown = this.down;
+ return isShutDown;
+ }
+ /**
+ * Delegates to nioServerFactory.join(); presumably blocks until the NIO
+ * server factory has stopped — confirm against NIOServerFactory.
+ */
 public void join() throws InterruptedException {
 nioServerFactory.join();
 }
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java?rev=787907&r1=787906&r2=787907&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java Wed Jun 24 05:07:23 2009
@@ -24,6 +24,7 @@
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.log4j.Logger;
@@ -87,6 +88,8 @@
} catch(InterruptedException ie) {
LOG.warn("Interrupted while flusing " + ie);
Thread.currentThread().interrupt();
+ } catch(BKException bke) {
+ LOG.warn("BookKeeper exception ", bke);
}
}
}
@@ -120,6 +123,8 @@
} catch(InterruptedException ie) {
LOG.warn("Interrupted while writing", ie);
Thread.currentThread().interrupt();
+ } catch(BKException bke) {
+ LOG.warn("BookKeeper exception", bke);
}
}
}