You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC
svn commit: r1613315 [1/4] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/
bookkeeper-server/src/main/java/org/apache...
Author: fpj
Date: Thu Jul 24 22:34:19 2014
New Revision: 1613315
URL: http://svn.apache.org/r1613315
Log:
BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part)
(sijie via fpj)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperProtocol.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
Removed:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestProtoVersions.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/compat-deps/pom.xml
zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Jul 24 22:34:19 2014
@@ -303,6 +303,9 @@ Trunk (unreleased changes)
BOOKKEEPER-562: Ability to tell if a ledger is closed or not (fpj)
BOOKKEEPER-257: Ability to list all ledgers (fpj via ivank)
+
+ BOOKKEEPER-582: Make bookie and client use protobuf for requests (non-wire part)
+ (sijie via fpj)
Release 4.2.0 - 2013-01-14
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/pom.xml Thu Jul 24 22:34:19 2014
@@ -161,6 +161,18 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server-compat420</artifactId>
+ <version>4.2.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -249,6 +261,7 @@
<excludes>
<!-- exclude generated file //-->
<exclude>**/DataFormats.java</exclude>
+ <exclude>**/BookkeeperProtocol.java</exclude>
</excludes>
</configuration>
</plugin>
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java Thu Jul 24 22:34:19 2014
@@ -127,7 +127,8 @@ public class LedgerChecker {
public void readEntryComplete(int rc, long ledgerId, long entryId,
ChannelBuffer buffer, Object ctx) {
- if (rc != BKException.Code.NoSuchEntryException) {
+ if (BKException.Code.NoSuchEntryException != rc &&
+ BKException.Code.NoSuchLedgerExistsException != rc) {
entryMayExist.set(true);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java Thu Jul 24 22:34:19 2014
@@ -186,15 +186,19 @@ class PendingReadOp implements Enumerati
synchronized void logErrorAndReattemptRead(BookieSocketAddress host, String errMsg, int rc) {
if (BKException.Code.OK == firstError ||
- BKException.Code.NoSuchEntryException == firstError) {
+ BKException.Code.NoSuchEntryException == firstError ||
+ BKException.Code.NoSuchLedgerExistsException == firstError) {
firstError = rc;
} else if (BKException.Code.BookieHandleNotAvailableException == firstError &&
- BKException.Code.NoSuchEntryException != rc) {
- // if other exception rather than NoSuchEntryException is returned
- // we need to update firstError to indicate that it might be a valid read but just failed.
+ BKException.Code.NoSuchEntryException != rc &&
+ BKException.Code.NoSuchLedgerExistsException != rc) {
+ // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is
+ // returned we need to update firstError to indicate that it might be a valid read but just
+ // failed.
firstError = rc;
}
- if (BKException.Code.NoSuchEntryException == rc) {
+ if (BKException.Code.NoSuchEntryException == rc ||
+ BKException.Code.NoSuchLedgerExistsException == rc) {
++numMissedEntryReads;
LOG.debug("No such entry found on bookie. L{} E{} bookie: {}",
new Object[] { lh.ledgerId, entryId, host });
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/RequestProcessor.java Thu Jul 24 22:34:19 2014
@@ -20,7 +20,6 @@
*/
package org.apache.bookkeeper.processor;
-import org.apache.bookkeeper.proto.BookieProtocol;
import org.jboss.netty.channel.Channel;
public interface RequestProcessor {
@@ -38,6 +37,6 @@ public interface RequestProcessor {
* @param channel
* channel received the given request <i>r</i>
*/
- public void processRequest(BookieProtocol.Request r, Channel channel);
+ public void processRequest(Object r, Channel channel);
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java Thu Jul 24 22:34:19 2014
@@ -118,9 +118,11 @@ public class BookieClient implements Per
PerChannelBookieClientPool oldClientPool = channels.putIfAbsent(addr, newClientPool);
if (null == oldClientPool) {
clientPool = newClientPool;
+ // initialize the pool only after we put the pool into the map
+ clientPool.intialize();
} else {
clientPool = oldClientPool;
- newClientPool.close();
+ newClientPool.close(false);
}
} finally {
closeLock.readLock().unlock();
@@ -130,10 +132,10 @@ public class BookieClient implements Per
}
public void closeClients(Set<BookieSocketAddress> addrs) {
+ final HashSet<PerChannelBookieClientPool> clients =
+ new HashSet<PerChannelBookieClientPool>();
closeLock.readLock().lock();
try {
- final HashSet<PerChannelBookieClientPool> clients =
- new HashSet<PerChannelBookieClientPool>();
for (BookieSocketAddress a : addrs) {
PerChannelBookieClientPool c = channels.get(a);
if (c != null) {
@@ -144,17 +146,12 @@ public class BookieClient implements Per
if (clients.size() == 0) {
return;
}
- executor.submit(new SafeRunnable() {
- @Override
- public void safeRun() {
- for (PerChannelBookieClientPool c : clients) {
- c.disconnect();
- }
- }
- });
} finally {
closeLock.readLock().unlock();
}
+ for (PerChannelBookieClientPool c : clients) {
+ c.disconnect(false);
+ }
}
public void addEntry(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
@@ -279,7 +276,7 @@ public class BookieClient implements Per
try {
closed = true;
for (PerChannelBookieClientPool pool : channels.values()) {
- pool.close();
+ pool.close(true);
}
channels.clear();
} finally {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java Thu Jul 24 22:34:19 2014
@@ -20,6 +20,9 @@
*/
package org.apache.bookkeeper.proto;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.jboss.netty.buffer.ChannelBufferFactory;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
@@ -35,9 +38,38 @@ import org.slf4j.LoggerFactory;
public class BookieProtoEncoding {
private final static Logger LOG = LoggerFactory.getLogger(BookieProtoEncoding.class);
- public static class RequestEncoder extends OneToOneEncoder {
+ static final EnDecoder REQ_PREV3 = new RequestEnDeCoderPreV3();
+ static final EnDecoder REP_PREV3 = new ResponseEnDeCoderPreV3();
+ static final EnDecoder REQ_V3 = new RequestEnDecoderV3();
+ static final EnDecoder REP_V3 = new ResponseEnDecoderV3();
+
+ static interface EnDecoder {
+
+ /**
+ * Encode a <i>object</i> into channel buffer.
+ *
+ * @param object
+ * object.
+ * @return encode buffer.
+ * @throws Exception
+ */
+ public Object encode(Object object, ChannelBufferFactory factory) throws Exception;
+
+ /**
+ * Decode a <i>packet</i> into an object.
+ *
+ * @param packet
+ * received packet.
+ * @return parsed object.
+ * @throws Exception
+ */
+ public Object decode(ChannelBuffer packet) throws Exception;
+
+ }
+
+ static class RequestEnDeCoderPreV3 implements EnDecoder {
@Override
- public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
if (!(msg instanceof BookieProtocol.Request)) {
return msg;
@@ -47,7 +79,7 @@ public class BookieProtoEncoding {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)r;
int totalHeaderSize = 4 // for the header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key
- ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
return ChannelBuffers.wrappedBuffer(buf, ar.getData());
@@ -60,7 +92,7 @@ public class BookieProtoEncoding {
totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
}
- ChannelBuffer buf = channel.getConfig().getBufferFactory().getBuffer(totalHeaderSize);
+ ChannelBuffer buf = bufferFactory.getBuffer(totalHeaderSize);
buf.writeInt(new PacketHeader(r.getProtocolVersion(), r.getOpCode(), r.getFlags()).toInt());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
@@ -71,17 +103,10 @@ public class BookieProtoEncoding {
return buf;
}
}
- }
- public static class RequestDecoder extends OneToOneDecoder {
@Override
- public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ public Object decode(ChannelBuffer packet)
throws Exception {
- if (!(msg instanceof ChannelBuffer)) {
- return msg;
- }
- ChannelBuffer packet = (ChannelBuffer)msg;
-
PacketHeader h = PacketHeader.fromInt(packet.readInt());
// packet format is different between ADDENTRY and READENTRY
@@ -117,20 +142,19 @@ public class BookieProtoEncoding {
return new BookieProtocol.ReadRequest(h.getVersion(), ledgerId, entryId, flags);
}
}
- return msg;
+ return packet;
}
}
- public static class ResponseEncoder extends OneToOneEncoder {
+ static class ResponseEnDeCoderPreV3 implements EnDecoder {
@Override
- public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ public Object encode(Object msg, ChannelBufferFactory bufferFactory)
throws Exception {
if (!(msg instanceof BookieProtocol.Response)) {
return msg;
}
BookieProtocol.Response r = (BookieProtocol.Response)msg;
- ChannelBuffer buf = ctx.getChannel().getConfig().getBufferFactory()
- .getBuffer(24);
+ ChannelBuffer buf = bufferFactory.getBuffer(24);
buf.writeInt(new PacketHeader(r.getProtocolVersion(),
r.getOpCode(), (short)0).toInt());
buf.writeInt(r.getErrorCode());
@@ -153,17 +177,9 @@ public class BookieProtoEncoding {
return msg;
}
}
- }
-
- public static class ResponseDecoder extends OneToOneDecoder {
@Override
- public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ public Object decode(ChannelBuffer buffer)
throws Exception {
- if (!(msg instanceof ChannelBuffer)) {
- return msg;
- }
-
- final ChannelBuffer buffer = (ChannelBuffer)msg;
final int rc;
final long ledgerId, entryId;
final PacketHeader header;
@@ -185,10 +201,130 @@ public class BookieProtoEncoding {
ledgerId, entryId);
}
default:
- LOG.error("Unexpected response of type {} received from {}",
- header.getOpCode(), channel.getRemoteAddress());
+ return buffer;
+ }
+ }
+ }
+
+ static class RequestEnDecoderV3 implements EnDecoder {
+
+ @Override
+ public Object decode(ChannelBuffer packet) throws Exception {
+ return BookkeeperProtocol.Request.parseFrom(new ChannelBufferInputStream(packet));
+ }
+
+ @Override
+ public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+ BookkeeperProtocol.Request request = (BookkeeperProtocol.Request) msg;
+ return ChannelBuffers.wrappedBuffer(request.toByteArray());
+ }
+
+ }
+
+ static class ResponseEnDecoderV3 implements EnDecoder {
+
+ @Override
+ public Object decode(ChannelBuffer packet) throws Exception {
+ return BookkeeperProtocol.Response.parseFrom(new ChannelBufferInputStream(packet));
+ }
+
+ @Override
+ public Object encode(Object msg, ChannelBufferFactory factory) throws Exception {
+ BookkeeperProtocol.Response response = (BookkeeperProtocol.Response) msg;
+ return ChannelBuffers.wrappedBuffer(response.toByteArray());
+ }
+
+ }
+
+ public static class RequestEncoder extends OneToOneEncoder {
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Encode request {} to channel {}.", msg, channel);
+ }
+ if (msg instanceof BookkeeperProtocol.Request) {
+ return REQ_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+ } else if (msg instanceof BookieProtocol.Request) {
+ return REQ_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+ } else {
+ LOG.error("Invalid request to encode to {}: {}", channel, msg.getClass().getName());
return msg;
}
}
}
+
+ public static class RequestDecoder extends OneToOneDecoder {
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received request {} from channel {} to decode.", msg, channel);
+ }
+ if (!(msg instanceof ChannelBuffer)) {
+ return msg;
+ }
+ ChannelBuffer buffer = (ChannelBuffer) msg;
+ try {
+ buffer.markReaderIndex();
+ try {
+ return REQ_V3.decode(buffer);
+ } catch (InvalidProtocolBufferException e) {
+ buffer.resetReaderIndex();
+ return REQ_PREV3.decode(buffer);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to decode a request from {} : ", channel, e);
+ throw e;
+ }
+ }
+ }
+
+ public static class ResponseEncoder extends OneToOneEncoder {
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Encode response {} to channel {}.", msg, channel);
+ }
+ if (msg instanceof BookkeeperProtocol.Response) {
+ return REP_V3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+ } else if (msg instanceof BookieProtocol.Response) {
+ return REP_PREV3.encode(msg, ctx.getChannel().getConfig().getBufferFactory());
+ } else {
+ LOG.error("Invalid response to encode to {}: {}", channel, msg.getClass().getName());
+ return msg;
+ }
+ }
+ }
+
+ public static class ResponseDecoder extends OneToOneDecoder {
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
+ throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received response {} from channel {} to decode.", msg, channel);
+ }
+ if (!(msg instanceof ChannelBuffer)) {
+ return msg;
+ }
+ ChannelBuffer buffer = (ChannelBuffer) msg;
+ try {
+ buffer.markReaderIndex();
+ try {
+ return REP_V3.decode(buffer);
+ } catch (InvalidProtocolBufferException e) {
+ buffer.resetReaderIndex();
+ return REP_PREV3.decode(buffer);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to decode a response from channel {} : ", channel, e);
+ throw e;
+ }
+ }
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestHandler.java Thu Jul 24 22:34:19 2014
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.proto;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
-import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -71,6 +70,7 @@ class BookieRequestHandler extends Simpl
LOG.debug("Channel connected {}", e);
}
+ @Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
LOG.debug("Channel disconnected {}", e);
@@ -78,14 +78,12 @@ class BookieRequestHandler extends Simpl
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (!(e.getMessage() instanceof BookieProtocol.Request)) {
+ Object event = e.getMessage();
+ if (!(event instanceof BookkeeperProtocol.Request || event instanceof BookieProtocol.Request)) {
ctx.sendUpstream(e);
return;
}
- BookieProtocol.Request r = (BookieProtocol.Request)e.getMessage();
- Channel c = ctx.getChannel();
- requestProcessor.processRequest(r, c);
+ requestProcessor.processRequest(event, ctx.getChannel());
}
-
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java Thu Jul 24 22:34:19 2014
@@ -21,26 +21,18 @@
package org.apache.bookkeeper.proto;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
-import org.apache.bookkeeper.util.MathUtils;
import org.jboss.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BookieRequestProcessor implements RequestProcessor, BookkeeperInternalCallbacks.WriteCallback {
+public class BookieRequestProcessor implements RequestProcessor {
private final static Logger LOG = LoggerFactory.getLogger(BookieRequestProcessor.class);
/**
@@ -101,234 +93,83 @@ public class BookieRequestProcessor impl
}
@Override
- public void processRequest(BookieProtocol.Request r, Channel c) {
- if (r.getProtocolVersion() < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
- || r.getProtocolVersion() > BookieProtocol.CURRENT_PROTOCOL_VERSION) {
- LOG.error("Invalid protocol version, expected something between "
- + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
- + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION
- + ". got " + r.getProtocolVersion());
- c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, r));
- return;
- }
-
- switch (r.getOpCode()) {
- case BookieProtocol.ADDENTRY:
- processAddRequest(r, c);
- break;
- case BookieProtocol.READENTRY:
- processReadRequest(r, c);
- break;
- default:
- LOG.error("Unknown op type {}, sending error", r.getOpCode());
- c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
- if (statsEnabled) {
- bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+ public void processRequest(Object msg, Channel c) {
+ // If we can decode this packet as a Request protobuf packet, process
+ // it as a version 3 packet. Else, just use the old protocol.
+ if (msg instanceof BookkeeperProtocol.Request) {
+ BookkeeperProtocol.Request r = (BookkeeperProtocol.Request) msg;
+ BookkeeperProtocol.BKPacketHeader header = r.getHeader();
+ switch (header.getOperation()) {
+ case ADD_ENTRY:
+ processAddRequestV3(r, c);
+ break;
+ case READ_ENTRY:
+ processReadRequestV3(r, c);
+ break;
+ default:
+ BookkeeperProtocol.Response.Builder response =
+ BookkeeperProtocol.Response.newBuilder().setHeader(r.getHeader())
+ .setStatus(BookkeeperProtocol.StatusCode.EBADREQ);
+ c.write(response.build());
+ if (statsEnabled) {
+ bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+ }
+ break;
}
- break;
- }
- }
-
- class AddCtx {
- final Channel c;
- final BookieProtocol.AddRequest r;
- final long startTime;
-
- AddCtx(Channel c, BookieProtocol.AddRequest r) {
- this.c = c;
- this.r = r;
-
- if (statsEnabled) {
- startTime = MathUtils.now();
- } else {
- startTime = 0;
+ } else {
+ BookieProtocol.Request r = (BookieProtocol.Request) msg;
+ // process packet
+ switch (r.getOpCode()) {
+ case BookieProtocol.ADDENTRY:
+ processAddRequest(r, c);
+ break;
+ case BookieProtocol.READENTRY:
+ processReadRequest(r, c);
+ break;
+ default:
+ LOG.error("Unknown op type {}, sending error", r.getOpCode());
+ c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADREQ, r));
+ if (statsEnabled) {
+ bkStats.getOpStats(BKStats.STATS_UNKNOWN).incrementFailedOps();
+ }
+ break;
}
}
}
- private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
+ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+ WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, bookie);
if (null == writeThreadPool) {
- handleAdd(r, c);
+ write.run();
} else {
- writeThreadPool.submit(new Runnable() {
- @Override
- public void run() {
- handleAdd(r, c);
- }
- });
+ writeThreadPool.submit(write);
}
}
- private void handleAdd(BookieProtocol.Request r, Channel c) {
- assert (r instanceof BookieProtocol.AddRequest);
- BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) r;
-
- if (bookie.isReadOnly()) {
- LOG.warn("BookieServer is running as readonly mode,"
- + " so rejecting the request from the client!");
- c.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
- if (statsEnabled) {
- bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
- }
- return;
- }
-
- int rc = BookieProtocol.EOK;
- try {
- if (add.isRecoveryAdd()) {
- bookie.recoveryAddEntry(add.getDataAsByteBuffer(), this, new AddCtx(c, add),
- add.getMasterKey());
- } else {
- bookie.addEntry(add.getDataAsByteBuffer(),
- this, new AddCtx(c, add), add.getMasterKey());
- }
- } catch (IOException e) {
- LOG.error("Error writing " + add, e);
- rc = BookieProtocol.EIO;
- } catch (BookieException.LedgerFencedException lfe) {
- LOG.error("Attempt to write to fenced ledger", lfe);
- rc = BookieProtocol.EFENCED;
- } catch (BookieException e) {
- LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
- rc = BookieProtocol.EUA;
- }
- if (rc != BookieProtocol.EOK) {
- c.write(ResponseBuilder.buildErrorResponse(rc, add));
- if (statsEnabled) {
- bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
- }
+ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+ ReadEntryProcessorV3 read = new ReadEntryProcessorV3(r, c, bookie);
+ if (null == readThreadPool) {
+ read.run();
+ } else {
+ readThreadPool.submit(read);
}
}
- @Override
- public void writeComplete(int rc, long ledgerId, long entryId,
- BookieSocketAddress addr, Object ctx) {
- assert (ctx instanceof AddCtx);
- AddCtx addctx = (AddCtx) ctx;
- addctx.c.write(ResponseBuilder.buildAddResponse(addctx.r));
-
- if (statsEnabled) {
- // compute the latency
- if (0 == rc) {
- // for add operations, we compute latency in writeComplete callbacks.
- long elapsedTime = MathUtils.now() - addctx.startTime;
- bkStats.getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
- } else {
- bkStats.getOpStats(BKStats.STATS_ADD).incrementFailedOps();
- }
+ private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
+ WriteEntryProcessor write = new WriteEntryProcessor(r, c, bookie);
+ if (null == writeThreadPool) {
+ write.run();
+ } else {
+ writeThreadPool.submit(write);
}
}
private void processReadRequest(final BookieProtocol.Request r, final Channel c) {
+ ReadEntryProcessor read = new ReadEntryProcessor(r, c, bookie);
if (null == readThreadPool) {
- handleRead(r, c);
- } else {
- readThreadPool.submit(new Runnable() {
- @Override
- public void run() {
- handleRead(r, c);
- }
- });
- }
- }
-
- private void handleRead(BookieProtocol.Request r, Channel c) {
- assert (r instanceof BookieProtocol.ReadRequest);
- BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) r;
-
- LOG.debug("Received new read request: {}", r);
- int errorCode = BookieProtocol.EIO;
- long startTime = 0;
- if (statsEnabled) {
- startTime = MathUtils.now();
- }
- ByteBuffer data = null;
- try {
- Future<Boolean> fenceResult = null;
- if (read.isFencingRequest()) {
- LOG.warn("Ledger " + r.getLedgerId() + " fenced by " + c.getRemoteAddress());
-
- if (read.hasMasterKey()) {
- fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
- } else {
- LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
- if (statsEnabled) {
- bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps();
- }
- throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
- }
- }
- data = bookie.readEntry(r.getLedgerId(), r.getEntryId());
- LOG.debug("##### Read entry ##### {}", data.remaining());
- if (null != fenceResult) {
- // TODO:
- // currently we don't have readCallback to run in separated read
- // threads. after BOOKKEEPER-429 is complete, we could improve
- // following code to make it not wait here
- //
- // For now, since we only try to wait after read entry. so writing
- // to journal and read entry are executed in different thread
- // it would be fine.
- try {
- Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
- if (null == fenced || !fenced) {
- // if failed to fence, fail the read request to make it retry.
- errorCode = BookieProtocol.EIO;
- data = null;
- } else {
- errorCode = BookieProtocol.EOK;
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupting fence read entry " + read, ie);
- errorCode = BookieProtocol.EIO;
- data = null;
- } catch (ExecutionException ee) {
- LOG.error("Failed to fence read entry " + read, ee);
- errorCode = BookieProtocol.EIO;
- data = null;
- } catch (TimeoutException te) {
- LOG.error("Timeout to fence read entry " + read, te);
- errorCode = BookieProtocol.EIO;
- data = null;
- }
- } else {
- errorCode = BookieProtocol.EOK;
- }
- } catch (Bookie.NoLedgerException e) {
- if (LOG.isTraceEnabled()) {
- LOG.error("Error reading " + read, e);
- }
- errorCode = BookieProtocol.ENOLEDGER;
- } catch (Bookie.NoEntryException e) {
- if (LOG.isTraceEnabled()) {
- LOG.error("Error reading " + read, e);
- }
- errorCode = BookieProtocol.ENOENTRY;
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.error("Error reading " + read, e);
- }
- errorCode = BookieProtocol.EIO;
- } catch (BookieException e) {
- LOG.error("Unauthorized access to ledger " + read.getLedgerId(), e);
- errorCode = BookieProtocol.EUA;
- }
-
- LOG.trace("Read entry rc = {} for {}",
- new Object[] { errorCode, read });
- if (errorCode == BookieProtocol.EOK) {
- assert data != null;
-
- c.write(ResponseBuilder.buildReadResponse(data, read));
- if (statsEnabled) {
- long elapsedTime = MathUtils.now() - startTime;
- bkStats.getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
- }
+ read.run();
} else {
- c.write(ResponseBuilder.buildErrorResponse(errorCode, read));
- if (statsEnabled) {
- bkStats.getOpStats(BKStats.STATS_READ).incrementFailedOps();
- }
+ readThreadPool.submit(read);
}
}