You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/02/24 20:07:58 UTC
svn commit: r1293383 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Author: ivank
Date: Fri Feb 24 19:07:57 2012
New Revision: 1293383
URL: http://svn.apache.org/viewvc?rev=1293383&view=rev
Log:
BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1293383&r1=1293382&r2=1293383&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Feb 24 19:07:57 2012
@@ -46,6 +46,8 @@ Trunk (unreleased changes)
BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank)
+ BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)
+
hedwig-server/
BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1293383&r1=1293382&r2=1293383&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Feb 24 19:07:57 2012
@@ -225,11 +225,6 @@ public class PerChannelBookieClient exte
Object ctx, final int options) {
final int entrySize = toSend.readableBytes();
- // if (totalBytesOutstanding.get() > maxMemory) {
- // // TODO: how to throttle, throw an exception, or call the callback?
- // // Maybe this should be done at the layer above?
- // }
-
final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
@@ -238,30 +233,35 @@ public class PerChannelBookieClient exte
+ 4 // for the type of request
+ masterKey.length; // for the master key
- ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- header.writeInt(totalHeaderSize - 4 + entrySize);
- header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- BookieProtocol.ADDENTRY, (short)options).toInt());
- header.writeBytes(masterKey);
-
- ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+ try{
+ ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- ChannelFuture future = channel.write(wrappedBuffer);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
- + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+ header.writeInt(totalHeaderSize - 4 + entrySize);
+ header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ BookieProtocol.ADDENTRY, (short)options).toInt());
+ header.writeBytes(masterKey);
+
+ ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+
+ ChannelFuture future = channel.write(wrappedBuffer);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
+ + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+ }
+ // totalBytesOutstanding.addAndGet(entrySize);
+ } else {
+ errorOutAddKey(completionKey);
}
- // totalBytesOutstanding.addAndGet(entrySize);
- } else {
- errorOutAddKey(completionKey);
}
- }
- });
-
+ });
+ } catch (Throwable e) {
+ LOG.warn("Read entry operation failed", e);
+ errorOutReadKey(completionKey);
+ }
}
public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx, final int options) {
@@ -273,29 +273,33 @@ public class PerChannelBookieClient exte
+ 8 // for ledgerId
+ 8; // for entryId
- ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
- tmpEntry.writeInt(totalHeaderSize - 4);
-
- tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- BookieProtocol.READENTRY, (short)options).toInt());
- tmpEntry.writeLong(ledgerId);
- tmpEntry.writeLong(entryId);
-
- ChannelFuture future = channel.write(tmpEntry);
- future.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
- + ledgerId + " bookie: " + channel.getRemoteAddress());
+ try{
+ ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ tmpEntry.writeInt(totalHeaderSize - 4);
+
+ tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+ BookieProtocol.READENTRY, (short)options).toInt());
+ tmpEntry.writeLong(ledgerId);
+ tmpEntry.writeLong(entryId);
+
+ ChannelFuture future = channel.write(tmpEntry);
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
+ + ledgerId + " bookie: " + channel.getRemoteAddress());
+ }
+ } else {
+ errorOutReadKey(key);
}
- } else {
- errorOutReadKey(key);
}
- }
- });
-
+ });
+ } catch(Throwable e) {
+ LOG.warn("Read entry operation failed", e);
+ errorOutReadKey(key);
+ }
}
public void close() {