You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2015/10/22 09:23:29 UTC
bookkeeper git commit: BOOKKEEPER-867: New Client API to allow
applications pass-in EntryId. (Venkateswararao Jujjuri via sijie)
Repository: bookkeeper
Updated Branches:
refs/heads/master 8a3922e00 -> 355ddbc7c
BOOKKEEPER-867: New Client API to allow applications pass-in EntryId. (Venkateswararao Jujjuri via sijie)
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/355ddbc7
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/355ddbc7
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/355ddbc7
Branch: refs/heads/master
Commit: 355ddbc7c13e39c49a58371d467e87aa80f696d7
Parents: 8a3922e
Author: Sijie Guo <si...@apache.org>
Authored: Thu Oct 22 00:22:36 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Oct 22 00:22:36 2015 -0700
----------------------------------------------------------------------
.../apache/bookkeeper/client/BKException.java | 11 +
.../apache/bookkeeper/client/BookKeeper.java | 93 +++++
.../bookkeeper/client/LedgerCreateOp.java | 15 +-
.../apache/bookkeeper/client/LedgerHandle.java | 111 +++++-
.../bookkeeper/client/LedgerHandleAdv.java | 214 +++++++++++
.../apache/bookkeeper/client/PendingAddOp.java | 17 +
.../client/BookieWriteLedgerTest.java | 365 ++++++++++++++++++-
7 files changed, 815 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 3991085..b2355cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -96,6 +96,8 @@ public abstract class BKException extends Exception {
return new BKIllegalOpException();
case Code.AddEntryQuorumTimeoutException:
return new BKAddEntryQuorumTimeoutException();
+ case Code.DuplicateEntryIdException:
+ return new BKDuplicateEntryIdException();
default:
return new BKUnexpectedConditionException();
}
@@ -128,6 +130,7 @@ public abstract class BKException extends Exception {
int ClientClosedException = -19;
int LedgerExistException = -20;
int AddEntryQuorumTimeoutException = -21;
+ int DuplicateEntryIdException = -22;
int IllegalOpException = -100;
int LedgerFencedException = -101;
@@ -192,6 +195,8 @@ public abstract class BKException extends Exception {
return "Bookie protocol version on server is incompatible with client";
case Code.MetadataVersionException:
return "Bad ledger metadata version";
+ case Code.DuplicateEntryIdException:
+ return "Attempted to add Duplicate entryId";
case Code.LedgerFencedException:
return "Ledger has been fenced off. Some other client must have opened it to read";
case Code.UnauthorizedAccessException:
@@ -261,6 +266,12 @@ public abstract class BKException extends Exception {
}
}
+ public static class BKDuplicateEntryIdException extends BKException {
+ public BKDuplicateEntryIdException() {
+ super(Code.DuplicateEntryIdException);
+ }
+ }
+
public static class BKUnexpectedConditionException extends BKException {
public BKUnexpectedConditionException() {
super(Code.UnexpectedConditionException);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 6bb71fa..ed744b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -537,6 +537,99 @@ public class BookKeeper {
}
/**
+ * Synchronous call to create ledger.
+ * Creates a new ledger, blocking until creation completes, and returns a {@link LedgerHandleAdv} which can accept entryId.
+ * Parameters must match those of
+ * {@link #asyncCreateLedgerAdv(int, int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object)}
+ *
+ * @param ensSize
+ * @param writeQuorumSize
+ * @param ackQuorumSize
+ * @param digestType
+ * @param passwd
+ * @return a handle to the newly created ledger
+ * @throws InterruptedException
+ * @throws BKException
+ */
+ public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize,
+ DigestType digestType, byte passwd[])
+ throws InterruptedException, BKException {
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+ /*
+ * Calls asynchronous version
+ */
+ asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd,
+ new SyncCreateCallback(), counter);
+
+ /*
+ * Wait
+ */
+ counter.block(0);
+ if (counter.getrc() != BKException.Code.OK) {
+ LOG.error("Error while creating ledger : {}", counter.getrc());
+ throw BKException.create(counter.getrc());
+ } else if (counter.getLh() == null) {
+ LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
+ throw BKException.create(BKException.Code.UnexpectedConditionException);
+ }
+
+ return counter.getLh();
+ }
+
+ /**
+ * Creates a new ledger asynchronously and returns {@link LedgerHandleAdv}
+ * which can accept entryId. Ledgers created with this call have the ability to accept
+ * a separate write quorum and ack quorum size. The write quorum must not be smaller than
+ * the ack quorum.
+ *
+ * Separating the write and the ack quorum allows the BookKeeper client to continue
+ * writing when a bookie has failed but the failure has not yet been detected. Detecting
+ * a bookie has failed can take a number of seconds, as configured by the read timeout
+ * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected,
+ * that bookie will be removed from the ensemble.
+ *
+ * The other parameters match those of {@link #asyncCreateLedger(int, int, DigestType, byte[],
+ * AsyncCallback.CreateCallback, Object)}
+ *
+ * @param ensSize
+ * number of bookies over which to stripe entries
+ * @param writeQuorumSize
+ * number of bookies each entry will be written to
+ * @param ackQuorumSize
+ * number of bookies which must acknowledge an entry before the call is completed
+ * @param digestType
+ * digest type, either MAC or CRC32
+ * @param passwd
+ * password
+ * @param cb
+ * createCallback implementation
+ * @param ctx
+ * optional control object
+ */
+ public void asyncCreateLedgerAdv(final int ensSize,
+ final int writeQuorumSize,
+ final int ackQuorumSize,
+ final DigestType digestType,
+ final byte[] passwd, final CreateCallback cb, final Object ctx) {
+ if (writeQuorumSize < ackQuorumSize) {
+ throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
+ }
+ closeLock.readLock().lock();
+ try {
+ if (closed) {
+ cb.createComplete(BKException.Code.ClientClosedException, null, ctx);
+ return;
+ }
+ new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize,
+ ackQuorumSize, digestType, passwd, cb, ctx).initiateAdv();
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ /**
* Open existing ledger asynchronously for reading.
*
* Opening a ledger with this method invokes fencing and recovery on the ledger
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 7c181b5..6f794d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -54,6 +54,7 @@ class LedgerCreateOp implements GenericCallback<Void> {
DigestType digestType;
long startTime;
OpStatsLogger createOpLogger;
+ boolean adv = false;
/**
* Constructor
@@ -137,6 +138,14 @@ class LedgerCreateOp implements GenericCallback<Void> {
}
/**
+ * Initiates the operation to return LedgerHandleAdv.
+ */
+ public void initiateAdv() {
+ this.adv = true;
+ initiate();
+ }
+
+ /**
* Callback when created ledger.
*/
@Override
@@ -151,7 +160,11 @@ class LedgerCreateOp implements GenericCallback<Void> {
}
try {
- lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+ if (adv) {
+ lh = new LedgerHandleAdv(bk, ledgerId, metadata, digestType, passwd);
+ } else {
+ lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+ }
} catch (GeneralSecurityException e) {
LOG.error("Security exception while creating ledger: " + ledgerId, e);
createComplete(BKException.Code.DigestNotInitializedException, null);
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 36faac4..61cc603 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -78,7 +78,7 @@ public class LedgerHandle {
final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;
final AtomicInteger blockAddCompletions = new AtomicInteger(0);
- final Queue<PendingAddOp> pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
+ Queue<PendingAddOp> pendingAddOps;
final Counter ensembleChangeCounter;
final Counter lacUpdateHitsCounter;
@@ -89,6 +89,7 @@ public class LedgerHandle {
throws GeneralSecurityException, NumberFormatException {
this.bk = bk;
this.metadata = metadata;
+ this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
if (metadata.isClosed()) {
lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
@@ -473,6 +474,23 @@ public class LedgerHandle {
}
/**
+ * Add entry synchronously to an open ledger. This can be used only with
+ * {@link LedgerHandleAdv} returned through ledgers created with {@link
+ * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ *
+ * @param entryId
+ * entryId to be added
+ * @param data
+ * array of bytes to be written to the ledger
+ * @return the entryId of the new inserted entry
+ */
+ public long addEntry(final long entryId, byte[] data) throws InterruptedException, BKException {
+ LOG.error("To use this feature Ledger must be created with createLedgerAdv interface.");
+ throw BKException.create(BKException.Code.IllegalOpException);
+ }
+
+ /**
* Add entry synchronously to an open ledger.
*
* @param data
@@ -502,6 +520,27 @@ public class LedgerHandle {
}
/**
+ * Add entry synchronously to an open ledger. This can be used only with
+ * {@link LedgerHandleAdv} returned through ledgers created with {@link
+ * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ * @param entryId
+ * entryId to be added.
+ * @param data
+ * array of bytes to be written to the ledger
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ * @return entryId
+ */
+ public long addEntry(final long entryId, byte[] data, int offset, int length) throws InterruptedException,
+ BKException {
+ LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+ throw BKException.create(BKException.Code.IllegalOpException);
+ }
+
+ /**
* Add entry asynchronously to an open ledger.
*
* @param data
@@ -517,6 +556,26 @@ public class LedgerHandle {
}
/**
+ * Add entry asynchronously to an open ledger. This can be used only with
+ * {@link LedgerHandleAdv} returned through ledgers created with {@link
+ * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ * @param entryId
+ * entryId to be added
+ * @param data
+ * array of bytes to be written
+ * @param cb
+ * object implementing the callback interface
+ * @param ctx
+ * some control object
+ */
+ public void asyncAddEntry(final long entryId, final byte[] data, final AddCallback cb, final Object ctx)
+ throws BKException {
+ LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+ cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+ }
+
+ /**
* Add entry asynchronously to an open ledger, using an offset and range.
*
* @param data
@@ -539,8 +598,35 @@ public class LedgerHandle {
}
/**
- * Make a recovery add entry request. Recovery adds can add to a ledger even if
- * it has been fenced.
+ * Add entry asynchronously to an open ledger, using an offset and range.
+ * This can be used only with {@link LedgerHandleAdv} returned through
+ * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}.
+ *
+ * @param entryId
+ * entryId of the entry to add.
+ * @param data
+ * array of bytes to be written
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ * @param cb
+ * object implementing the callback interface
+ * @param ctx
+ * some control object
+ * @throws ArrayIndexOutOfBoundsException
+ * if offset or length is negative or offset and length sum to a
+ * value higher than the length of data.
+ */
+ public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+ final AddCallback cb, final Object ctx) throws BKException {
+ LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface.");
+ cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx);
+ }
+
+ /**
+ * Make a recovery add entry request. Recovery adds can add to a ledger even
+ * if it has been fenced.
*
* This is only valid for bookie and ledger recovery, which may need to replicate
* entries to a quorum of bookies to ensure data safety.
@@ -553,13 +639,14 @@ public class LedgerHandle {
doAsyncAddEntry(op, data, offset, length, cb, ctx);
}
- private void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length,
- final AddCallback cb, final Object ctx) {
+ void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length,
+ final AddCallback cb, final Object ctx) {
+
if (offset < 0 || length < 0
|| (offset + length) > data.length) {
throw new ArrayIndexOutOfBoundsException(
- "Invalid values for offset("+offset
- +") or length("+length+")");
+ "Invalid values for offset(" +offset
+ +") or length("+length+")");
}
throttler.acquire();
@@ -592,6 +679,7 @@ public class LedgerHandle {
cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandle.this, INVALID_ENTRY_ID, ctx);
}
+
@Override
public String toString() {
return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
@@ -701,6 +789,7 @@ public class LedgerHandle {
}
ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
AtomicBoolean completed = new AtomicBoolean(false);
+
@Override
public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
if (rc == BKException.Code.OK) {
@@ -842,11 +931,17 @@ public class LedgerHandle {
// Start from the head of the queue and proceed while there are
// entries that have had all their responses come back
PendingAddOp pendingAddOp;
+
while ((pendingAddOp = pendingAddOps.peek()) != null
&& blockAddCompletions.get() == 0) {
if (!pendingAddOp.completed) {
return;
}
+ // Check if it is the next entry in the sequence.
+ if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != lastAddConfirmed + 1) {
+ LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId, lastAddConfirmed);
+ return;
+ }
pendingAddOps.remove();
lastAddConfirmed = pendingAddOp.entryId;
pendingAddOp.submitCallback(BKException.Code.OK);
@@ -1219,7 +1314,7 @@ public class LedgerHandle {
}
}
- private static class SyncAddCallback implements AddCallback {
+ static class SyncAddCallback implements AddCallback {
long entryId = -1;
/**
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
new file mode 100644
index 0000000..00fcfa7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.Serializable;
+import java.security.GeneralSecurityException;
+import java.util.Comparator;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ledger Advanced handle extends {@link LedgerHandle} to provide an API to add entries with
+ * user-supplied entryIds. Through this interface the ledger length may not be accurate while the
+ * ledger is being written.
+ */
+public class LedgerHandleAdv extends LedgerHandle {
+ final static Logger LOG = LoggerFactory.getLogger(LedgerHandleAdv.class);
+
+ static class PendingOpsComparator implements Comparator<PendingAddOp>, Serializable {
+ public int compare(PendingAddOp o1, PendingAddOp o2) {
+ return Long.compare(o1.entryId, o2.entryId);
+ }
+ }
+
+ LedgerHandleAdv(BookKeeper bk, long ledgerId, LedgerMetadata metadata, DigestType digestType, byte[] password)
+ throws GeneralSecurityException, NumberFormatException {
+ super(bk, ledgerId, metadata, digestType, password);
+ pendingAddOps = new PriorityBlockingQueue<PendingAddOp>(10, new PendingOpsComparator());
+ }
+
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId
+ * entryId of the entry to add
+ * @param data
+ * array of bytes to be written to the ledger
+ * @return
+ * entryId that is just created.
+ */
+ @Override
+ public long addEntry(final long entryId, byte[] data) throws InterruptedException, BKException {
+
+ return addEntry(entryId, data, 0, data.length);
+
+ }
+
+ /**
+ * Add entry synchronously to an open ledger.
+ *
+ * @param entryId
+ * entryId of the entry to add
+ * @param data
+ * array of bytes to be written to the ledger
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ * @return The entryId of newly inserted entry.
+ */
+ @Override
+ public long addEntry(final long entryId, byte[] data, int offset, int length) throws InterruptedException,
+ BKException {
+ LOG.debug("Adding entry {}", data);
+
+ SyncCounter counter = new SyncCounter();
+ counter.inc();
+
+ SyncAddCallback callback = new SyncAddCallback();
+ asyncAddEntry(entryId, data, offset, length, callback, counter);
+
+ counter.block(0);
+
+ if (counter.getrc() != BKException.Code.OK) {
+ throw BKException.create(counter.getrc());
+ }
+ return callback.entryId;
+ }
+
+ /**
+ * Add entry asynchronously to an open ledger.
+ *
+ * @param entryId
+ * entryId of the entry to add
+ * @param data
+ * array of bytes to be written
+ * @param cb
+ * object implementing the callback interface
+ * @param ctx
+ * some control object
+ */
+ @Override
+ public void asyncAddEntry(long entryId, byte[] data, AddCallback cb, Object ctx) throws BKException {
+ asyncAddEntry(entryId, data, 0, data.length, cb, ctx);
+ }
+
+ /**
+ * Add entry asynchronously to an open ledger, using an offset and range.
+ *
+ * @param entryId
+ * entryId of the entry to add
+ * @param data
+ * array of bytes to be written
+ * @param offset
+ * offset from which to take bytes from data
+ * @param length
+ * number of bytes to take from data
+ * @param cb
+ * object implementing the callback interface
+ * @param ctx
+ * some control object
+ * @throws ArrayIndexOutOfBoundsException
+ * if offset or length is negative or offset and length sum to a
+ * value higher than the length of data.
+ */
+
+ public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length,
+ final AddCallback cb, final Object ctx) {
+ PendingAddOp op = new PendingAddOp(this, cb, ctx);
+ op.setEntryId(entryId);
+ if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
+ LOG.error("Trying to re-add duplicate entryid:{}", entryId);
+ cb.addComplete(BKException.Code.DuplicateEntryIdException,
+ LedgerHandleAdv.this, entryId, ctx);
+ return;
+ }
+ pendingAddOps.add(op);
+
+ doAsyncAddEntry(op, data, offset, length, cb, ctx);
+ }
+
+ /**
+ * The overridden part is mostly about setting the entryId.
+ * Though there may be some code duplication, we chose to have the override routine so the control flow is
+ * unaltered in the base class.
+ */
+ @Override
+ void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length,
+ final AddCallback cb, final Object ctx) {
+ if (offset < 0 || length < 0
+ || (offset + length) > data.length) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Invalid values for offset("+offset
+ +") or length("+length+")");
+ }
+ throttler.acquire();
+
+ if (metadata.isClosed()) {
+ // make sure the callback is triggered in main worker pool
+ try {
+ bk.mainWorkerPool.submit(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
+ cb.addComplete(BKException.Code.LedgerClosedException,
+ LedgerHandleAdv.this, op.getEntryId(), ctx);
+ }
+ @Override
+ public String toString() {
+ return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandleAdv.this, op.getEntryId(), ctx);
+ }
+ return;
+ }
+
+ try {
+ final long currentLength = addToLength(length);
+
+ bk.mainWorkerPool.submit(new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+ op.getEntryId(), lastAddConfirmed, currentLength, data, offset, length);
+ op.initiate(toSend, length);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+ LedgerHandleAdv.this, op.getEntryId(), ctx);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 4034c35..bc487f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -91,6 +91,10 @@ class PendingAddOp implements WriteCallback, TimerTask {
writeSet = new HashSet<Integer>(lh.distributionSchedule.getWriteSet(entryId));
}
+ long getEntryId() {
+ return this.entryId;
+ }
+
void sendWriteRequest(int bookieIndex) {
int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE;
@@ -249,4 +253,17 @@ class PendingAddOp implements WriteCallback, TimerTask {
return sb.toString();
}
+ @Override
+ public int hashCode() {
+ return (int) entryId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof PendingAddOp) {
+ return (this.entryId == ((PendingAddOp)o).entryId);
+ }
+ return (this == o);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/355ddbc7/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 7b77c48..692c480 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -131,6 +131,48 @@ public class BookieWriteLedgerTest extends
}
/**
+ * Verify the functionality of Advanced Ledger which returns
+ * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and lets the
+ * user manage entryId allocation.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdv() throws Exception {
+ // Create a ledger
+ lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries1.add(entry.array());
+ lh.addEntry(i, entry.array());
+ }
+ // Start one more bookie
+ startNewBookie();
+
+ // Shutdown one bookie in the last ensemble and continue writing
+ ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+ .getValue();
+ killBookie(ensemble.get(0));
+
+ int i = numEntriesToWrite;
+ numEntriesToWrite = numEntriesToWrite + 50;
+ for (; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries1.add(entry.array());
+ lh.addEntry(i, entry.array());
+ }
+
+ readEntries(lh, entries1);
+ lh.close();
+ }
+
+ /**
* Verify asynchronous writing when few bookie failures in last ensemble.
*/
@Test(timeout=60000)
@@ -203,8 +245,327 @@ public class BookieWriteLedgerTest extends
lh2.close();
}
- private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries)
- throws InterruptedException, BKException {
+ /**
+  * Verify Advanced asynchronous writing with entryIds in reverse order.
+  * Two advanced ledgers receive the same payloads under ids
+  * numEntriesToWrite - 1 down to 0. A new bookie is then started and one
+  * bookie is killed before the acknowledgements are awaited and the entries
+  * are read back.
+  *
+  * @throws Exception if any ledger or bookie operation fails
+  */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvWithAsyncWritesWithBookieFailures() throws Exception {
+     // Create ledgers
+     lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+     lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+     LOG.info("Ledger ID-1: " + lh.getId());
+     LOG.info("Ledger ID-2: " + lh2.getId());
+     SyncObj syncObj1 = new SyncObj();
+     SyncObj syncObj2 = new SyncObj();
+     for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+         // Ids are issued in descending order, so prepending keeps the
+         // expected lists indexed by entry id. No try/catch here: a failure
+         // to record the expected data must fail the test, not be printed.
+         entries1.add(0, entry.array());
+         entries2.add(0, entry.array());
+         lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+         lh2.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+     }
+     // Start one more bookie and kill the first bookie of the first ensemble
+     // listed in the ledger metadata before reading
+     startNewBookie();
+     ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+             .getValue();
+     killBookie(ensemble.get(0));
+
+     // Wait for all entries to be acknowledged for the first ledger
+     synchronized (syncObj1) {
+         while (syncObj1.counter < numEntriesToWrite) {
+             syncObj1.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj1.rc);
+     }
+     // Wait for all entries to be acknowledged for the second ledger
+     synchronized (syncObj2) {
+         while (syncObj2.counter < numEntriesToWrite) {
+             syncObj2.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj2.rc);
+     }
+
+     // Reading ledger till the last entry
+     readEntries(lh, entries1);
+     readEntries(lh2, entries2);
+     lh.close();
+     lh2.close();
+ }
+
+ /**
+  * Verify Advanced asynchronous writing with entryIds in pseudo random order
+  * with bookie failures between writes. Ids are issued in strides of
+  * batchSize (0, 5, 10, ... then 1, 6, 11, ...); when the id equal to
+  * numEntriesToWrite / 2 has been issued, a new bookie is started and one
+  * bookie is killed.
+  *
+  * @throws Exception if any ledger or bookie operation fails
+  */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailuresBetweenWrites() throws Exception {
+     // Create ledgers
+     lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+     lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+     LOG.info("Ledger ID-1: " + lh.getId());
+     LOG.info("Ledger ID-2: " + lh2.getId());
+     SyncObj syncObj1 = new SyncObj();
+     SyncObj syncObj2 = new SyncObj();
+     int batchSize = 5;
+
+     // Fill the result buffers first. No try/catch around the adds: a
+     // failure to record the expected data must fail the test.
+     for (int i = 0; i < numEntriesToWrite; i++) {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+         entries1.add(0, entry.array());
+         entries2.add(0, entry.array());
+     }
+
+     // Issue the writes in strides so that entry ids arrive out of order
+     for (int i = 0; i < batchSize; i++) {
+         for (int j = i; j < numEntriesToWrite; j = j + batchSize) {
+             byte[] entry1 = entries1.get(j);
+             byte[] entry2 = entries2.get(j);
+             lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+             lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+             if (j == numEntriesToWrite/2) {
+                 // Start one more bookie and kill the first bookie of the
+                 // first ensemble listed in the ledger metadata at half-way
+                 startNewBookie();
+                 ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet()
+                         .iterator().next().getValue();
+                 killBookie(ensemble.get(0));
+             }
+         }
+     }
+
+     // Wait for all entries to be acknowledged for the first ledger
+     synchronized (syncObj1) {
+         while (syncObj1.counter < numEntriesToWrite) {
+             syncObj1.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj1.rc);
+     }
+     // Wait for all entries to be acknowledged for the second ledger
+     synchronized (syncObj2) {
+         while (syncObj2.counter < numEntriesToWrite) {
+             syncObj2.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj2.rc);
+     }
+
+     // Reading ledger till the last entry
+     readEntries(lh, entries1);
+     readEntries(lh2, entries2);
+     lh.close();
+     lh2.close();
+ }
+
+ /**
+  * Verify Advanced asynchronous writing with entryIds in pseudo random
+  * order. Ids are issued in strides of batchSize (0, 5, 10, ... then
+  * 1, 6, 11, ...); once every write has been issued, a new bookie is started
+  * and one bookie is killed before the acknowledgements are awaited.
+  *
+  * @throws Exception if any ledger or bookie operation fails
+  */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvWithRandomAsyncWritesWithBookieFailures() throws Exception {
+     // Create ledgers
+     lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+     lh2 = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+
+     LOG.info("Ledger ID-1: " + lh.getId());
+     LOG.info("Ledger ID-2: " + lh2.getId());
+     SyncObj syncObj1 = new SyncObj();
+     SyncObj syncObj2 = new SyncObj();
+     int batchSize = 5;
+
+     // Fill the result buffers first. No try/catch around the adds: a
+     // failure to record the expected data must fail the test.
+     for (int i = 0; i < numEntriesToWrite; i++) {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+         entries1.add(0, entry.array());
+         entries2.add(0, entry.array());
+     }
+
+     // Issue the writes in strides so that entry ids arrive out of order
+     for (int i = 0; i < batchSize; i++) {
+         for (int j = i; j < numEntriesToWrite; j = j + batchSize) {
+             byte[] entry1 = entries1.get(j);
+             byte[] entry2 = entries2.get(j);
+             lh.asyncAddEntry(j, entry1, 0, entry1.length, this, syncObj1);
+             lh2.asyncAddEntry(j, entry2, 0, entry2.length, this, syncObj2);
+         }
+     }
+     // Start one more bookie and kill the first bookie of the first ensemble
+     // listed in the ledger metadata before reading
+     startNewBookie();
+     ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next()
+             .getValue();
+     killBookie(ensemble.get(0));
+
+     // Wait for all entries to be acknowledged for the first ledger
+     synchronized (syncObj1) {
+         while (syncObj1.counter < numEntriesToWrite) {
+             syncObj1.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj1.rc);
+     }
+     // Wait for all entries to be acknowledged for the second ledger
+     synchronized (syncObj2) {
+         while (syncObj2.counter < numEntriesToWrite) {
+             syncObj2.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj2.rc);
+     }
+
+     // Reading ledger till the last entry
+     readEntries(lh, entries1);
+     readEntries(lh2, entries2);
+     lh.close();
+     lh2.close();
+ }
+
+ /**
+  * Skips one entry id before closing the ledger and asserts that, after
+  * reopening, lastAddConfirmed is right before our skipEntryId.
+  *
+  * @throws Exception if any ledger operation fails
+  */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvWithSkipEntries() throws Exception {
+     SyncObj syncObj1 = new SyncObj();
+
+     // Create a ledger
+     lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+     // Save ledgerId to reopen the ledger
+     long ledgerId = lh.getId();
+     LOG.info("Ledger ID: " + ledgerId);
+     int skipEntryId = rng.nextInt(numEntriesToWrite - 1);
+     for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+         // Ids are issued in descending order, so prepending keeps the
+         // expected list indexed by entry id. No try/catch here: a failure
+         // to record the expected data must fail the test, not be printed.
+         entries1.add(0, entry.array());
+         if (i == skipEntryId) {
+             LOG.info("Skipping entry:{}", skipEntryId);
+             continue;
+         }
+         lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+     }
+     // Only skipEntryId acknowledgements are awaited here (one per id below
+     // the gap), not one per entry written.
+     synchronized (syncObj1) {
+         while (syncObj1.counter < skipEntryId) {
+             syncObj1.wait();
+         }
+         assertEquals(BKException.Code.OK, syncObj1.rc);
+     }
+     // Close the ledger
+     lh.close();
+     // Open the ledger
+     lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+     // Expected value first, so a failure message reads the right way round
+     assertEquals(skipEntryId - 1, lh.lastAddConfirmed);
+     lh.close();
+ }
+
+ /**
+  * Verify the functionality LedgerHandleAdv addEntry with duplicate entryIds.
+  * After writing and reading back ids 0 .. numEntriesToWrite - 1, one of
+  * those ids is written again and the synchronous add is expected to throw a
+  * BKException carrying Code.DuplicateEntryIdException.
+  *
+  * @throws Exception if any ledger operation fails unexpectedly
+  */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvSyncAddDuplicateEntryIds() throws Exception {
+     // Create a ledger
+     lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+     LOG.info("Ledger ID: " + lh.getId());
+     for (int i = 0; i < numEntriesToWrite; i++) {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+
+         entries1.add(entry.array());
+         lh.addEntry(i, entry.array());
+     }
+     readEntries(lh, entries1);
+
+     // Pick an id that has already been written
+     int dupEntryId = rng.nextInt(numEntriesToWrite - 1);
+
+     try {
+         ByteBuffer entry = ByteBuffer.allocate(4);
+         entry.putInt(rng.nextInt(maxInt));
+         entry.position(0);
+         lh.addEntry(dupEntryId, entry.array());
+         fail("Expected exception not thrown");
+     } catch (BKException e) {
+         // This test expects DuplicateEntryIdException
+         assertEquals(BKException.Code.DuplicateEntryIdException, e.getCode());
+     }
+     lh.close();
+ }
+
+ /**
+ * Verify the functionality LedgerHandleAdv asyncAddEntry with duplicate
+ * entryIds
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testLedgerCreateAdvSyncAsyncAddDuplicateEntryIds() throws Exception {
+ long ledgerId;
+ SyncObj syncObj1 = new SyncObj();
+ SyncObj syncObj2 = new SyncObj();
+
+ // Create a ledger
+ lh = bkc.createLedgerAdv(5, 3, 2, digestType, ledgerPassword);
+ // Save ledgerId to reopen the ledger
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + ledgerId);
+ for (int i = numEntriesToWrite - 1; i >= 0; i--) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+ try {
+ entries1.add(0, entry.array());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj1);
+ if (rng.nextBoolean()) {
+ // Attempt to write the same entry
+ lh.asyncAddEntry(i, entry.array(), 0, entry.capacity(), this, syncObj2);
+ synchronized (syncObj2) {
+ while (syncObj2.counter < 1) {
+ syncObj2.wait();
+ }
+ assertEquals(BKException.Code.DuplicateEntryIdException, syncObj2.rc);
+ }
+ }
+ }
+ // Wait for all entries to be acknowledged for the first ledger
+ synchronized (syncObj1) {
+ while (syncObj1.counter < numEntriesToWrite) {
+ syncObj1.wait();
+ }
+ assertEquals(BKException.Code.OK, syncObj1.rc);
+ }
+ // Close the ledger
+ lh.close();
+ }
+
+ private void readEntries(LedgerHandle lh, ArrayList<byte[]> entries) throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
int index = 0;
while (ls.hasMoreElements()) {