You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/25 00:34:20 UTC
svn commit: r1613315 [3/4] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/processor/
bookkeeper-server/src/main/java/org/apache...
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java Thu Jul 24 22:34:19 2014
@@ -52,8 +52,6 @@ class DefaultPerChannelBookieClientPool
this.clients = new PerChannelBookieClient[coreSize];
for (int i = 0; i < coreSize; i++) {
this.clients[i] = factory.create(address);
- // connect proactively
- this.clients[i].connectIfNeededAndDoOp(this);
}
}
@@ -63,6 +61,13 @@ class DefaultPerChannelBookieClientPool
}
@Override
+ public void intialize() {
+ for (PerChannelBookieClient pcbc : this.clients) {
+ pcbc.connectIfNeededAndDoOp(this);
+ }
+ }
+
+ @Override
public void obtain(GenericCallback<PerChannelBookieClient> callback) {
if (1 == clients.length) {
clients[0].connectIfNeededAndDoOp(callback);
@@ -73,16 +78,16 @@ class DefaultPerChannelBookieClientPool
}
@Override
- public void disconnect() {
+ public void disconnect(boolean wait) {
for (PerChannelBookieClient pcbc : clients) {
- pcbc.disconnect();
+ pcbc.disconnect(wait);
}
}
@Override
- public void close() {
+ public void close(boolean wait) {
for (PerChannelBookieClient pcbc : clients) {
- pcbc.close();
+ pcbc.close(wait);
}
}
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class PacketProcessorBase implements Runnable {
+ private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class);
+ final Request request;
+ final Channel channel;
+ final Bookie bookie;
+
+ PacketProcessorBase(Request request, Channel channel, Bookie bookie) {
+ this.request = request;
+ this.channel = channel;
+ this.bookie = bookie;
+ }
+
+ protected boolean isVersionCompatible() {
+ byte version = request.getProtocolVersion();
+ if (version < BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
+ || version > BookieProtocol.CURRENT_PROTOCOL_VERSION) {
+ logger.error("Invalid protocol version, expected something between "
+ + BookieProtocol.LOWEST_COMPAT_PROTOCOL_VERSION
+ + " & " + BookieProtocol.CURRENT_PROTOCOL_VERSION
+ + ". got " + request.getProtocolVersion());
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void run() {
+ if (!isVersionCompatible()) {
+ channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request));
+ return;
+ }
+ processPacket();
+ }
+
+ protected abstract void processPacket();
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.jboss.netty.channel.Channel;
+
+public abstract class PacketProcessorBaseV3 {
+
+ final Request request;
+ final Channel channel;
+ final Bookie bookie;
+
+ public PacketProcessorBaseV3(Request request, Channel channel, Bookie bookie) {
+ this.request = request;
+ this.channel = channel;
+ this.bookie = bookie;
+ }
+
+ protected boolean isVersionCompatible() {
+ return this.request.getHeader().getVersion().equals(ProtocolVersion.VERSION_THREE);
+ }
+
+ /**
+ * Build a header with protocol version 3 and the operation type same as what was in the
+ * request.
+ * @return
+ */
+ protected BKPacketHeader getHeader() {
+ BKPacketHeader.Builder header = BKPacketHeader.newBuilder();
+ header.setVersion(ProtocolVersion.VERSION_THREE);
+ header.setOperation(request.getHeader().getOperation());
+ header.setTxnId(request.getHeader().getTxnId());
+ return header.build();
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java Thu Jul 24 22:34:19 2014
@@ -23,12 +23,24 @@ import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.protobuf.ByteString;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -40,6 +52,7 @@ import org.apache.bookkeeper.util.Ordere
import org.apache.bookkeeper.util.SafeRunnable;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
@@ -73,6 +86,7 @@ public class PerChannelBookieClient exte
static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
public static final int MAX_FRAME_LENGTH = 2 * 1024 * 1024; // 2M
+ public static final AtomicLong txnIdGenerator = new AtomicLong(0);
final BookieSocketAddress addr;
final ClientSocketChannelFactory channelFactory;
@@ -81,8 +95,7 @@ public class PerChannelBookieClient exte
final int addEntryTimeout;
final int readEntryTimeout;
- ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap<CompletionKey, AddCompletion>();
- ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap<CompletionKey, ReadCompletion>();
+ private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentHashMap<CompletionKey, CompletionValue>();
private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
@@ -150,7 +163,7 @@ public class PerChannelBookieClient exte
}
private void connect() {
- LOG.info("Connecting to bookie: {}", addr);
+ LOG.debug("Connecting to bookie: {}", addr);
// Set up the ClientBootStrap so we can create a new Channel connection
// to the bookie.
@@ -277,19 +290,41 @@ public class PerChannelBookieClient exte
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, WriteCallback cb,
Object ctx, final int options) {
- BookieProtocol.AddRequest r = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId, entryId, (short)options, masterKey, toSend);
+ final long txnId = getTxnId();
final int entrySize = toSend.readableBytes();
- final CompletionKey completionKey = new CompletionKey(ledgerId, entryId, BookieProtocol.ADDENTRY);
- addCompletions.put(completionKey,
- new AddCompletion(addEntryOpLogger, cb, ctx, scheduleTimeout(completionKey, addEntryTimeout)));
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.ADD_ENTRY);
+ completionObjects.put(completionKey,
+ new AddCompletion(addEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, addEntryTimeout)));
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.ADD_ENTRY)
+ .setTxnId(txnId);
+
+ AddRequest.Builder addBuilder = AddRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setBody(ByteString.copyFrom(toSend.toByteBuffer()));
+
+ if (((short)options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD) {
+ addBuilder.setFlag(AddRequest.Flag.RECOVERY_ADD);
+ }
+
+ final Request addRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setAddRequest(addBuilder)
+ .build();
+
final Channel c = channel;
if (c == null) {
errorOutAddKey(completionKey);
return;
}
try {
- ChannelFuture future = c.write(r);
+ ChannelFuture future = c.write(addRequest);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@@ -317,82 +352,110 @@ public class PerChannelBookieClient exte
public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
final long entryId,
ReadEntryCallback cb, Object ctx) {
- final CompletionKey key = new CompletionKey(ledgerId, entryId, BookieProtocol.READENTRY);
- readCompletions.put(key,
- new ReadCompletion(readEntryOpLogger, cb, ctx, scheduleTimeout(key, readEntryTimeout)));
-
- final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
- BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
- BookieProtocol.FLAG_DO_FENCING, masterKey);
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionObjects.put(completionKey,
+ new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
+
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId)
+ .setMasterKey(ByteString.copyFrom(masterKey))
+ .setFlag(ReadRequest.Flag.FENCE_LEDGER);
+
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
final Channel c = channel;
if (c == null) {
- errorOutReadKey(key);
+ errorOutReadKey(completionKey);
return;
}
try {
- ChannelFuture future = c.write(r);
+ ChannelFuture future = c.write(readRequest);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully wrote request {} to {}",
- r, c.getRemoteAddress());
+ readRequest, c.getRemoteAddress());
}
} else {
if (!(future.getCause() instanceof ClosedChannelException)) {
LOG.warn("Writing readEntryAndFenceLedger(lid={}, eid={}) to channel {} failed : ",
new Object[] { ledgerId, entryId, c, future.getCause() });
}
- errorOutReadKey(key);
+ errorOutReadKey(completionKey);
}
}
});
} catch(Throwable e) {
- LOG.warn("Read entry operation " + r + " failed", e);
- errorOutReadKey(key);
+ LOG.warn("Read entry operation {} failed", completionKey, e);
+ errorOutReadKey(completionKey);
}
}
public void readEntry(final long ledgerId, final long entryId, ReadEntryCallback cb, Object ctx) {
- final CompletionKey key = new CompletionKey(ledgerId, entryId, BookieProtocol.READENTRY);
- readCompletions.put(key,
- new ReadCompletion(readEntryOpLogger, cb, ctx, scheduleTimeout(key, readEntryTimeout)));
-
- final BookieProtocol.ReadRequest r = new BookieProtocol.ReadRequest(
- BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
- BookieProtocol.FLAG_NONE);
+ final long txnId = getTxnId();
+ final CompletionKey completionKey = new CompletionKey(txnId, OperationType.READ_ENTRY);
+ completionObjects.put(completionKey,
+ new ReadCompletion(readEntryOpLogger, cb, ctx, ledgerId, entryId,
+ scheduleTimeout(completionKey, readEntryTimeout)));
+
+ // Build the request and calculate the total size to be included in the packet.
+ BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder()
+ .setVersion(ProtocolVersion.VERSION_THREE)
+ .setOperation(OperationType.READ_ENTRY)
+ .setTxnId(txnId);
+
+ ReadRequest.Builder readBuilder = ReadRequest.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
+
+ final Request readRequest = Request.newBuilder()
+ .setHeader(headerBuilder)
+ .setReadRequest(readBuilder)
+ .build();
final Channel c = channel;
if (c == null) {
- errorOutReadKey(key);
+ errorOutReadKey(completionKey);
return;
}
try{
- ChannelFuture future = c.write(r);
+ ChannelFuture future = c.write(readRequest);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully wrote request {} to {}",
- r, c.getRemoteAddress());
+ readRequest, c.getRemoteAddress());
}
} else {
if (!(future.getCause() instanceof ClosedChannelException)) {
LOG.warn("Writing readEntry(lid={}, eid={}) to channel {} failed : ",
new Object[] { ledgerId, entryId, c, future.getCause() });
}
- errorOutReadKey(key);
+ errorOutReadKey(completionKey);
}
}
});
} catch(Throwable e) {
- LOG.warn("Read entry operation " + r + " failed", e);
- errorOutReadKey(key);
+ LOG.warn("Read entry operation {} failed", readRequest, e);
+ errorOutReadKey(completionKey);
}
}
@@ -400,13 +463,23 @@ public class PerChannelBookieClient exte
* Disconnects the bookie client. It can be reused.
*/
public void disconnect() {
- closeInternal(false);
+ disconnect(true);
+ }
+
+ public void disconnect(boolean wait) {
+ LOG.info("Disconnecting the per channel bookie client for {}", addr);
+ closeInternal(false, wait);
}
/**
* Closes the bookie client permanently. It cannot be reused.
*/
public void close() {
+ close(true);
+ }
+
+ public void close(boolean wait) {
+ LOG.info("Closing the per channel bookie client for {}", addr);
closeLock.writeLock().lock();
try {
if (ConnectionState.CLOSED == state) {
@@ -417,10 +490,10 @@ public class PerChannelBookieClient exte
} finally {
closeLock.writeLock().unlock();
}
- closeInternal(true);
+ closeInternal(true, wait);
}
- private void closeInternal(boolean permanent) {
+ private void closeInternal(boolean permanent, boolean wait) {
Channel toClose = null;
synchronized (this) {
if (permanent) {
@@ -432,7 +505,10 @@ public class PerChannelBookieClient exte
channel = null;
}
if (toClose != null) {
- closeChannel(toClose).awaitUninterruptibly();
+ ChannelFuture cf = closeChannel(toClose);
+ if (wait) {
+ cf.awaitUninterruptibly();
+ }
}
}
@@ -450,27 +526,25 @@ public class PerChannelBookieClient exte
}
void errorOutReadKey(final CompletionKey key, final int rc) {
- executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+ final ReadCompletion readCompletion = (ReadCompletion)completionObjects.remove(key);
+ if (null == readCompletion) {
+ return;
+ }
+ executor.submitOrdered(readCompletion.ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
-
- ReadCompletion readCompletion = readCompletions.remove(key);
String bAddress = "null";
Channel c = channel;
- if(c != null) {
+ if (c != null) {
bAddress = c.getRemoteAddress().toString();
}
- if (readCompletion != null) {
- LOG.debug("Could not write request for reading entry: {}"
- + " ledger-id: {} bookie: {}",
- new Object[] { key.entryId, key.ledgerId, bAddress });
+ LOG.debug("Could not write request for reading entry: {} ledger-id: {} bookie: {}",
+ new Object[]{ readCompletion.entryId, readCompletion.ledgerId, bAddress });
- readCompletion.cb.readEntryComplete(rc,
- key.ledgerId, key.entryId, null, readCompletion.ctx);
- }
+ readCompletion.cb.readEntryComplete(rc, readCompletion.ledgerId, readCompletion.entryId,
+ null, readCompletion.ctx);
}
-
});
}
@@ -479,29 +553,26 @@ public class PerChannelBookieClient exte
}
void errorOutAddKey(final CompletionKey key, final int rc) {
- executor.submitOrdered(key.ledgerId, new SafeRunnable() {
+ final AddCompletion addCompletion = (AddCompletion)completionObjects.remove(key);
+ if (null == addCompletion) {
+ return;
+ }
+ executor.submitOrdered(addCompletion.ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
-
- AddCompletion addCompletion = addCompletions.remove(key);
-
- if (addCompletion != null) {
- String bAddress = "null";
- Channel c = channel;
- if(c != null) {
- bAddress = c.getRemoteAddress().toString();
- }
- LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
- new Object[] { key.entryId, key.ledgerId, bAddress });
-
- addCompletion.cb.writeComplete(rc, key.ledgerId,
- key.entryId, addr, addCompletion.ctx);
- LOG.debug("Invoked callback method: {}", key.entryId);
+ String bAddress = "null";
+ Channel c = channel;
+ if(c != null) {
+ bAddress = c.getRemoteAddress().toString();
}
- }
+ LOG.debug("Could not write request for adding entry: {} ledger-id: {} bookie: {}",
+ new Object[] { addCompletion.entryId, addCompletion.ledgerId, bAddress });
+ addCompletion.cb.writeComplete(rc, addCompletion.ledgerId, addCompletion.entryId,
+ addr, addCompletion.ctx);
+ LOG.debug("Invoked callback method: {}", addCompletion.entryId);
+ }
});
-
}
/**
@@ -519,13 +590,17 @@ public class PerChannelBookieClient exte
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
-
- for (CompletionKey key : addCompletions.keySet()) {
- errorOutAddKey(key, rc);
- }
-
- for (CompletionKey key : readCompletions.keySet()) {
- errorOutReadKey(key, rc);
+ for (CompletionKey key : completionObjects.keySet()) {
+ switch (key.operationType) {
+ case ADD_ENTRY:
+ errorOutAddKey(key, rc);
+ break;
+ case READ_ENTRY:
+ errorOutReadKey(key, rc);
+ break;
+ default:
+ break;
+ }
}
}
@@ -543,8 +618,6 @@ public class PerChannelBookieClient exte
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder());
pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder());
-
-
pipeline.addLast("mainhandler", this);
return pipeline;
}
@@ -609,125 +682,101 @@ public class PerChannelBookieClient exte
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- if (!(e.getMessage() instanceof BookieProtocol.Response)) {
+ if (!(e.getMessage() instanceof Response)) {
ctx.sendUpstream(e);
return;
}
- final BookieProtocol.Response r = (BookieProtocol.Response)e.getMessage();
- executor.submitOrdered(r.getLedgerId(), new SafeRunnable() {
- @Override
- public void safeRun() {
- switch (r.getOpCode()) {
- case BookieProtocol.ADDENTRY:
- BookieProtocol.AddResponse a = (BookieProtocol.AddResponse)r;
- handleAddResponse(a);
- break;
- case BookieProtocol.READENTRY:
- BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
- handleReadResponse(rr);
- break;
- default:
- LOG.error("Unexpected response, type: {}", r);
- }
+ final Response response = (Response) e.getMessage();
+ final BKPacketHeader header = response.getHeader();
+
+ final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+ header.getOperation()));
+
+ if (null == completionValue) {
+ // Unexpected response, so log it. The txnId should have been present.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected response received from bookie : " + addr + " for type : " + header. getOperation() +
+ " and txnId : " + header.getTxnId());
}
- });
+
+ } else {
+ long orderingKey = completionValue.ledgerId;
+ executor.submitOrdered(orderingKey, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ OperationType type = header.getOperation();
+ switch (type) {
+ case ADD_ENTRY:
+ handleAddResponse(response.getAddResponse(), completionValue);
+ break;
+ case READ_ENTRY:
+ handleReadResponse(response.getReadResponse(), completionValue);
+ break;
+ default:
+ LOG.error("Unexpected response, type:{} received from bookie:{}, ignoring",
+ type, addr);
+ break;
+ }
+ }
+ });
+ }
}
- void handleAddResponse(BookieProtocol.AddResponse a) {
+ void handleAddResponse(AddResponse response, CompletionValue completionValue) {
+ // The completion value should always be an instance of an AddCompletion object when we reach here.
+ AddCompletion ac = (AddCompletion)completionValue;
+
+ long ledgerId = response.getLedgerId();
+ long entryId = response.getEntryId();
+ StatusCode status = response.getStatus();
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for add request from bookie: {} for ledger: {}", addr, a);
+ LOG.debug("Got response for add request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+ + entryId + " rc: " + status);
}
-
// convert to BKException code because thats what the uppper
// layers expect. This is UGLY, there should just be one set of
// error codes.
- int rc = BKException.Code.WriteException;
- switch (a.getErrorCode()) {
- case BookieProtocol.EOK:
- rc = BKException.Code.OK;
- break;
- case BookieProtocol.EBADVERSION:
- rc = BKException.Code.ProtocolVersionException;
- break;
- case BookieProtocol.EFENCED:
- rc = BKException.Code.LedgerFencedException;
- break;
- case BookieProtocol.EUA:
- rc = BKException.Code.UnauthorizedAccessException;
- break;
- case BookieProtocol.EREADONLY:
- rc = BKException.Code.WriteOnReadOnlyBookieException;
- break;
- default:
- LOG.warn("Add failed {}", a);
- rc = BKException.Code.WriteException;
- break;
- }
-
- AddCompletion ac;
- ac = addCompletions.remove(new CompletionKey(a.getLedgerId(),
- a.getEntryId(), BookieProtocol.ADDENTRY));
- if (ac == null) {
- LOG.debug("Unexpected add response from bookie {} for {}", addr, a);
- return;
+ Integer rcToRet = statusCodeToExceptionCode(status);
+ if (null == rcToRet) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + addr
+ + " with code:" + status);
+ }
+ rcToRet = BKException.Code.WriteException;
}
-
- ac.cb.writeComplete(rc, a.getLedgerId(), a.getEntryId(), addr, ac.ctx);
+ ac.cb.writeComplete(rcToRet, ledgerId, entryId, addr, ac.ctx);
}
- void handleReadResponse(BookieProtocol.ReadResponse rr) {
+ void handleReadResponse(ReadResponse response, CompletionValue completionValue) {
+ // The completion value should always be an instance of a ReadCompletion object when we reach here.
+ ReadCompletion rc = (ReadCompletion)completionValue;
+
+ long ledgerId = response.getLedgerId();
+ long entryId = response.getEntryId();
+ StatusCode status = response.getStatus();
+ ChannelBuffer buffer = ChannelBuffers.buffer(0);
+
+ if (response.hasBody()) {
+ buffer = ChannelBuffers.copiedBuffer(response.getBody().asReadOnlyByteBuffer());
+ }
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Got response for read request {} entry length: {}", rr,
- rr.getData() != null ? rr.getData().readableBytes() : -1);
+ LOG.debug("Got response for read request from bookie: " + addr + " for ledger: " + ledgerId + " entry: "
+ + entryId + " rc: " + rc + " entry length: " + buffer.readableBytes());
}
// convert to BKException code because thats what the uppper
// layers expect. This is UGLY, there should just be one set of
// error codes.
- int rc = BKException.Code.ReadException;
- switch (rr.getErrorCode()) {
- case BookieProtocol.EOK:
- rc = BKException.Code.OK;
- break;
- case BookieProtocol.ENOENTRY:
- case BookieProtocol.ENOLEDGER:
- rc = BKException.Code.NoSuchEntryException;
- break;
- case BookieProtocol.EBADVERSION:
- rc = BKException.Code.ProtocolVersionException;
- break;
- case BookieProtocol.EUA:
- rc = BKException.Code.UnauthorizedAccessException;
- break;
- default:
- LOG.warn("Read error for {}", rr);
- rc = BKException.Code.ReadException;
- break;
- }
-
- CompletionKey key = new CompletionKey(rr.getLedgerId(), rr.getEntryId(), BookieProtocol.READENTRY);
- ReadCompletion readCompletion = readCompletions.remove(key);
-
- if (readCompletion == null) {
- /*
- * This is a special case. When recovering a ledger, a client
- * submits a read request with id -1, and receives a response with a
- * different entry id.
- */
-
- readCompletion = readCompletions.remove(new CompletionKey(rr.getLedgerId(),
- BookieProtocol.LAST_ADD_CONFIRMED,
- BookieProtocol.READENTRY));
+ Integer rcToRet = statusCodeToExceptionCode(status);
+ if (null == rcToRet) {
+ LOG.error("Read entry for ledger:{}, entry:{} failed on bookie:{} with code:{}",
+ new Object[] { ledgerId, entryId, addr, status });
+ rcToRet = BKException.Code.ReadException;
}
-
- if (readCompletion == null) {
- LOG.debug("Unexpected read response received from bookie: {} for {}", addr, rr);
- return;
- }
-
- readCompletion.cb.readEntryComplete(rc, rr.getLedgerId(), rr.getEntryId(),
- rr.getData(), readCompletion.ctx);
+ rc.cb.readEntryComplete(rcToRet, ledgerId, entryId, buffer.slice(), rc.ctx);
}
/**
@@ -738,10 +787,15 @@ public class PerChannelBookieClient exte
// visible for testing
static abstract class CompletionValue {
final Object ctx;
+ protected final long ledgerId;
+ protected final long entryId;
protected final Timeout timeout;
- public CompletionValue(Object ctx, Timeout timeout) {
+ public CompletionValue(Object ctx, long ledgerId, long entryId,
+ Timeout timeout) {
this.ctx = ctx;
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
this.timeout = timeout;
}
@@ -756,14 +810,16 @@ public class PerChannelBookieClient exte
static class ReadCompletion extends CompletionValue {
final ReadEntryCallback cb;
- public ReadCompletion(ReadEntryCallback cb, Object ctx) {
- this(null, cb, ctx, null);
+ public ReadCompletion(ReadEntryCallback cb, Object ctx,
+ long ledgerId, long entryId) {
+ this(null, cb, ctx, ledgerId, entryId, null);
}
public ReadCompletion(final OpStatsLogger readEntryOpLogger,
final ReadEntryCallback originalCallback,
- final Object originalCtx, final Timeout timeout) {
- super(originalCtx, timeout);
+ final Object originalCtx, final long ledgerId, final long entryId,
+ final Timeout timeout) {
+ super(originalCtx, ledgerId, entryId, timeout);
final long requestTimeMillis = MathUtils.now();
this.cb = null == readEntryOpLogger ? originalCallback : new ReadEntryCallback() {
@Override
@@ -785,15 +841,16 @@ public class PerChannelBookieClient exte
static class AddCompletion extends CompletionValue {
final WriteCallback cb;
- public AddCompletion(WriteCallback cb, Object ctx) {
- this(null, cb, ctx, null);
+ public AddCompletion(WriteCallback cb, Object ctx,
+ long ledgerId, long entryId) {
+ this(null, cb, ctx, ledgerId, entryId, null);
}
public AddCompletion(final OpStatsLogger addEntryOpLogger,
final WriteCallback originalCallback,
- final Object originalCtx,
+ final Object originalCtx, final long ledgerId, final long entryId,
final Timeout timeout) {
- super(originalCtx, timeout);
+ super(originalCtx, ledgerId, entryId, timeout);
final long requestTimeMillis = MathUtils.now();
this.cb = null == addEntryOpLogger ? originalCallback : new WriteCallback() {
@Override
@@ -812,8 +869,8 @@ public class PerChannelBookieClient exte
}
// visable for testing
- CompletionKey newCompletionKey(long ledgerId, long entryId, byte operationType) {
- return new CompletionKey(ledgerId, entryId, operationType);
+ CompletionKey newCompletionKey(long txnId, OperationType operationType) {
+ return new CompletionKey(txnId, operationType);
}
Timeout scheduleTimeout(CompletionKey key, long timeout) {
@@ -825,16 +882,14 @@ public class PerChannelBookieClient exte
}
class CompletionKey implements TimerTask {
- final long ledgerId;
- final long entryId;
+ final long txnId;
+ final OperationType operationType;
final long requestAt;
- final byte operationType;
- CompletionKey(long ledgerId, long entryId, byte opType) {
- this.ledgerId = ledgerId;
- this.entryId = entryId;
+ CompletionKey(long txnId, OperationType operationType) {
+ this.txnId = txnId;
+ this.operationType = operationType;
this.requestAt = MathUtils.nowInNano();
- this.operationType = opType;
}
@Override
@@ -843,17 +898,17 @@ public class PerChannelBookieClient exte
return false;
}
CompletionKey that = (CompletionKey) obj;
- return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+ return this.txnId == that.txnId && this.operationType == that.operationType;
}
@Override
public int hashCode() {
- return ((int) ledgerId << 16) ^ ((int) entryId);
+ return ((int) txnId);
}
@Override
public String toString() {
- return String.format("LedgerEntry(%d, %d)", ledgerId, entryId);
+ return String.format("TxnId(%d), OperationType(%s)", txnId, operationType);
}
private long elapsedTime() {
@@ -865,7 +920,7 @@ public class PerChannelBookieClient exte
if (timeout.isCancelled()) {
return;
}
- if (BookieProtocol.ADDENTRY == operationType) {
+ if (OperationType.ADD_ENTRY == operationType) {
errorOutAddKey(this);
addTimeoutOpLogger.registerSuccessfulEvent(elapsedTime());
} else {
@@ -875,4 +930,43 @@ public class PerChannelBookieClient exte
}
}
+ /**
+ * Note : Helper functions follow
+ */
+
+ /**
+ * @param status
+ * @return null if the statuscode is unknown.
+ */
+ private Integer statusCodeToExceptionCode(StatusCode status) {
+ Integer rcToRet = null;
+ switch (status) {
+ case EOK:
+ rcToRet = BKException.Code.OK;
+ break;
+ case ENOENTRY:
+ rcToRet = BKException.Code.NoSuchEntryException;
+ break;
+ case ENOLEDGER:
+ rcToRet = BKException.Code.NoSuchLedgerExistsException;
+ break;
+ case EBADVERSION:
+ rcToRet = BKException.Code.ProtocolVersionException;
+ break;
+ case EUA:
+ rcToRet = BKException.Code.UnauthorizedAccessException;
+ break;
+ case EFENCED:
+ rcToRet = BKException.Code.LedgerFencedException;
+ break;
+ default:
+ break;
+ }
+ return rcToRet;
+ }
+
+ private long getTxnId() {
+ return txnIdGenerator.incrementAndGet();
+ }
+
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java Thu Jul 24 22:34:19 2014
@@ -28,6 +28,11 @@ import org.apache.bookkeeper.proto.Bookk
interface PerChannelBookieClientPool {
/**
+ * intialize the pool. the implementation should not be blocked.
+ */
+ void intialize();
+
+ /**
* Obtain a channel from channel pool to execute operations.
*
* @param callback
@@ -37,12 +42,18 @@ interface PerChannelBookieClientPool {
/**
* Disconnect the connections in the pool.
+ *
+ * @param wait
+ * whether need to wait until pool disconnected.
*/
- void disconnect();
+ void disconnect(boolean wait);
/**
* Close the pool.
+ *
+ * @param wait
+ * whether need to wait until pool closed.
*/
- void close();
+ void close(boolean wait);
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ReadEntryProcessor extends PacketProcessorBase {
+ private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessor.class);
+
+ public ReadEntryProcessor(Request request, Channel channel, Bookie bookie) {
+ super(request, channel, bookie);
+ }
+
+ @Override
+ protected void processPacket() {
+ assert (request instanceof BookieProtocol.ReadRequest);
+ BookieProtocol.ReadRequest read = (BookieProtocol.ReadRequest) request;
+
+ LOG.debug("Received new read request: {}", request);
+ int errorCode = BookieProtocol.EIO;
+ long startTime = MathUtils.now();
+ ByteBuffer data = null;
+ try {
+ Future<Boolean> fenceResult = null;
+ if (read.isFencingRequest()) {
+ LOG.warn("Ledger " + request.getLedgerId() + " fenced by " + channel.getRemoteAddress());
+
+ if (read.hasMasterKey()) {
+ fenceResult = bookie.fenceLedger(read.getLedgerId(), read.getMasterKey());
+ } else {
+ LOG.error("Password not provided, Not safe to fence {}", read.getLedgerId());
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+ }
+ }
+ data = bookie.readEntry(request.getLedgerId(), request.getEntryId());
+ LOG.debug("##### Read entry ##### {}", data.remaining());
+ if (null != fenceResult) {
+ // TODO:
+ // currently we don't have readCallback to run in separated read
+ // threads. after BOOKKEEPER-429 is complete, we could improve
+ // following code to make it not wait here
+ //
+ // For now, since we only try to wait after read entry. so writing
+ // to journal and read entry are executed in different thread
+ // it would be fine.
+ try {
+ Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+ if (null == fenced || !fenced) {
+ // if failed to fence, fail the read request to make it retry.
+ errorCode = BookieProtocol.EIO;
+ data = null;
+ } else {
+ errorCode = BookieProtocol.EOK;
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupting fence read entry " + read, ie);
+ errorCode = BookieProtocol.EIO;
+ data = null;
+ } catch (ExecutionException ee) {
+ LOG.error("Failed to fence read entry " + read, ee);
+ errorCode = BookieProtocol.EIO;
+ data = null;
+ } catch (TimeoutException te) {
+ LOG.error("Timeout to fence read entry " + read, te);
+ errorCode = BookieProtocol.EIO;
+ data = null;
+ }
+ } else {
+ errorCode = BookieProtocol.EOK;
+ }
+ } catch (Bookie.NoLedgerException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.error("Error reading " + read, e);
+ }
+ errorCode = BookieProtocol.ENOLEDGER;
+ } catch (Bookie.NoEntryException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.error("Error reading " + read, e);
+ }
+ errorCode = BookieProtocol.ENOENTRY;
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.error("Error reading " + read, e);
+ }
+ errorCode = BookieProtocol.EIO;
+ } catch (BookieException e) {
+ LOG.error("Unauthorized access to ledger " + read.getLedgerId(), e);
+ errorCode = BookieProtocol.EUA;
+ }
+
+ LOG.trace("Read entry rc = {} for {}",
+ new Object[] { errorCode, read });
+ if (errorCode == BookieProtocol.EOK) {
+ assert data != null;
+
+ channel.write(ResponseBuilder.buildReadResponse(data, read));
+ long elapsedTime = MathUtils.now() - startTime;
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+ } else {
+ channel.write(ResponseBuilder.buildErrorResponse(errorCode, read));
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ }
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+class ReadEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+
+ private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class);
+
+ public ReadEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
+ super(request, channel, bookie);
+ }
+
+ private ReadResponse getReadResponse() {
+ long startTime = MathUtils.now();
+ ReadRequest readRequest = request.getReadRequest();
+ long ledgerId = readRequest.getLedgerId();
+ long entryId = readRequest.getEntryId();
+
+ ReadResponse.Builder readResponse = ReadResponse.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
+
+ if (!isVersionCompatible()) {
+ readResponse.setStatus(StatusCode.EBADVERSION);
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ return readResponse.build();
+ }
+
+ LOG.debug("Received new read request: {}", request);
+ StatusCode status;
+ ByteBuffer entryBody;
+ try {
+ Future<Boolean> fenceResult = null;
+ if (readRequest.hasFlag() && readRequest.getFlag().equals(ReadRequest.Flag.FENCE_LEDGER)) {
+ LOG.warn("Ledger fence request received for ledger: {} from address: {}", ledgerId,
+ channel.getRemoteAddress());
+
+ if (readRequest.hasMasterKey()) {
+ byte[] masterKey = readRequest.getMasterKey().toByteArray();
+ fenceResult = bookie.fenceLedger(ledgerId, masterKey);
+ } else {
+ LOG.error("Fence ledger request received without master key for ledger:{} from address: {}",
+ ledgerId, channel.getRemoteAddress());
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+ }
+ }
+ entryBody = bookie.readEntry(ledgerId, entryId);
+ if (null != fenceResult) {
+ // TODO:
+ // currently we don't have readCallback to run in separated read
+ // threads. after BOOKKEEPER-429 is complete, we could improve
+ // following code to make it not wait here
+ //
+ // For now, since we only try to wait after read entry. so writing
+ // to journal and read entry are executed in different thread
+ // it would be fine.
+ try {
+ Boolean fenced = fenceResult.get(1000, TimeUnit.MILLISECONDS);
+ if (null == fenced || !fenced) {
+ // if failed to fence, fail the read request to make it retry.
+ status = StatusCode.EIO;
+ } else {
+ status = StatusCode.EOK;
+ readResponse.setBody(ByteString.copyFrom(entryBody));
+ }
+ } catch (InterruptedException ie) {
+ LOG.error("Interrupting fence read entry (lid: {}, eid: {})",
+ new Object[] { ledgerId, entryId, ie });
+ status = StatusCode.EIO;
+ } catch (ExecutionException ee) {
+ LOG.error("Failed to fence read entry (lid: {}, eid: {})",
+ new Object[] { ledgerId, entryId, ee });
+ status = StatusCode.EIO;
+ } catch (TimeoutException te) {
+ LOG.error("Timeout to fence read entry (lid: {}, eid: {})",
+ new Object[] { ledgerId, entryId, te });
+ status = StatusCode.EIO;
+ }
+ } else {
+ readResponse.setBody(ByteString.copyFrom(entryBody));
+ status = StatusCode.EOK;
+ }
+ } catch (Bookie.NoLedgerException e) {
+ status = StatusCode.ENOLEDGER;
+ LOG.error("No ledger found while reading entry:{} from ledger: {}", entryId, ledgerId);
+ } catch (Bookie.NoEntryException e) {
+ status = StatusCode.ENOENTRY;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No entry found while reading entry:{} from ledger:{}", entryId, ledgerId);
+ }
+ } catch (IOException e) {
+ status = StatusCode.EIO;
+ LOG.error("IOException while reading entry:{} from ledger:{}", entryId, ledgerId);
+ } catch (BookieException e) {
+ LOG.error("Unauthorized access to ledger:{} while reading entry:{} in request from address: {}",
+ new Object[]{ledgerId, entryId, channel.getRemoteAddress()});
+ status = StatusCode.EUA;
+ }
+
+ if (status == StatusCode.EOK) {
+ long elapsedTime = MathUtils.now() - startTime;
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).updateLatency(elapsedTime);
+ } else {
+ BKStats.getInstance().getOpStats(BKStats.STATS_READ).incrementFailedOps();
+ }
+
+ // Finally set status and return. The body would have been updated if
+ // a read went through.
+ readResponse.setStatus(status);
+ return readResponse.build();
+ }
+
+ @Override
+ public void run() {
+ ReadResponse readResponse = getReadResponse();
+ Response.Builder response = Response.newBuilder()
+ .setHeader(getHeader())
+ .setStatus(readResponse.getStatus())
+ .setReadResponse(readResponse);
+ channel.write(response.build());
+ }
+}
+
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processes add entry requests
+ */
+class WriteEntryProcessor extends PacketProcessorBase implements WriteCallback {
+
+ private final static Logger LOG = LoggerFactory.getLogger(WriteEntryProcessor.class);
+
+ long startTime;
+
+ public WriteEntryProcessor(Request request, Channel channel, Bookie bookie) {
+ super(request, channel, bookie);
+ }
+
+ @Override
+ protected void processPacket() {
+ assert (request instanceof BookieProtocol.AddRequest);
+ BookieProtocol.AddRequest add = (BookieProtocol.AddRequest) request;
+
+ if (bookie.isReadOnly()) {
+ LOG.warn("BookieServer is running in readonly mode,"
+ + " so rejecting the request from the client!");
+ channel.write(ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, add));
+ BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ return;
+ }
+
+ startTime = MathUtils.now();
+ int rc = BookieProtocol.EOK;
+ try {
+ if (add.isRecoveryAdd()) {
+ bookie.recoveryAddEntry(add.getDataAsByteBuffer(),
+ this, channel, add.getMasterKey());
+ } else {
+ bookie.addEntry(add.getDataAsByteBuffer(),
+ this, channel, add.getMasterKey());
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing " + add, e);
+ rc = BookieProtocol.EIO;
+ } catch (BookieException.LedgerFencedException lfe) {
+ LOG.error("Attempt to write to fenced ledger", lfe);
+ rc = BookieProtocol.EFENCED;
+ } catch (BookieException e) {
+ LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e);
+ rc = BookieProtocol.EUA;
+ }
+ if (rc != BookieProtocol.EOK) {
+ channel.write(ResponseBuilder.buildErrorResponse(rc, add));
+ BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ }
+ }
+
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieSocketAddress addr, Object ctx) {
+ channel.write(ResponseBuilder.buildAddResponse(request));
+
+ // compute the latency
+ if (0 == rc) {
+ // for add operations, we compute latency in writeComplete callbacks.
+ long elapsedTime = MathUtils.now() - startTime;
+ BKStats.getInstance().getOpStats(BKStats.STATS_ADD).updateLatency(elapsedTime);
+ } else {
+ BKStats.getInstance().getOpStats(BKStats.STATS_ADD).incrementFailedOps();
+ }
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java Thu Jul 24 22:34:19 2014
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.proto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.Response;
+import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class WriteEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable {
+ private final static Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class);
+
+ public WriteEntryProcessorV3(Request request, Channel channel, Bookie bookie) {
+ super(request, channel, bookie);
+ }
+
+ // Returns null if there is no exception thrown
+ private AddResponse getAddResponse() {
+ AddRequest addRequest = request.getAddRequest();
+ long ledgerId = addRequest.getLedgerId();
+ long entryId = addRequest.getEntryId();
+
+ final AddResponse.Builder addResponse = AddResponse.newBuilder()
+ .setLedgerId(ledgerId)
+ .setEntryId(entryId);
+
+ if (!isVersionCompatible()) {
+ addResponse.setStatus(StatusCode.EBADVERSION);
+ return addResponse.build();
+ }
+
+ if (bookie.isReadOnly()) {
+ logger.warn("BookieServer is running as readonly mode, so rejecting the request from the client!");
+ addResponse.setStatus(StatusCode.EREADONLY);
+ return addResponse.build();
+ }
+
+ BookkeeperInternalCallbacks.WriteCallback wcb = new BookkeeperInternalCallbacks.WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId,
+ BookieSocketAddress addr, Object ctx) {
+ Channel conn = (Channel) ctx;
+ StatusCode status;
+ switch (rc) {
+ case BookieProtocol.EOK:
+ status = StatusCode.EOK;
+ break;
+ case BookieProtocol.EIO:
+ status = StatusCode.EIO;
+ break;
+ default:
+ status = StatusCode.EUA;
+ break;
+ }
+ addResponse.setStatus(status);
+ Response.Builder response = Response.newBuilder()
+ .setHeader(getHeader())
+ .setStatus(addResponse.getStatus())
+ .setAddResponse(addResponse);
+ Response resp = response.build();
+ conn.write(resp);
+ }
+ };
+ StatusCode status = null;
+ byte[] masterKey = addRequest.getMasterKey().toByteArray();
+ ByteBuffer entryToAdd = addRequest.getBody().asReadOnlyByteBuffer();
+ try {
+ if (addRequest.hasFlag() && addRequest.getFlag().equals(AddRequest.Flag.RECOVERY_ADD)) {
+ bookie.recoveryAddEntry(entryToAdd, wcb, channel, masterKey);
+ } else {
+ bookie.addEntry(entryToAdd, wcb, channel, masterKey);
+ }
+ status = StatusCode.EOK;
+ } catch (IOException e) {
+ logger.error("Error writing entry:{} to ledger:{}",
+ new Object[] { entryId, ledgerId, e });
+ status = StatusCode.EIO;
+ } catch (BookieException.LedgerFencedException e) {
+ logger.debug("Ledger fenced while writing entry:{} to ledger:{}",
+ entryId, ledgerId);
+ status = StatusCode.EFENCED;
+ } catch (BookieException e) {
+ logger.error("Unauthorized access to ledger:{} while writing entry:{}",
+ ledgerId, entryId);
+ status = StatusCode.EUA;
+ } catch (Throwable t) {
+ logger.error("Unexpected exception while writing {}@{} : ",
+ new Object[] { entryId, ledgerId, t });
+ // some bad request which cause unexpected exception
+ status = StatusCode.EBADREQ;
+ }
+
+ // If everything is okay, we return null so that the calling function
+ // doesn't return a response back to the caller.
+ if (!status.equals(StatusCode.EOK)) {
+ addResponse.setStatus(status);
+ return addResponse.build();
+ }
+ return null;
+ }
+
+ @Override
+ public void run() {
+ AddResponse addResponse = getAddResponse();
+ if (null != addResponse) {
+ // This means there was an error and we should send this back.
+ Response.Builder response = Response.newBuilder()
+ .setHeader(getHeader())
+ .setStatus(addResponse.getStatus())
+ .setAddResponse(addResponse);
+ Response resp = response.build();
+ channel.write(resp);
+ }
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto?rev=1613315&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/proto/BookkeeperProtocol.proto Thu Jul 24 22:34:19 2014
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ option java_package = "org.apache.bookkeeper.proto";
+ option optimize_for = SPEED;
+
+/**
+ * Protocol Versions.
+ */
+enum ProtocolVersion {
+ VERSION_ONE = 1;
+ VERSION_TWO = 2;
+ VERSION_THREE = 3;
+}
+
+/**
+ * Status codes.
+ */
+enum StatusCode {
+ EOK = 0;
+
+ // Server side Errors 4xx
+ ENOLEDGER = 402;
+ ENOENTRY = 403;
+ EBADREQ = 404;
+
+ // IO/access errors 5xx
+ EIO = 501;
+ EUA = 502;
+ EBADVERSION = 503;
+ EFENCED = 504;
+ EREADONLY = 505;
+}
+
+/**
+ * Supported operations by this protocol.
+ */
+enum OperationType {
+ READ_ENTRY = 1;
+ ADD_ENTRY = 2;
+ // Not supported yet.
+ RANGE_READ_ENTRY = 3;
+ RANGE_ADD_ENTRY = 4;
+}
+
+/**
+ * Packet header for all requests.
+ */
+message BKPacketHeader {
+ required ProtocolVersion version = 1;
+ required OperationType operation = 2;
+ required uint64 txnId = 3;
+}
+
+message Request {
+ required BKPacketHeader header = 1;
+ // Requests
+ optional ReadRequest readRequest = 100;
+ optional AddRequest addRequest = 101;
+}
+
+message ReadRequest {
+ enum Flag {
+ FENCE_LEDGER = 1;
+ }
+ optional Flag flag = 100;
+ required int64 ledgerId = 1;
+ // entryId will be -1 for reading the LAST_ADD_CONFIRMED entry.
+ required int64 entryId = 2;
+ // Used while fencing a ledger.
+ optional bytes masterKey = 3;
+}
+
+message AddRequest {
+ enum Flag {
+ RECOVERY_ADD = 1;
+ }
+ optional Flag flag = 100;
+ required int64 ledgerId = 1;
+ required int64 entryId = 2;
+ required bytes masterKey = 3;
+ required bytes body = 4;
+}
+
+message Response {
+
+ required BKPacketHeader header = 1;
+ // EOK if the underlying request succeeded. Each individual response
+ // has a more meaningful status. EBADREQ if we have an unsupported request.
+ required StatusCode status = 2;
+ // Response
+ optional ReadResponse readResponse = 100;
+ optional AddResponse addResponse = 101;
+
+}
+
+message ReadResponse {
+ required StatusCode status = 1;
+ required int64 ledgerId = 2;
+ required int64 entryId = 3;
+ optional bytes body = 4;
+}
+
+message AddResponse {
+ required StatusCode status = 1;
+ required int64 ledgerId = 2;
+ required int64 entryId = 3;
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/resources/findbugsExclude.xml Thu Jul 24 22:34:19 2014
@@ -21,6 +21,10 @@
<Class name="~org\.apache\.bookkeeper\.proto\.DataFormats.*" />
</Match>
<Match>
+ <!-- generated code, we can't be held responsible for findbugs in it //-->
+ <Class name="~org\.apache\.bookkeeper\.proto\.BookkeeperProtocol.*" />
+ </Match>
+ <Match>
<!-- it is safe to store external bytes reference here. since we are using
bytes from a slab. //-->
<Class name="org.apache.bookkeeper.bookie.EntryKeyValue" />
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java Thu Jul 24 22:34:19 2014
@@ -226,7 +226,9 @@ public class AuditorPeriodicCheckTest ex
for (int j = 0; j < 100; j++) {
lh.asyncAddEntry("testdata".getBytes(), new AddCallback() {
public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
- rc.compareAndSet(BKException.Code.OK, rc2);
+ if (rc.compareAndSet(BKException.Code.OK, rc2)) {
+ LOG.info("Failed to add entry : {}", BKException.getMessage(rc2));
+ }
completeLatch.countDown();
}
}, null);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java Thu Jul 24 22:34:19 2014
@@ -240,7 +240,7 @@ public class BookieClientTest {
synchronized (arc) {
bc.readEntry(addr, 2, 13, recb, arc);
arc.wait(1000);
- assertEquals(BKException.Code.NoSuchEntryException, arc.rc);
+ assertEquals(BKException.Code.NoSuchLedgerExistsException, arc.rc);
}
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java Thu Jul 24 22:34:19 2014
@@ -259,6 +259,103 @@ public class TestBackwardCompat {
}
/**
+ * Version 4.2.0 classes
+ */
+ static class Server420 {
+ org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration conf;
+ org.apache.bk_v4_2_0.bookkeeper.proto.BookieServer server = null;
+
+ Server420(File journalDir, File ledgerDir, int port) throws Exception {
+ conf = new org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration();
+ conf.setBookiePort(port);
+ conf.setZkServers(zkUtil.getZooKeeperConnectString());
+ conf.setJournalDirName(journalDir.getPath());
+ conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
+ }
+
+ void start() throws Exception {
+ server = new org.apache.bk_v4_2_0.bookkeeper.proto.BookieServer(conf);
+ server.start();
+ waitUp(conf.getBookiePort());
+ }
+
+ org.apache.bk_v4_2_0.bookkeeper.conf.ServerConfiguration getConf() {
+ return conf;
+ }
+
+ void stop() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ }
+ }
+ }
+
+ static class Ledger420 {
+ org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper bk;
+ org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle lh;
+
+ private Ledger420(org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper bk,
+ org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle lh) {
+ this.bk = bk;
+ this.lh = lh;
+ }
+
+ static Ledger420 newLedger() throws Exception {
+ org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper newbk
+ = new org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper(zkUtil.getZooKeeperConnectString());
+ org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle newlh
+ = newbk.createLedger(1, 1,
+ org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper.DigestType.CRC32,
+ "foobar".getBytes());
+ return new Ledger420(newbk, newlh);
+ }
+
+ static Ledger420 openLedger(long id) throws Exception {
+ org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper newbk
+ = new org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper(zkUtil.getZooKeeperConnectString());
+ org.apache.bk_v4_2_0.bookkeeper.client.LedgerHandle newlh
+ = newbk.openLedger(id,
+ org.apache.bk_v4_2_0.bookkeeper.client.BookKeeper.DigestType.CRC32,
+ "foobar".getBytes());
+ return new Ledger420(newbk, newlh);
+ }
+
+ long getId() {
+ return lh.getId();
+ }
+
+ void write100() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ lh.addEntry(ENTRY_DATA);
+ }
+ }
+
+ long readAll() throws Exception {
+ long count = 0;
+ Enumeration<org.apache.bk_v4_2_0.bookkeeper.client.LedgerEntry> entries
+ = lh.readEntries(0, lh.getLastAddConfirmed());
+ while (entries.hasMoreElements()) {
+ assertTrue("entry data doesn't match",
+ Arrays.equals(entries.nextElement().getEntry(), ENTRY_DATA));
+ count++;
+ }
+ return count;
+ }
+
+ void close() throws Exception {
+ try {
+ if (lh != null) {
+ lh.close();
+ }
+ } finally {
+ if (bk != null) {
+ bk.close();
+ }
+ }
+ }
+ }
+
+ /**
* Current verion classes
*/
static class ServerCurrent {
@@ -527,7 +624,12 @@ public class TestBackwardCompat {
// Check that current client can to write to old server
LedgerCurrent lcur = LedgerCurrent.newLedger();
- lcur.write100();
+ try {
+ lcur.write100();
+ fail("Shouldn't be able to write");
+ } catch (Exception e) {
+ // correct behaviour
+ }
lcur.close();
s410.stop();
@@ -619,12 +721,12 @@ public class TestBackwardCompat {
}
/**
- * Test compatability between version 4.1.0 and the current version. - 4.1.0
- * server restarts with useHostNameAsBookieID=true. Read ledgers with old
- * and new clients
+ * Test compatability between old versions and the current version.
+ * - old server restarts with useHostNameAsBookieID=true.
+ * - Read ledgers with old and new clients
*/
@Test(timeout = 60000)
- public void testCompat410ReadLedgerOnRestartedServer() throws Exception {
+ public void testCompatReads() throws Exception {
File journalDir = File.createTempFile("bookie", "journal");
journalDir.delete();
journalDir.mkdir();
@@ -642,10 +744,11 @@ public class TestBackwardCompat {
long oldLedgerId = l410.getId();
l410.close();
- // Check that current client can to write to old server
- LedgerCurrent lcur = LedgerCurrent.newLedger();
- lcur.write100();
- lcur.close();
+ // Check that 420 client can to write to 410 server
+ Ledger420 l420 = Ledger420.newLedger();
+ l420.write100();
+ long lid420 = l420.getId();
+ l420.close();
s410.stop();
@@ -659,19 +762,25 @@ public class TestBackwardCompat {
assertEquals(100, l410.readAll());
l410.close();
+ // Check that 420 client can read old ledgers on new server
+ l420 = Ledger420.openLedger(lid420);
+ assertEquals("Failed to read entries!", 100, l420.readAll());
+ l420.close();
+
// Check that current client can read old ledgers on new server
- final LedgerCurrent curledger = LedgerCurrent.openLedger(lcur.getId());
+ final LedgerCurrent curledger = LedgerCurrent.openLedger(lid420);
assertEquals("Failed to read entries!", 100, curledger.readAll());
curledger.close();
}
/**
- * Test compatability between version 4.1.0 and the current version. - 4.1.0
- * server restarts with useHostNameAsBookieID=true. Write ledgers with old
- * and new clients
+ * Test compatability between version old version and the current version.
+ * - 4.1.0 server restarts with useHostNameAsBookieID=true.
+ * - Write ledgers with old and new clients
+ * - Read ledgers written by old clients.
*/
@Test(timeout = 60000)
- public void testCompat410WriteLedgerOnRestartedServer() throws Exception {
+ public void testCompatWrites() throws Exception {
File journalDir = File.createTempFile("bookie", "journal");
journalDir.delete();
journalDir.mkdir();
@@ -697,15 +806,26 @@ public class TestBackwardCompat {
final LedgerCurrent curledger = LedgerCurrent.openLedger(lcur.getId());
assertEquals("Failed to read entries!", 100, curledger.readAll());
- // Check that current client can write to server
+ // Check that 410 client can write to server
Ledger410 l410 = Ledger410.newLedger();
l410.write100();
long oldLedgerId = l410.getId();
l410.close();
+ // Check that 420 client can write to server
+ Ledger410 l420 = Ledger410.newLedger();
+ l420.write100();
+ long lid420 = l420.getId();
+ l420.close();
+
// check that new client can read old ledgers on new server
LedgerCurrent oldledger = LedgerCurrent.openLedger(oldLedgerId);
assertEquals("Failed to read entries!", 100, oldledger.readAll());
oldledger.close();
+
+ // check that new client can read old ledgers on new server
+ oldledger = LedgerCurrent.openLedger(lid420);
+ assertEquals("Failed to read entries!", 100, oldledger.readAll());
+ oldledger.close();
}
}
Modified: zookeeper/bookkeeper/trunk/compat-deps/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/compat-deps/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/compat-deps/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/compat-deps/pom.xml Thu Jul 24 22:34:19 2014
@@ -33,8 +33,10 @@
<modules>
<module>bookkeeper-server-compat-4.0.0</module>
<module>bookkeeper-server-compat-4.1.0</module>
+ <module>bookkeeper-server-compat-4.2.0</module>
<module>hedwig-server-compat-4.0.0</module>
<module>hedwig-server-compat-4.1.0</module>
+ <module>hedwig-server-compat-4.2.0</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Modified: zookeeper/bookkeeper/trunk/hedwig-server/pom.xml
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/pom.xml?rev=1613315&r1=1613314&r2=1613315&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/pom.xml (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/pom.xml Thu Jul 24 22:34:19 2014
@@ -125,6 +125,30 @@
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
+ <artifactId>hedwig-server-compat420</artifactId>
+ <version>4.2.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>bookkeeper-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>hedwig-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>hedwig-protocol</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.bookkeeper</groupId>
+ <artifactId>hedwig-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bookkeeper</groupId>
<artifactId>hedwig-server-compat410</artifactId>
<version>4.1.0</version>
<scope>test</scope>