You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/02/24 20:07:58 UTC

svn commit: r1293383 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Author: ivank
Date: Fri Feb 24 19:07:57 2012
New Revision: 1293383

URL: http://svn.apache.org/viewvc?rev=1293383&view=rev
Log:
BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1293383&r1=1293382&r2=1293383&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Feb 24 19:07:57 2012
@@ -46,6 +46,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-177: Index file is lost or some index pages aren't flushed. (sijie via ivank)
 
+        BOOKKEEPER-113: NPE In BookKeeper test (fpj via ivank)
+
       hedwig-server/
       
         BOOKKEEPER-140: Hub server doesn't subscribe remote region correctly when a region is down. (Sijie Gou via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1293383&r1=1293382&r2=1293383&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Fri Feb 24 19:07:57 2012
@@ -225,11 +225,6 @@ public class PerChannelBookieClient exte
                   Object ctx, final int options) {
         final int entrySize = toSend.readableBytes();
 
-        // if (totalBytesOutstanding.get() > maxMemory) {
-        // // TODO: how to throttle, throw an exception, or call the callback?
-        // // Maybe this should be done at the layer above?
-        // }
-
         final CompletionKey completionKey = new CompletionKey(ledgerId, entryId);
 
         addCompletions.put(completionKey, new AddCompletion(cb, entrySize, ctx));
@@ -238,30 +233,35 @@ public class PerChannelBookieClient exte
                               + 4 // for the type of request
                               + masterKey.length; // for the master key
 
-        ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-        header.writeInt(totalHeaderSize - 4 + entrySize);
-        header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION, 
-                                         BookieProtocol.ADDENTRY, (short)options).toInt());
-        header.writeBytes(masterKey);
-
-        ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+        try{
+            ChannelBuffer header = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
 
-        ChannelFuture future = channel.write(wrappedBuffer);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
-                                  + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+            header.writeInt(totalHeaderSize - 4 + entrySize);
+            header.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                                             BookieProtocol.ADDENTRY, (short)options).toInt());
+            header.writeBytes(masterKey);
+
+            ChannelBuffer wrappedBuffer = ChannelBuffers.wrappedBuffer(header, toSend);
+
+            ChannelFuture future = channel.write(wrappedBuffer);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId
+                                                            + " bookie: " + channel.getRemoteAddress() + " entry length: " + entrySize);
+                        }
+                        // totalBytesOutstanding.addAndGet(entrySize);
+                    } else {
+                        errorOutAddKey(completionKey);
                     }
-                    // totalBytesOutstanding.addAndGet(entrySize);
-                } else {
-                    errorOutAddKey(completionKey);
                 }
-            }
-        });
-
+            });
+        } catch (Throwable e) {
+            LOG.warn("Add entry operation failed", e);
+            errorOutAddKey(completionKey); // key lives in addCompletions, so fail the add, not a read
+        }
     }
 
     public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx, final int options) {
@@ -273,29 +273,33 @@ public class PerChannelBookieClient exte
                               + 8 // for ledgerId
                               + 8; // for entryId
 
-        ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
-        tmpEntry.writeInt(totalHeaderSize - 4);
-
-        tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION, 
-                                           BookieProtocol.READENTRY, (short)options).toInt());
-        tmpEntry.writeLong(ledgerId);
-        tmpEntry.writeLong(entryId);
-
-        ChannelFuture future = channel.write(tmpEntry);
-        future.addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
-                                  + ledgerId + " bookie: " + channel.getRemoteAddress());
+        try{
+            ChannelBuffer tmpEntry = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+            tmpEntry.writeInt(totalHeaderSize - 4);
+
+            tmpEntry.writeInt(new PacketHeader(BookieProtocol.CURRENT_PROTOCOL_VERSION,
+                                               BookieProtocol.READENTRY, (short)options).toInt());
+            tmpEntry.writeLong(ledgerId);
+            tmpEntry.writeLong(entryId);
+
+            ChannelFuture future = channel.write(tmpEntry);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: "
+                                                            + ledgerId + " bookie: " + channel.getRemoteAddress());
+                        }
+                    } else {
+                        errorOutReadKey(key);
                     }
-                } else {
-                    errorOutReadKey(key);
                 }
-            }
-        });
-
+            });
+        } catch(Throwable e) {
+            LOG.warn("Read entry operation failed", e);
+            errorOutReadKey(key);
+        }
     }
 
     public void close() {