You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/01/27 00:16:49 UTC
svn commit: r903483 [4/6] - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.proto;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,391 +21,126 @@
*
*/
-
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.Enumeration;
-import java.security.NoSuchAlgorithmException;
-import java.security.InvalidKeyException;
-import java.security.MessageDigest;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-
-//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
-import org.apache.bookkeeper.client.BookieHandle;
-import org.apache.bookkeeper.proto.ReadEntryCallback;
-import org.apache.bookkeeper.proto.WriteCallback;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.log4j.Logger;
-
-
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
/**
- * Implements the client-side part of the BookKeeper protocol.
+ * Implements the client-side part of the BookKeeper protocol.
*
- */
-public class BookieClient extends Thread {
- Logger LOG = Logger.getLogger(BookieClient.class);
- SocketChannel sock;
- int myCounter = 0;
-
- public BookieClient(InetSocketAddress addr, int recvTimeout)
- throws IOException, ConnectException {
- startConnection(addr, recvTimeout);
- }
-
- public BookieClient(String host, int port, int recvTimeout)
- throws IOException, ConnectException {
- this(new InetSocketAddress(host, port), recvTimeout);
- }
-
- public void startConnection(InetSocketAddress addr, int recvTimeout)
- throws IOException, ConnectException {
- sock = SocketChannel.open(addr);
- setDaemon(true);
- //sock.configureBlocking(false);
- sock.socket().setSoTimeout(recvTimeout);
- sock.socket().setTcpNoDelay(true);
- start();
- }
-
- private static class Completion<T> {
- Completion(T cb, Object ctx) {
- this.cb = cb;
- this.ctx = ctx;
- }
+ */
+public class BookieClient {
+ static final Logger LOG = Logger.getLogger(BookieClient.class);
+
+ // This is global state that should be across all BookieClients
+ AtomicLong totalBytesOutstanding = new AtomicLong();
- T cb;
- Object ctx;
+ OrderedSafeExecutor executor;
+ ClientSocketChannelFactory channelFactory;
+ ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient> channels = new ConcurrentHashMap<InetSocketAddress, PerChannelBookieClient>();
+
+ public BookieClient(ClientSocketChannelFactory channelFactory, OrderedSafeExecutor executor) {
+ this.channelFactory = channelFactory;
+ this.executor = executor;
}
- private static class CompletionKey {
- long ledgerId;
- long entryId;
-
- CompletionKey(long ledgerId, long entryId) {
- this.ledgerId = ledgerId;
- this.entryId = entryId;
- }
+ public PerChannelBookieClient lookupClient(InetSocketAddress addr) {
+ PerChannelBookieClient channel = channels.get(addr);
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof CompletionKey) || obj == null) {
- return false;
+ if (channel == null) {
+ channel = new PerChannelBookieClient(executor, channelFactory, addr, totalBytesOutstanding);
+ PerChannelBookieClient prevChannel = channels.putIfAbsent(addr, channel);
+ if (prevChannel != null) {
+ channel = prevChannel;
}
- CompletionKey that = (CompletionKey) obj;
- return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
- }
-
- @Override
- public int hashCode() {
- return ((int) ledgerId << 16) ^ ((int) entryId);
}
+ return channel;
}
- ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions =
- new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
-
- ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
- new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
-
- /*
- * Use this semaphore to control the number of completion key in both addCompletions
- * and readCompletions. This is more of a problem for readCompletions because one
- * readEntries opertion is expanded into individual operations to read entries.
- */
- Semaphore completionSemaphore = new Semaphore(3000);
-
-
- /**
- * Message disgest instance
- *
- */
- MessageDigest digest = null;
-
- /**
- * Get digest instance if there is none.
- *
- */
- public MessageDigest getDigestInstance(String alg)
- throws NoSuchAlgorithmException {
- if(digest == null){
- digest = MessageDigest.getInstance(alg);
- }
-
- return digest;
- }
-
- /**
- * Mac instance
- *
- */
- Mac mac = null;
-
- public Mac getMac(String alg, byte[] key)
- throws NoSuchAlgorithmException, InvalidKeyException {
- if(mac == null){
- mac = Mac.getInstance(alg);
- mac.init(new SecretKeySpec(key, "HmacSHA1"));
- }
-
- return mac;
- }
-
- /**
- * Send addEntry operation to bookie. It throws an IOException
- * if either the write to the socket fails or it takes too long
- * to obtain a permit to send another request, which possibly
- * implies that the corresponding bookie is down.
- *
- * @param ledgerId ledger identifier
- * @param entryId entry identifier
- * @param cb object implementing callback method
- * @param ctx control object
- * @throws IOException
- * @throws InterruptedException
- */
- synchronized public void addEntry(long ledgerId, byte[] masterKey, long entryId,
- ByteBuffer entry, WriteCallback cb, Object ctx)
- throws IOException, InterruptedException {
-
- if(cb == null)
- LOG.error("WriteCallback object is null: " + entryId);
- addCompletions.put(new CompletionKey(ledgerId, entryId),
- new Completion<WriteCallback>(cb, ctx));
-
- ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
-
- tmpEntry.position(4);
- tmpEntry.putInt(BookieProtocol.ADDENTRY);
- tmpEntry.put(masterKey);
- tmpEntry.putLong(ledgerId);
- tmpEntry.putLong(entryId);
- tmpEntry.put(entry);
- tmpEntry.position(0);
-
- // 4 bytes for the message type
- tmpEntry.putInt(tmpEntry.remaining() - 4);
- tmpEntry.position(0);
-
-
- if(!sock.isConnected() ||
- !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){
- throw new IOException();
- } else sock.write(tmpEntry);
- }
-
- /**
- * Send readEntry operation to bookie. It throws an IOException
- * if either the write to the socket fails or it takes too long
- * to obtain a permit to send another request, which possibly
- * implies that the corresponding bookie is down.
- *
- * @param ledgerId ledger identifier
- * @param entryId entry identifier
- * @param cb object implementing callback method
- * @param ctx control object
- * @throws IOException
- */
- synchronized public void readEntry(long ledgerId, long entryId,
- ReadEntryCallback cb, Object ctx)
- throws IOException, InterruptedException {
- //LOG.info("Entry id: " + entryId);
- //completionSemaphore.acquire();
- readCompletions.put(new CompletionKey(ledgerId, entryId),
- new Completion<ReadEntryCallback>(cb, ctx));
-
- ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
- tmpEntry.putInt(20);
- tmpEntry.putInt(BookieProtocol.READENTRY);
- tmpEntry.putLong(ledgerId);
- tmpEntry.putLong(entryId);
- tmpEntry.position(0);
-
- if(!sock.isConnected() ||
- !completionSemaphore.tryAcquire(1000, TimeUnit.MILLISECONDS)){
- throw new IOException();
- } else sock.write(tmpEntry);
- }
-
- private void readFully(ByteBuffer bb) throws IOException {
- while(bb.remaining() > 0) {
- sock.read(bb);
- }
- }
-
- Semaphore running = new Semaphore(0);
- public void run() {
- int len = -1;
- ByteBuffer lenBuffer = ByteBuffer.allocate(4);
- int type = -1, rc = -1;
- try {
- while(sock.isConnected()) {
- lenBuffer.clear();
- readFully(lenBuffer);
- lenBuffer.flip();
- len = lenBuffer.getInt();
- ByteBuffer bb = ByteBuffer.allocate(len);
- readFully(bb);
- bb.flip();
- type = bb.getInt();
- rc = bb.getInt();
-
- switch(type) {
- case BookieProtocol.ADDENTRY:
- {
- long ledgerId = bb.getLong();
- long entryId = bb.getLong();
-
- Completion<WriteCallback> ac;
- ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
- completionSemaphore.release();
- if (ac != null) {
- ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
- } else {
- LOG.error("Callback object null: " + ledgerId + " : " + entryId);
- }
+ public void addEntry(final InetSocketAddress addr, final long ledgerId, final byte[] masterKey, final long entryId,
+ final ChannelBuffer toSend, final WriteCallback cb, final Object ctx) {
- break;
- }
- case BookieProtocol.READENTRY:
- {
- long ledgerId = bb.getLong();
- long entryId = bb.getLong();
-
- bb.position(24);
- byte[] data = new byte[bb.capacity() - 24];
- bb.get(data);
- ByteBuffer entryData = ByteBuffer.wrap(data);
-
- CompletionKey key = new CompletionKey(ledgerId, entryId);
- Completion<ReadEntryCallback> c;
-
- if(readCompletions.containsKey(key)){
- c = readCompletions.remove(key);
- }
- else{
- /*
- * This is a special case. When recovering a ledger, a client submits
- * a read request with id -1, and receives a response with a different
- * entry id.
- */
- c = readCompletions.remove(new CompletionKey(ledgerId, -1));
- }
- completionSemaphore.release();
-
- if (c != null) {
- c.cb.readEntryComplete(rc,
- ledgerId,
- entryId,
- entryData,
- c.ctx);
- }
- break;
- }
- default:
- System.err.println("Got error " + rc + " for type " + type);
+ final PerChannelBookieClient client = lookupClient(addr);
+
+ client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+ if (rc != BKException.Code.OK) {
+ cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+ return;
}
+ client.addEntry(ledgerId, masterKey, entryId, toSend, cb, ctx);
}
-
- } catch(Exception e) {
- LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
- }
- running.release();
-
+ });
}
-
- /**
- * Errors out pending entries. We call this method from one thread to avoid
- * concurrent executions to QuorumOpMonitor (implements callbacks). It seems
- * simpler to call it from BookieHandle instead of calling directly from here.
- */
-
- public void errorOut(){
- LOG.info("Erroring out pending entries");
-
- for (Enumeration<CompletionKey> e = addCompletions.keys() ; e.hasMoreElements() ;) {
- CompletionKey key = e.nextElement();
- Completion<WriteCallback> ac = addCompletions.remove(key);
- if(ac != null){
- completionSemaphore.release();
- ac.cb.writeComplete(-1, key.ledgerId, key.entryId, ac.ctx);
- }
- }
-
- LOG.info("Finished erroring out pending add entries");
-
- for (Enumeration<CompletionKey> e = readCompletions.keys() ; e.hasMoreElements() ;) {
- CompletionKey key = e.nextElement();
- Completion<ReadEntryCallback> ac = readCompletions.remove(key);
-
- if(ac != null){
- completionSemaphore.release();
- ac.cb.readEntryComplete(-1, key.ledgerId, key.entryId, null, ac.ctx);
+
+ public void readEntry(final InetSocketAddress addr, final long ledgerId, final long entryId,
+ final ReadEntryCallback cb, final Object ctx) {
+
+ final PerChannelBookieClient client = lookupClient(addr);
+
+ client.connectIfNeededAndDoOp(new GenericCallback<Void>() {
+ @Override
+ public void operationComplete(int rc, Void result) {
+
+ if (rc != BKException.Code.OK) {
+ cb.readEntryComplete(rc, ledgerId, entryId, null, ctx);
+ return;
+ }
+ client.readEntry(ledgerId, entryId, cb, ctx);
}
- }
-
- LOG.info("Finished erroring out pending read entries");
+ });
}
- /**
- * Halts client.
- */
-
- public void halt() {
- try{
- sock.close();
- } catch(IOException e) {
- LOG.warn("Exception while closing socket");
- }
-
- try{
- running.acquire();
- } catch(InterruptedException e){
- LOG.error("Interrupted while waiting for running semaphore to acquire lock");
+ public void close(){
+ for (PerChannelBookieClient channel: channels.values()){
+ channel.close();
}
}
-
- /**
- * Returns the status of the socket of this bookie client.
- *
- * @return boolean
- */
- public boolean isConnected(){
- return sock.isConnected();
- }
private static class Counter {
int i;
int total;
+
synchronized void inc() {
i++;
total++;
}
+
synchronized void dec() {
i--;
notifyAll();
}
+
synchronized void wait(int limit) throws InterruptedException {
- while(i > limit) {
+ while (i > limit) {
wait();
}
}
+
synchronized int total() {
return total;
}
}
+
/**
* @param args
- * @throws IOException
- * @throws NumberFormatException
- * @throws InterruptedException
+ * @throws IOException
+ * @throws NumberFormatException
+ * @throws InterruptedException
*/
public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
if (args.length != 3) {
@@ -413,8 +149,8 @@
}
WriteCallback cb = new WriteCallback() {
- public void writeComplete(int rc, long ledger, long entry, Object ctx) {
- Counter counter = (Counter)ctx;
+ public void writeComplete(int rc, long ledger, long entry, InetSocketAddress addr, Object ctx) {
+ Counter counter = (Counter) ctx;
counter.dec();
if (rc != 0) {
System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
@@ -424,18 +160,19 @@
Counter counter = new Counter();
byte hello[] = "hello".getBytes();
long ledger = Long.parseLong(args[2]);
- BookieClient bc = new BookieClient(args[0], Integer.parseInt(args[1]), 5000);
- for(int i = 0; i < 100000; i++) {
- ByteBuffer entry = ByteBuffer.allocate(100);
- entry.putLong(ledger);
- entry.putLong(i);
- entry.putInt(0);
- entry.put(hello);
- entry.flip();
+ ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+ .newCachedThreadPool());
+ OrderedSafeExecutor executor = new OrderedSafeExecutor(1);
+ BookieClient bc = new BookieClient(channelFactory, executor);
+ InetSocketAddress addr = new InetSocketAddress(args[0], Integer.parseInt(args[1]));
+
+ for (int i = 0; i < 100000; i++) {
counter.inc();
- bc.addEntry(ledger, new byte[0], i, entry, cb, counter);
+ bc.addEntry(addr, ledger, new byte[0], i, ChannelBuffers.wrappedBuffer(hello), cb, counter);
}
counter.wait(0);
System.out.println("Total = " + counter.total());
+ channelFactory.releaseExternalResources();
+ executor.shutdown();
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.proto;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,33 +21,31 @@
*
*/
-
/**
- * The packets of the Bookie protocol all have a 4-byte integer
- * indicating the type of request or response at the very beginning
- * of the packet followed by a payload.
- *
+ * The packets of the Bookie protocol all have a 4-byte integer indicating the
+ * type of request or response at the very beginning of the packet followed by a
+ * payload.
+ *
*/
public interface BookieProtocol {
/**
- * The Add entry request payload will be a ledger entry exactly
- * as it should be logged. The response payload will be a 4-byte
- * integer that has the error code followed by the 8-byte
- * ledger number and 8-byte entry number of the entry written.
+ * The Add entry request payload will be a ledger entry exactly as it should
+ * be logged. The response payload will be a 4-byte integer that has the
+ * error code followed by the 8-byte ledger number and 8-byte entry number
+ * of the entry written.
*/
public static final int ADDENTRY = 1;
/**
- * The Read entry request payload will be the ledger number and
- * entry number to read. (The ledger number is an 8-byte integer
- * and the entry number is a 8-byte integer.) The
- * response payload will be a 4-byte integer representing an
- * error code and a ledger entry if the error code is EOK, otherwise
- * it will be the 8-byte ledger number and the 4-byte entry number
- * requested. (Note that the first sixteen bytes of the entry happen
- * to be the ledger number and entry number as well.)
+ * The Read entry request payload will be the ledger number and entry number
+ * to read. (The ledger number is an 8-byte integer and the entry number is
+ * a 8-byte integer.) The response payload will be a 4-byte integer
+ * representing an error code and a ledger entry if the error code is EOK,
+ * otherwise it will be the 8-byte ledger number and the 4-byte entry number
+ * requested. (Note that the first sixteen bytes of the entry happen to be
+ * the ledger number and entry number as well.)
*/
public static final int READENTRY = 2;
-
+
/**
* The error code that indicates success
*/
@@ -67,10 +66,10 @@
* General error occurred at the server
*/
public static final int EIO = 101;
-
+
/**
* Unauthorized access to ledger
*/
public static final int EUA = 102;
-
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.proto;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,9 +21,9 @@
*
*/
-
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie;
@@ -30,107 +31,114 @@
import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
import org.apache.log4j.Logger;
-
-
/**
* Implements the server-side part of the BookKeeper protocol.
- *
+ *
*/
-public class BookieServer implements NIOServerFactory.PacketProcessor, WriteCallback {
+public class BookieServer implements NIOServerFactory.PacketProcessor, BookkeeperInternalCallbacks.WriteCallback {
int port;
NIOServerFactory nioServerFactory;
volatile boolean down = false;
Bookie bookie;
static Logger LOG = Logger.getLogger(BookieServer.class);
-
- public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) {
+
+ public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) throws IOException {
this.port = port;
this.bookie = new Bookie(journalDirectory, ledgerDirectories);
}
+
public void start() throws IOException {
nioServerFactory = new NIOServerFactory(port, this);
}
+
public void shutdown() throws InterruptedException {
down = true;
nioServerFactory.shutdown();
bookie.shutdown();
}
- public boolean isDown(){
+
+ public boolean isDown() {
return down;
}
+
public void join() throws InterruptedException {
nioServerFactory.join();
}
+
/**
* @param args
- * @throws IOException
- * @throws InterruptedException
+ * @throws IOException
+ * @throws InterruptedException
*/
public static void main(String[] args) throws IOException, InterruptedException {
- if (args.length < 3) {
+ if (args.length < 3) {
System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
return;
}
int port = Integer.parseInt(args[0]);
File journalDirectory = new File(args[1]);
- File ledgerDirectory[] = new File[args.length-2];
+ File ledgerDirectory[] = new File[args.length - 2];
StringBuilder sb = new StringBuilder();
- for(int i = 0; i < ledgerDirectory.length; i++) {
- ledgerDirectory[i] = new File(args[i+2]);
+ for (int i = 0; i < ledgerDirectory.length; i++) {
+ ledgerDirectory[i] = new File(args[i + 2]);
if (i != 0) {
sb.append(',');
}
sb.append(ledgerDirectory[i]);
}
- String hello = String.format("Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.", port, journalDirectory, sb);
+ String hello = String.format(
+ "Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.",
+ port, journalDirectory, sb);
LOG.info(hello);
BookieServer bs = new BookieServer(port, journalDirectory, ledgerDirectory);
bs.start();
bs.join();
}
-
public void processPacket(ByteBuffer packet, Cnxn src) {
int type = packet.getInt();
- switch(type) {
+ switch (type) {
case BookieProtocol.ADDENTRY:
try {
byte[] masterKey = new byte[20];
packet.get(masterKey, 0, 20);
- //LOG.debug("Master key: " + new String(masterKey));
+ // LOG.debug("Master key: " + new String(masterKey));
bookie.addEntry(packet.slice(), this, src, masterKey);
- } catch(IOException e) {
- if (LOG.isTraceEnabled()) {
- ByteBuffer bb = packet.duplicate();
-
- long ledgerId = bb.getLong();
- long entryId = bb.getLong();
- LOG.trace("Error reading " + entryId + "@" + ledgerId, e);
- }
- ByteBuffer eio = ByteBuffer.allocate(8);
+ } catch (IOException e) {
+ ByteBuffer bb = packet.duplicate();
+
+ long ledgerId = bb.getLong();
+ long entryId = bb.getLong();
+ LOG.error("Error writing " + entryId + "@" + ledgerId, e);
+ ByteBuffer eio = ByteBuffer.allocate(8 + 16);
eio.putInt(type);
eio.putInt(BookieProtocol.EIO);
+ eio.putLong(ledgerId);
+ eio.putLong(entryId);
eio.flip();
- src.sendResponse(new ByteBuffer[] {eio});
- } catch(BookieException e){
+ src.sendResponse(new ByteBuffer[] { eio });
+ } catch (BookieException e) {
ByteBuffer bb = packet.duplicate();
long ledgerId = bb.getLong();
-
+ long entryId = bb.getLong();
+
LOG.error("Unauthorized access to ledger " + ledgerId);
-
- ByteBuffer eio = ByteBuffer.allocate(8);
+
+ ByteBuffer eio = ByteBuffer.allocate(8 + 16);
eio.putInt(type);
eio.putInt(BookieProtocol.EUA);
+ eio.putLong(ledgerId);
+ eio.putLong(entryId);
eio.flip();
- src.sendResponse(new ByteBuffer[] {eio});
+ src.sendResponse(new ByteBuffer[] { eio });
}
break;
case BookieProtocol.READENTRY:
ByteBuffer[] rsp = new ByteBuffer[2];
- ByteBuffer rc = ByteBuffer.allocate(8+8+8);
+ ByteBuffer rc = ByteBuffer.allocate(8 + 8 + 8);
rsp[0] = rc;
rc.putInt(type);
-
+
long ledgerId = packet.getLong();
long entryId = packet.getLong();
LOG.debug("Received new read request: " + ledgerId + ", " + entryId);
@@ -138,17 +146,17 @@
rsp[1] = bookie.readEntry(ledgerId, entryId);
LOG.debug("##### Read entry ##### " + rsp[1].remaining());
rc.putInt(BookieProtocol.EOK);
- } catch(Bookie.NoLedgerException e) {
+ } catch (Bookie.NoLedgerException e) {
if (LOG.isTraceEnabled()) {
LOG.error("Error reading " + entryId + "@" + ledgerId, e);
}
rc.putInt(BookieProtocol.ENOLEDGER);
- } catch(Bookie.NoEntryException e) {
+ } catch (Bookie.NoEntryException e) {
if (LOG.isTraceEnabled()) {
LOG.error("Error reading " + entryId + "@" + ledgerId, e);
}
rc.putInt(BookieProtocol.ENOENTRY);
- } catch(IOException e) {
+ } catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.error("Error reading " + entryId + "@" + ledgerId, e);
}
@@ -178,12 +186,12 @@
badType.putInt(type);
badType.putInt(BookieProtocol.EBADREQ);
badType.flip();
- src.sendResponse(new ByteBuffer[] {packet});
+ src.sendResponse(new ByteBuffer[] { packet });
}
}
-
- public void writeComplete(int rc, long ledgerId, long entryId, Object ctx) {
- Cnxn src = (Cnxn)ctx;
+
+ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+ Cnxn src = (Cnxn) ctx;
ByteBuffer bb = ByteBuffer.allocate(24);
bb.putInt(BookieProtocol.ADDENTRY);
bb.putInt(rc);
@@ -193,7 +201,7 @@
if (LOG.isTraceEnabled()) {
LOG.trace("Add entry rc = " + rc + " for " + entryId + "@" + ledgerId);
}
- src.sendResponse(new ByteBuffer[] {bb});
+ src.sendResponse(new ByteBuffer[] { bb });
}
-
+
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.proto;
+
+import java.net.InetSocketAddress;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Declaration of a callback interfaces used in bookkeeper client library but
+ * not exposed to the client application.
+ */
+
+public class BookkeeperInternalCallbacks {
+ /**
+ * Callback for calls from BookieClient objects. Such calls are for replies
+ * of write operations (operations to add an entry to a ledger).
+ *
+ */
+
+ public interface WriteCallback {
+ void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx);
+ }
+
+ public interface GenericCallback<T> {
+ void operationComplete(int rc, T result);
+ }
+
+ /**
+ * Declaration of a callback implementation for calls from BookieClient objects.
+ * Such calls are for replies of read operations (operations to read an entry
+ * from a ledger).
+ *
+ */
+
+ public interface ReadEntryCallback {
+ void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx);
+ }
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java Tue Jan 26 23:16:45 2010
@@ -45,8 +45,9 @@
public interface PacketProcessor {
public void processPacket(ByteBuffer packet, Cnxn src);
}
- ServerStats stats = new ServerStats();
+ ServerStats stats = new ServerStats();
+
Logger LOG = Logger.getLogger(NIOServerFactory.class);
ServerSocketChannel ss;
@@ -89,6 +90,7 @@
}
}
+ @Override
public void run() {
while (!ss.socket().isClosed()) {
try {
@@ -97,16 +99,13 @@
synchronized (this) {
selected = selector.selectedKeys();
}
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
- selected);
+ ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel sc = ((ServerSocketChannel) k.channel())
- .accept();
+ SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
sc.configureBlocking(false);
- SelectionKey sk = sc.register(selector,
- SelectionKey.OP_READ);
+ SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
Cnxn cnxn = new Cnxn(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
@@ -167,7 +166,7 @@
public class Cnxn {
private SocketChannel sock;
-
+
private SelectionKey sk;
boolean initialized;
@@ -183,7 +182,7 @@
int packetsSent;
int packetsReceived;
-
+
void doIO(SelectionKey k) throws InterruptedException {
try {
if (sock == null) {
@@ -233,8 +232,7 @@
* be copied, so we've got to slice the buffer
* if it's too big.
*/
- b = (ByteBuffer) b.slice().limit(
- directBuffer.remaining());
+ b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
@@ -286,15 +284,12 @@
}
synchronized (this) {
if (outgoingBuffers.size() == 0) {
- if (!initialized
- && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
+ if (!initialized && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
throw new IOException("Responded to info probe");
}
- sk.interestOps(sk.interestOps()
- & (~SelectionKey.OP_WRITE));
+ sk.interestOps(sk.interestOps() & (~SelectionKey.OP_WRITE));
} else {
- sk.interestOps(sk.interestOps()
- | SelectionKey.OP_WRITE);
+ sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}
@@ -349,9 +344,8 @@
}
String peerName;
-
- public Cnxn(SocketChannel sock, SelectionKey sk)
- throws IOException {
+
+ public Cnxn(SocketChannel sock, SelectionKey sk) throws IOException {
this.sock = sock;
this.sk = sk;
sock.socket().setTcpNoDelay(true);
@@ -360,14 +354,14 @@
if (LOG.isTraceEnabled()) {
peerName = sock.socket().toString();
}
-
+
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
+ @Override
public String toString() {
- return "NIOServerCnxn object with sock = " + sock + " and sk = "
- + sk;
+ return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
}
boolean closed;
@@ -437,11 +431,11 @@
throw e;
}
}
-
+
private void sendBuffers(ByteBuffer bb[]) {
ByteBuffer len = ByteBuffer.allocate(4);
int total = 0;
- for(int i = 0; i < bb.length; i++) {
+ for (int i = 0; i < bb.length; i++) {
if (bb[i] != null) {
total += bb[i].remaining();
}
@@ -452,14 +446,14 @@
len.putInt(total);
len.flip();
outgoingBuffers.add(len);
- for(int i = 0; i < bb.length; i++) {
+ for (int i = 0; i < bb.length; i++) {
if (bb[i] != null) {
outgoingBuffers.add(bb[i]);
}
}
makeWritable(sk);
}
-
+
synchronized public void sendResponse(ByteBuffer bb[]) {
if (closed) {
return;
@@ -485,8 +479,8 @@
long packetsSent;
/**
- * The number of requests that have been submitted but not yet responded
- * to.
+ * The number of requests that have been submitted but not yet
+ * responded to.
*/
public long getOutstandingRequests() {
return outstandingRequests;
@@ -500,19 +494,15 @@
return packetsSent;
}
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
Channel channel = sk.channel();
if (channel instanceof SocketChannel) {
- sb.append(" ").append(
- ((SocketChannel) channel).socket()
- .getRemoteSocketAddress()).append("[")
- .append(Integer.toHexString(sk.interestOps()))
- .append("](queued=").append(
- getOutstandingRequests())
- .append(",recved=").append(getPacketsReceived())
- .append(",sent=").append(getPacketsSent()).append(
- ")\n");
+ sb.append(" ").append(((SocketChannel) channel).socket().getRemoteSocketAddress()).append("[")
+ .append(Integer.toHexString(sk.interestOps())).append("](queued=").append(
+ getOutstandingRequests()).append(",recved=").append(getPacketsReceived()).append(
+ ",sent=").append(getPacketsSent()).append(")\n");
}
return sb.toString();
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,570 @@
+package org.apache.bookkeeper.proto;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+
+/**
+ * This class manages all details of connection to a particular bookie. It also
+ * has reconnect logic if a connection to a bookie fails.
+ *
+ */
+
+@ChannelPipelineCoverage("one")
+public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory {
+
+ static final Logger LOG = Logger.getLogger(PerChannelBookieClient.class);
+
+ static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
+ public static int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
+
+ InetSocketAddress addr;
+ boolean connected = false;
+ AtomicLong totalBytesOutstanding;
+ ClientSocketChannelFactory channelFactory;
+ OrderedSafeExecutor executor;
+
+ ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
+ ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
+
+ /**
+ * The following member variables do not need to be concurrent, or volatile
+ * because they are always updated under a lock
+ */
+ Queue<GenericCallback<Void>> pendingOps = new ArrayDeque<GenericCallback<Void>>();
+ boolean connectionAttemptInProgress;
+ Channel channel = null;
+
+ public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory,
+ InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
+ this.addr = addr;
+ this.executor = executor;
+ this.totalBytesOutstanding = totalBytesOutstanding;
+ this.channelFactory = channelFactory;
+ connect(channelFactory);
+ }
+
+ void connect(ChannelFactory channelFactory) {
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Connecting to bookie: " + addr);
+
+ // Set up the ClientBootStrap so we can create a new Channel connection
+ // to the bookie.
+ ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+ bootstrap.setPipelineFactory(this);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+
+ // Start the connection attempt to the input server host.
+ connectionAttemptInProgress = true;
+
+ ChannelFuture future = bootstrap.connect(addr);
+
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ int rc;
+ Queue<GenericCallback<Void>> oldPendingOps;
+
+ synchronized (PerChannelBookieClient.this) {
+
+ if (future.isSuccess()) {
+ LOG.info("Successfully connected to bookie: " + addr);
+ rc = BKException.Code.OK;
+ channel = future.getChannel();
+ connected = true;
+ } else {
+ LOG.error("Could not connect to bookie: " + addr);
+ rc = BKException.Code.BookieHandleNotAvailableException;
+ channel = null;
+ connected = false;
+ }
+
+ connectionAttemptInProgress = false;
+ PerChannelBookieClient.this.channel = channel;
+
+ // trick to not do operations under the lock, take the list
+ // of pending ops and assign it to a new variable, while
+ // emptying the pending ops by just assigning it to a new
+ // list
+ oldPendingOps = pendingOps;
+ pendingOps = new ArrayDeque<GenericCallback<Void>>();
+ }
+
+ for (GenericCallback<Void> pendingOp : oldPendingOps) {
+ pendingOp.operationComplete(rc, null);
+ }
+
+ }
+ });
+ }
+
+ void connectIfNeededAndDoOp(GenericCallback<Void> op) {
+ boolean doOpNow;
+
+ // common case without lock first
+ if (channel != null && connected) {
+ doOpNow = true;
+ } else {
+
+ synchronized (this) {
+ // check again under lock
+ if (channel != null && connected) {
+ doOpNow = true;
+ } else {
+
+ // if reached here, channel is either null (first connection
+ // attempt),
+ // or the channel is disconnected
+ doOpNow = false;
+
+ // connection attempt is still in progress, queue up this
+ // op. Op will be executed when connection attempt either
+ // fails
+ // or
+ // succeeds
+ pendingOps.add(op);
+
+ if (!connectionAttemptInProgress) {
+ connect(channelFactory);
+ }
+
+ }
+ }
+ }
+
+ if (doOpNow) {
+ op.operationComplete(BKException.Code.OK, null);
+ }
+
+ }
+
+ /**
+ * This method should be called only after connection has been checked for
+ * {@link #connectIfNeededAndDoOp(GenericCallback)}
+ *
+ * @param ledgerId
+ * @param masterKey
+ * @param entryId
+ * @param lastConfirmed
+ * @param macCode
+ * @param data
+ * @param cb
+ * @param ctx
+ */
+ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
+ Object ctx) {
+
+ final int entrySize = toSend.readableBytes();
+ // if (totalBytesOutstanding.get() > maxMemory) {
+ // // TODO: how to throttle, throw an exception, or call the callback?
+ // // Maybe this should be done at the layer above?
+ // }
+
+ final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
+
+ addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
+
+ int totalHeaderSize = 4 // for the length of the packet
+ + 4 // for the type of request
+ + masterKey.length; // for the master key
+
+ ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ header.writeInt(totalHeaderSize - 4 + entrySize);
+ header.writeInt(BookieProtocol.ADDENTRY);
+ header.writeBytes(masterKey);
+
+ ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+
+ ChannelFuture future = channel.write(wrappedBuffer);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
+ + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+ }
+ // totalBytesOutstanding.addAndGet(entrySize);
+ } else {
+ errorOutAddKey(completionKey);
+ }
+ }
+ });
+
+ }
+
+ public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
+
+ final CompletionKey key = new CompletionKey(ledgerId, entryId);
+ readCompletions.put(key, new ReadCompletion(cb, ctx));
+
+ int totalHeaderSize = 4 // for the length of the packet
+ + 4 // for request type
+ + 8 // for ledgerId
+ + 8; // for entryId
+
+ ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ tmpEntry.writeInt(totalHeaderSize - 4);
+ tmpEntry.writeInt(BookieProtocol.READENTRY);
+ tmpEntry.writeLong(ledgerId);
+ tmpEntry.writeLong(entryId);
+
+ ChannelFuture future = channel.write(tmpEntry);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
+ + ledgerId + " bookie: " + channel.getRemoteAddress());
+ }
+ } else {
+ errorOutReadKey(key);
+ }
+ }
+ });
+
+ }
+
+ public void close() {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+
+ void errorOutReadKey(final CompletionKey key) {
+ executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+
+ ReadCompletion readCompletion = readCompletions.remove(key);
+
+ if (readCompletion != null) {
+ LOG.error("Could not write request for reading entry: " + key.entryId + " ledger-id: "
+ + key.ledgerId + " bookie: " + channel.getRemoteAddress());
+
+ readCompletion.cb.readEntryComplete(BKException.Code.BookieHandleNotAvailableException,
+ key.ledgerId, key.entryId, null, readCompletion.ctx);
+ }
+ }
+
+ });
+ }
+
+ void errorOutAddKey(final CompletionKey key) {
+ executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+
+ AddCompletion addCompletion = addCompletions.remove(key);
+
+ if (addCompletion != null) {
+ String bAddress = "null";
+ if(channel != null)
+ bAddress = channel.getRemoteAddress().toString();
+ LOG.error("Could not write request for adding entry: " + key.entryId + " ledger-id: "
+ + key.ledgerId + " bookie: " + bAddress);
+
+ addCompletion.cb.writeComplete(BKException.Code.BookieHandleNotAvailableException, key.ledgerId,
+ key.entryId, addr, addCompletion.ctx);
+ LOG.error("Invoked callback method: " + key.entryId);
+ }
+ }
+
+ });
+
+ }
+
+ /**
+ * Errors out pending entries. We call this method from one thread to avoid
+ * concurrent executions to QuorumOpMonitor (implements callbacks). It seems
+ * simpler to call it from BookieHandle instead of calling directly from
+ * here.
+ */
+
+ void errorOutOutstandingEntries() {
+
+ // DO NOT rewrite these using Map.Entry iterations. We want to iterate
+ // on keys and see if we are successfully able to remove the key from
+ // the map. Because the add and the read methods also do the same thing
+ // in case they get a write failure on the socket. The one who
+ // successfully removes the key from the map is the one responsible for
+ // calling the application callback.
+
+ for (CompletionKey key : addCompletions.keySet()) {
+ errorOutAddKey(key);
+ }
+
+ for (CompletionKey key : readCompletions.keySet()) {
+ errorOutReadKey(key);
+ }
+ }
+
+ /**
+ * In the netty pipeline, we need to split packets based on length, so we
+ * use the {@link LengthFieldBasedFrameDecoder}. Other than that all actions
+ * are carried out in this class, e.g., making sense of received messages,
+ * prepending the length to outgoing packets etc.
+ */
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
+ pipeline.addLast("mainhandler", this);
+ return pipeline;
+ }
+
+ /**
+ * If our channel has disconnected, we just error out the pending entries
+ */
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ LOG.info("Disconnected from bookie: " + addr);
+ errorOutOutstandingEntries();
+ channel.close();
+
+ connected = false;
+
+ // we don't want to reconnect right away. If someone sends a request to
+ // this address, we will reconnect.
+ }
+
+ /**
+ * Called by netty when an exception happens in one of the netty threads
+ * (mostly due to what we do in the netty threads)
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ Throwable t = e.getCause();
+ if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) {
+ LOG.error("Corrupted fram recieved from bookie: " + e.getChannel().getRemoteAddress());
+ return;
+ }
+ if (t instanceof IOException) {
+ // these are thrown when a bookie fails, logging them just pollutes
+ // the logs (the failure is logged from the listeners on the write
+ // operation), so I'll just ignore it here.
+ return;
+ }
+
+ LOG.fatal("Unexpected exception caught by bookie client channel handler", t);
+ // Since we are a library, cant terminate App here, can we?
+ }
+
+ /**
+ * Called by netty when a message is received on a channel
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ if (!(e.getMessage() instanceof ChannelBuffer)) {
+ ctx.sendUpstream(e);
+ return;
+ }
+
+ final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+ final int type, rc;
+ final long ledgerId, entryId;
+
+ try {
+ type = buffer.readInt();
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
+ } catch (IndexOutOfBoundsException ex) {
+ LOG.error("Unparseable response from bookie: " + addr, ex);
+ return;
+ }
+
+ executor.submitOrdered(ledgerId, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ switch (type) {
+ case BookieProtocol.ADDENTRY:
+ handleAddResponse(ledgerId, entryId, rc);
+ break;
+ case BookieProtocol.READENTRY:
+ handleReadResponse(ledgerId, entryId, rc, buffer);
+ break;
+ default:
+ LOG.error("Unexpected response, type: " + type + " recieved from bookie: " + addr + " , ignoring");
+ }
+ }
+
+ });
+ }
+
+ void handleAddResponse(long ledgerId, long entryId, int rc) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+ + entryId + " rc: " + rc);
+ }
+
+ // convert to BKException code because thats what the uppper
+ // layers expect. This is UGLY, there should just be one set of
+ // error codes.
+ if (rc != BookieProtocol.EOK) {
+ LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+ + " with code: " + rc);
+ rc = BKException.Code.WriteException;
+ } else {
+ rc = BKException.Code.OK;
+ }
+
+ AddCompletion ac;
+ ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+ if (ac == null) {
+ LOG.error("Unexpected add response received from bookie: " + addr + " for ledger: " + ledgerId
+ + ", entry: " + entryId + " , ignoring");
+ return;
+ }
+
+ // totalBytesOutstanding.addAndGet(-ac.size);
+
+ ac.cb.writeComplete(rc, ledgerId, entryId, addr, ac.ctx);
+
+ }
+
+ void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+ + entryId + " rc: " + rc + "entry length: " + buffer.readableBytes());
+ }
+
+ // convert to BKException code because thats what the uppper
+ // layers expect. This is UGLY, there should just be one set of
+ // error codes.
+ if (rc == BookieProtocol.EOK) {
+ rc = BKException.Code.OK;
+ } else if (rc == BookieProtocol.ENOENTRY || rc == BookieProtocol.ENOLEDGER) {
+ rc = BKException.Code.NoSuchEntryException;
+ } else {
+ LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+ + " with code: " + rc);
+ rc = BKException.Code.ReadException;
+ }
+
+ CompletionKey key = new CompletionKey(ledgerId, entryId);
+ ReadCompletion readCompletion = readCompletions.remove(key);
+
+ if (readCompletion == null) {
+ /*
+ * This is a special case. When recovering a ledger, a client
+ * submits a read request with id -1, and receives a response with a
+ * different entry id.
+ */
+ readCompletion = readCompletions.remove(new CompletionKey(ledgerId, -1));
+ }
+
+ if (readCompletion == null) {
+ LOG.error("Unexpected read response recieved from bookie: " + addr + " for ledger: " + ledgerId
+ + ", entry: " + entryId + " , ignoring");
+ return;
+ }
+
+ readCompletion.cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), readCompletion.ctx);
+ }
+
+ /**
+ * Boiler-plate wrapper classes follow
+ *
+ */
+
+ private static class ReadCompletion {
+ final ReadEntryCallback cb;
+ final Object ctx;
+
+ public ReadCompletion(ReadEntryCallback cb, Object ctx) {
+ this.cb = cb;
+ this.ctx = ctx;
+ }
+ }
+
+ private static class AddCompletion {
+ final WriteCallback cb;
+ //final long size;
+ final Object ctx;
+
+ public AddCompletion(WriteCallback cb, long size, Object ctx) {
+ this.cb = cb;
+ //this.size = size;
+ this.ctx = ctx;
+ }
+ }
+
+ private static class CompletionKey {
+ long ledgerId;
+ long entryId;
+
+ CompletionKey(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof CompletionKey) || obj == null) {
+ return false;
+ }
+ CompletionKey that = (CompletionKey) obj;
+ return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) ledgerId << 16) ^ ((int) entryId);
+ }
+
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java Tue Jan 26 23:16:45 2010
@@ -14,45 +14,51 @@
package org.apache.bookkeeper.proto;
-
public class ServerStats {
- private static ServerStats instance= new ServerStats();
+ private static ServerStats instance = new ServerStats();
private long packetsSent;
private long packetsReceived;
private long maxLatency;
private long minLatency = Long.MAX_VALUE;
private long totalLatency = 0;
private long count = 0;
-
- public interface Provider{
+
+ public interface Provider {
public long getOutstandingRequests();
+
public long getLastProcessedZxid();
}
- private Provider provider=null;
- private Object mutex=new Object();
-
- static public ServerStats getInstance(){
+
+ private Provider provider = null;
+ private Object mutex = new Object();
+
+ static public ServerStats getInstance() {
return instance;
}
+
static public void registerAsConcrete() {
setInstance(new ServerStats());
}
+
static synchronized public void unregister() {
- instance=null;
+ instance = null;
}
- static synchronized protected void setInstance(ServerStats newInstance){
- assert instance==null;
+
+ static synchronized protected void setInstance(ServerStats newInstance) {
+ assert instance == null;
instance = newInstance;
}
- protected ServerStats(){}
-
+
+ protected ServerStats() {
+ }
+
// getters
synchronized public long getMinLatency() {
return (minLatency == Long.MAX_VALUE) ? 0 : minLatency;
}
synchronized public long getAvgLatency() {
- if(count!=0)
+ if (count != 0)
return totalLatency / count;
return 0;
}
@@ -62,15 +68,17 @@
}
public long getOutstandingRequests() {
- synchronized(mutex){
- return (provider!=null)?provider.getOutstandingRequests():-1;
+ synchronized (mutex) {
+ return (provider != null) ? provider.getOutstandingRequests() : -1;
}
}
- public long getLastProcessedZxid(){
- synchronized(mutex){
- return (provider!=null)?provider.getLastProcessedZxid():-1;
+
+ public long getLastProcessedZxid() {
+ synchronized (mutex) {
+ return (provider != null) ? provider.getLastProcessedZxid() : -1;
}
}
+
synchronized public long getPacketsReceived() {
return packetsReceived;
}
@@ -79,29 +87,31 @@
return packetsSent;
}
- public String getServerState(){
+ public String getServerState() {
return "standalone";
}
-
- public String toString(){
+
+ @Override
+ public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Latency min/avg/max: " + getMinLatency() + "/"
- + getAvgLatency() + "/" + getMaxLatency() + "\n");
+ sb.append("Latency min/avg/max: " + getMinLatency() + "/" + getAvgLatency() + "/" + getMaxLatency() + "\n");
sb.append("Received: " + getPacketsReceived() + "\n");
sb.append("Sent: " + getPacketsSent() + "\n");
if (provider != null) {
sb.append("Outstanding: " + getOutstandingRequests() + "\n");
- sb.append("Zxid: 0x"+ Long.toHexString(getLastProcessedZxid())+ "\n");
+ sb.append("Zxid: 0x" + Long.toHexString(getLastProcessedZxid()) + "\n");
}
- sb.append("Mode: "+getServerState()+"\n");
+ sb.append("Mode: " + getServerState() + "\n");
return sb.toString();
}
+
// mutators
- public void setStatsProvider(Provider zk){
- synchronized(mutex){
- provider=zk;
+ public void setStatsProvider(Provider zk) {
+ synchronized (mutex) {
+ provider = zk;
}
}
+
synchronized void updateLatency(long requestCreateTime) {
long latency = System.currentTimeMillis() - requestCreateTime;
totalLatency += latency;
@@ -113,21 +123,26 @@
maxLatency = latency;
}
}
- synchronized public void resetLatency(){
- totalLatency=count=maxLatency=0;
- minLatency=Long.MAX_VALUE;
+
+ synchronized public void resetLatency() {
+ totalLatency = count = maxLatency = 0;
+ minLatency = Long.MAX_VALUE;
}
- synchronized public void resetMaxLatency(){
- maxLatency=getMinLatency();
+
+ synchronized public void resetMaxLatency() {
+ maxLatency = getMinLatency();
}
+
synchronized public void incrementPacketsReceived() {
packetsReceived++;
}
+
synchronized public void incrementPacketsSent() {
packetsSent++;
}
- synchronized public void resetRequestCounters(){
- packetsReceived=packetsSent=0;
+
+ synchronized public void resetRequestCounters() {
+ packetsReceived = packetsSent = 0;
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java Tue Jan 26 23:16:45 2010
@@ -23,11 +23,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Enumeration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.log4j.Logger;
public class LedgerInputStream extends InputStream {
@@ -35,14 +35,16 @@
private LedgerHandle lh;
private ByteBuffer bytebuff;
byte[] bbytes;
- long lastEntry =0;
+ long lastEntry = 0;
int increment = 50;
int defaultSize = 1024 * 1024; // 1MB default size
- LedgerSequence ledgerSeq = null;
-
+ Enumeration<LedgerEntry> ledgerSeq = null;
+
/**
* construct a outputstream from a ledger handle
- * @param lh ledger handle
+ *
+ * @param lh
+ * ledger handle
* @throws {@link BKException}, {@link InterruptedException}
*/
public LedgerInputStream(LedgerHandle lh) throws BKException, InterruptedException {
@@ -50,14 +52,17 @@
bbytes = new byte[defaultSize];
this.bytebuff = ByteBuffer.wrap(bbytes);
this.bytebuff.position(this.bytebuff.limit());
- lastEntry = Math.max(lh.getLast(), increment);
+ lastEntry = Math.min(lh.getLastAddConfirmed(), increment);
ledgerSeq = lh.readEntries(0, lastEntry);
}
/**
* construct a outputstream from a ledger handle
- * @param lh the ledger handle
- * @param size the size of the buffer
+ *
+ * @param lh
+ * the ledger handle
+ * @param size
+ * the size of the buffer
* @throws {@link BKException}, {@link InterruptedException}
*/
public LedgerInputStream(LedgerHandle lh, int size) throws BKException, InterruptedException {
@@ -65,38 +70,37 @@
bbytes = new byte[size];
this.bytebuff = ByteBuffer.wrap(bbytes);
this.bytebuff.position(this.bytebuff.limit());
- lastEntry = Math.max(lh.getLast(), increment);
+ lastEntry = Math.min(lh.getLastAddConfirmed(), increment);
ledgerSeq = lh.readEntries(0, lastEntry);
}
-
-
+
@Override
public void close() {
// do nothing
- // let the applciation
+ // let the application
// close the ledger
}
-
+
/**
- * refill the buffer, we
- * need to read more bytes
+ * refill the buffer, we need to read more bytes
+ *
* @return if we can refill or not
*/
private synchronized boolean refill() throws IOException {
bytebuff.clear();
- if (!ledgerSeq.hasMoreElements() && lastEntry >= lh.getLast()) {
+ if (!ledgerSeq.hasMoreElements() && lastEntry >= lh.getLastAddConfirmed()) {
return false;
}
if (!ledgerSeq.hasMoreElements()) {
- //do refill
- long last = Math.max( lastEntry + increment, lh.getLast());
+ // do refill
+ long last = Math.min(lastEntry + increment, lh.getLastAddConfirmed());
try {
ledgerSeq = lh.readEntries(lastEntry + 1, last);
- } catch(BKException bk) {
+ } catch (BKException bk) {
IOException ie = new IOException(bk.getMessage());
ie.initCause(bk);
throw ie;
- } catch(InterruptedException ie) {
+ } catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
lastEntry = last;
@@ -106,7 +110,7 @@
bytebuff = ByteBuffer.wrap(bbytes);
return true;
}
-
+
@Override
public synchronized int read() throws IOException {
boolean toread = true;
@@ -120,10 +124,10 @@
}
return -1;
}
-
+
@Override
public synchronized int read(byte[] b) throws IOException {
- // be smart ... just copy the bytes
+ // be smart ... just copy the bytes
// once and return the size
// user will call it again
boolean toread = true;
@@ -133,19 +137,19 @@
if (toread) {
int bcopied = bytebuff.remaining();
int tocopy = Math.min(bcopied, b.length);
- //cannot used gets because of
+ // cannot used gets because of
// the underflow/overflow exceptions
- System.arraycopy(bbytes, bytebuff.position(), b,0, tocopy);
+ System.arraycopy(bbytes, bytebuff.position(), b, 0, tocopy);
bytebuff.position(bytebuff.position() + tocopy);
return tocopy;
}
return -1;
}
-
+
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
- //again dont need ot fully
- // fill b, just return
+ // again dont need ot fully
+ // fill b, just return
// what we have and let the application call read
// again
boolean toread = true;
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java Tue Jan 26 23:16:45 2010
@@ -28,15 +28,11 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.log4j.Logger;
-
/**
- * this class provides a streaming api
- * to get an output stream from a ledger
- * handle and write to it as a stream of
- * bytes. This is built on top of ledgerhandle
- * api and uses a buffer to cache the data
- * written to it and writes out the entry
- * to the ledger.
+ * this class provides a streaming api to get an output stream from a ledger
+ * handle and write to it as a stream of bytes. This is built on top of
+ * ledgerhandle api and uses a buffer to cache the data written to it and writes
+ * out the entry to the ledger.
*/
public class LedgerOutputStream extends OutputStream {
Logger LOG = Logger.getLogger(LedgerOutputStream.class);
@@ -44,62 +40,66 @@
private ByteBuffer bytebuff;
byte[] bbytes;
int defaultSize = 1024 * 1024; // 1MB default size
-
+
/**
* construct a outputstream from a ledger handle
- * @param lh ledger handle
+ *
+ * @param lh
+ * ledger handle
*/
public LedgerOutputStream(LedgerHandle lh) {
this.lh = lh;
bbytes = new byte[defaultSize];
this.bytebuff = ByteBuffer.wrap(bbytes);
}
-
+
/**
* construct a outputstream from a ledger handle
- * @param lh the ledger handle
- * @param size the size of the buffer
+ *
+ * @param lh
+ * the ledger handle
+ * @param size
+ * the size of the buffer
*/
public LedgerOutputStream(LedgerHandle lh, int size) {
this.lh = lh;
bbytes = new byte[size];
this.bytebuff = ByteBuffer.wrap(bbytes);
}
-
+
@Override
public void close() {
- //flush everything
+ // flush everything
// we have
flush();
}
-
+
@Override
public synchronized void flush() {
- // lets flush all the data
+ // lets flush all the data
// into the ledger entry
if (bytebuff.position() > 0) {
- //copy the bytes into
+ // copy the bytes into
// a new byte buffer and send it out
byte[] b = new byte[bytebuff.position()];
LOG.info("Comment: flushing with params " + " " + bytebuff.position());
System.arraycopy(bbytes, 0, b, 0, bytebuff.position());
try {
lh.addEntry(b);
- } catch(InterruptedException ie) {
+ } catch (InterruptedException ie) {
LOG.warn("Interrupted while flusing " + ie);
Thread.currentThread().interrupt();
- } catch(BKException bke) {
+ } catch (BKException bke) {
LOG.warn("BookKeeper exception ", bke);
}
}
}
-
+
/**
- * make space for len bytes to be written
- * to the buffer.
+ * make space for len bytes to be written to the buffer.
+ *
* @param len
- * @return if true then we can make space for len
- * if false we cannot
+ * @return if true then we can make space for len if false we cannot
*/
private boolean makeSpace(int len) {
if (bytebuff.remaining() < len) {
@@ -111,34 +111,33 @@
}
return true;
}
-
+
@Override
public synchronized void write(byte[] b) {
if (makeSpace(b.length)) {
bytebuff.put(b);
- }
- else {
+ } else {
try {
lh.addEntry(b);
- } catch(InterruptedException ie) {
+ } catch (InterruptedException ie) {
LOG.warn("Interrupted while writing", ie);
Thread.currentThread().interrupt();
- } catch(BKException bke) {
+ } catch (BKException bke) {
LOG.warn("BookKeeper exception", bke);
}
}
}
-
+
@Override
public synchronized void write(byte[] b, int off, int len) {
if (!makeSpace(len)) {
- //lets try making the buffer bigger
+ // lets try making the buffer bigger
bbytes = new byte[len];
bytebuff = ByteBuffer.wrap(bbytes);
}
bytebuff.put(b, off, len);
}
-
+
@Override
public synchronized void write(int b) throws IOException {
makeSpace(1);
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java Tue Jan 26 23:16:45 2010
@@ -25,9 +25,6 @@
import java.io.OutputStream;
import java.net.Socket;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
@@ -40,11 +37,8 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.log4j.Logger;
-
public class LocalBookKeeper {
protected static final Logger LOG = Logger.getLogger(LocalBookKeeper.class);
public static final int CONNECTION_TIMEOUT = 30000;
@@ -98,7 +92,7 @@
// TODO Auto-generated catch block
LOG.fatal("Exception while instantiating ZooKeeper", e);
}
-
+
boolean b = waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT);
LOG.debug("ZooKeeper server up: " + b);
}
@@ -210,5 +204,5 @@
}
return false;
}
-
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
package org.apache.bookkeeper.util;
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,30 +21,28 @@
*
*/
-
import java.io.IOException;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieServer;
-
public class Main {
static void usage() {
System.err.println("USAGE: bookeeper client|bookie");
}
+
/**
* @param args
- * @throws InterruptedException
- * @throws IOException
+ * @throws InterruptedException
+ * @throws IOException
*/
public static void main(String[] args) throws IOException, InterruptedException {
- if (args.length < 1 || !(args[0].equals("client") ||
- args[0].equals("bookie"))) {
+ if (args.length < 1 || !(args[0].equals("client") || args[0].equals("bookie"))) {
usage();
return;
}
- String newArgs[] = new String[args.length-1];
+ String newArgs[] = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, newArgs.length);
if (args[0].equals("bookie")) {
BookieServer.main(newArgs);
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.util;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides misc math functions that dont come standard
+ */
+public class MathUtils {
+
+ public static int signSafeMod(long dividend, int divisor){
+ int mod = (int) (dividend % divisor);
+
+ if (mod < 0){
+ mod += divisor;
+ }
+
+ return mod;
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,98 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * This class provides 2 things over the java {@link ScheduledExecutorService}.
+ *
+ * 1. It takes {@link SafeRunnable objects} instead of plain Runnable objects.
+ * This means that exceptions in scheduled tasks wont go unnoticed and will be
+ * logged.
+ *
+ * 2. It supports submitting tasks with an ordering key, so that tasks submitted
+ * with the same key will always be executed in order, but tasks across
+ * different keys can be unordered. This retains parallelism while retaining the
+ * basic amount of ordering we want (e.g. , per ledger handle). Ordering is
+ * achieved by hashing the key objects to threads by their {@link #hashCode()}
+ * method.
+ *
+ */
+public class OrderedSafeExecutor {
+ ExecutorService threads[];
+ Random rand = new Random();
+
+ public OrderedSafeExecutor(int numThreads) {
+ if (numThreads <= 0) {
+ throw new IllegalArgumentException();
+ }
+
+ threads = new ExecutorService[numThreads];
+ for (int i = 0; i < numThreads; i++) {
+ threads[i] = Executors.newSingleThreadExecutor();
+ }
+ }
+
+ ExecutorService chooseThread() {
+ // skip random # generation in this special case
+ if (threads.length == 1) {
+ return threads[0];
+ }
+
+ return threads[rand.nextInt(threads.length)];
+
+ }
+
+ ExecutorService chooseThread(Object orderingKey) {
+ // skip hashcode generation in this special case
+ if (threads.length == 1) {
+ return threads[0];
+ }
+
+ return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)];
+
+ }
+
+ /**
+ * schedules a one time action to execute
+ */
+ public void submit(SafeRunnable r) {
+ chooseThread().submit(r);
+ }
+
+ /**
+ * schedules a one time action to execute with an ordering guarantee on the key
+ * @param orderingKey
+ * @param r
+ */
+ public void submitOrdered(Object orderingKey, SafeRunnable r) {
+ chooseThread(orderingKey).submit(r);
+ }
+
+ public void shutdown() {
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].shutdown();
+ }
+ }
+
+}