You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 11:53:13 UTC
svn commit: r1384700 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/
Author: ivank
Date: Fri Sep 14 09:53:13 2012
New Revision: 1384700
URL: http://svn.apache.org/viewvc?rev=1384700&view=rev
Log:
BOOKKEEPER-403: ReReadMetadataCb is not executed in the thread responsible for that ledger (ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 14 09:53:13 2012
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
BOOKKEEPER-396: Compilation issue in TestClient.java of BenchMark ( showing this in eclipse) (umamahesh via sijie)
+ BOOKKEEPER-403: ReReadMetadataCb is not executed in the thread responsible for that ledger (ivank)
+
hedwig-protocol:
BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Fri Sep 14 09:53:13 2012
@@ -22,7 +22,7 @@
package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
* Encapsulates asynchronous ledger delete operation
*
*/
-class LedgerDeleteOp implements GenericCallback<Void> {
+class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
@@ -52,6 +52,7 @@ class LedgerDeleteOp implements GenericC
* optional control object
*/
LedgerDeleteOp(BookKeeper bk, long ledgerId, DeleteCallback cb, Object ctx) {
+ super(bk.mainWorkerPool, ledgerId);
this.bk = bk;
this.ledgerId = ledgerId;
this.cb = cb;
@@ -70,8 +71,8 @@ class LedgerDeleteOp implements GenericC
/**
* Implements Delete Callback.
*/
- public void operationComplete(int rc, Void result) {
+ @Override
+ public void safeOperationComplete(int rc, Void result) {
cb.deleteComplete(rc, this.ctx);
}
-
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Fri Sep 14 09:53:13 2012
@@ -30,6 +30,8 @@ import org.apache.bookkeeper.proto.Booki
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -261,9 +263,10 @@ public class LedgerFragmentReplicator {
// will have updated the stat
// other operations such as (addEnsemble) would update it
// too.
- lh.rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(lh.bk.mainWorkerPool,
+ lh.getId()) {
@Override
- public void operationComplete(int rc,
+ public void safeOperationComplete(int rc,
LedgerMetadata newMeta) {
if (rc != BKException.Code.OK) {
LOG
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Sep 14 09:53:13 2012
@@ -40,6 +40,8 @@ import org.apache.bookkeeper.client.BKEx
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
import org.apache.bookkeeper.util.SafeRunnable;
@@ -284,13 +286,18 @@ public class LedgerHandle {
+ metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
}
- final class CloseCb implements GenericCallback<Void> {
+ final class CloseCb extends OrderedSafeGenericCallback<Void> {
+ CloseCb() {
+ super(bk.mainWorkerPool, ledgerId);
+ }
+
@Override
- public void operationComplete(final int rc, Void result) {
+ public void safeOperationComplete(final int rc, Void result) {
if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+ ledgerId) {
@Override
- public void operationComplete(int newrc, LedgerMetadata newMeta) {
+ public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
if (newrc != BKException.Code.OK) {
LOG.error("Error reading new metadata from ledger " + ledgerId
+ " when closing, code=" + newrc);
@@ -714,37 +721,32 @@ public class LedgerHandle {
* reformed ensemble. On MetadataVersionException, will reread latest
* ledgerMetadata and act upon.
*/
- private final class ChangeEnsembleCb implements GenericCallback<Void> {
+ private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
private final EnsembleInfo ensembleInfo;
ChangeEnsembleCb(EnsembleInfo ensembleInfo) {
+ super(bk.mainWorkerPool, ledgerId);
this.ensembleInfo = ensembleInfo;
}
@Override
- public void operationComplete(final int rc, Void result) {
-
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
- @Override
- public void safeRun() {
- if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new ReReadLedgerMetadataCb(rc,
- ensembleInfo));
- return;
- } else if (rc != BKException.Code.OK) {
- LOG.error("Could not persist ledger metadata while "
- + "changing ensemble to: "
- + ensembleInfo.newEnsemble
- + " , closing ledger");
- handleUnrecoverableErrorDuringAdd(rc);
- return;
- }
- blockAddCompletions.decrementAndGet();
+ public void safeOperationComplete(final int rc, Void result) {
+ if (rc == BKException.Code.MetadataVersionException) {
+ rereadMetadata(new ReReadLedgerMetadataCb(rc,
+ ensembleInfo));
+ return;
+ } else if (rc != BKException.Code.OK) {
+ LOG.error("Could not persist ledger metadata while "
+ + "changing ensemble to: "
+ + ensembleInfo.newEnsemble
+ + " , closing ledger");
+ handleUnrecoverableErrorDuringAdd(rc);
+ return;
+ }
+ blockAddCompletions.decrementAndGet();
- // the failed bookie has been replaced
- unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
- }
- });
+ // the failed bookie has been replaced
+ unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
}
};
@@ -752,18 +754,18 @@ public class LedgerHandle {
* Callback which is reading the ledgerMetadata present in zk. This will try
* to resolve the version conflicts.
*/
- private final class ReReadLedgerMetadataCb implements
- GenericCallback<LedgerMetadata> {
+ private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
private final int rc;
private final EnsembleInfo ensembleInfo;
ReReadLedgerMetadataCb(int rc, EnsembleInfo ensembleInfo) {
+ super(bk.mainWorkerPool, ledgerId);
this.rc = rc;
this.ensembleInfo = ensembleInfo;
}
@Override
- public void operationComplete(int newrc, LedgerMetadata newMeta) {
+ public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
if (newrc != BKException.Code.OK) {
LOG.error("Error reading new metadata from ledger "
+ "after changing ensemble, code=" + newrc);
@@ -858,13 +860,14 @@ public class LedgerHandle {
metadata.markLedgerInRecovery();
- writeLedgerConfig(new GenericCallback<Void>() {
+ writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
@Override
- public void operationComplete(final int rc, Void result) {
+ public void safeOperationComplete(final int rc, Void result) {
if (rc == BKException.Code.MetadataVersionException) {
- rereadMetadata(new GenericCallback<LedgerMetadata>() {
+ rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+ ledgerId) {
@Override
- public void operationComplete(int rc, LedgerMetadata newMeta) {
+ public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
if (rc != BKException.Code.OK) {
cb.operationComplete(rc, null);
} else {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Fri Sep 14 09:53:13 2012
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.Asyn
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,9 +156,9 @@ class LedgerOpenOp implements GenericCal
}
if (doRecovery) {
- lh.recover(new GenericCallback<Void>() {
+ lh.recover(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
@Override
- public void operationComplete(int rc, Void result) {
+ public void safeOperationComplete(int rc, Void result) {
if (rc == BKException.Code.OK) {
cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
} else if (rc == BKException.Code.UnauthorizedAccessException) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java Fri Sep 14 09:53:13 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
/**
* This class provides 2 things over the java {@link ScheduledExecutorService}.
*
@@ -95,4 +97,35 @@ public class OrderedSafeExecutor {
}
}
+ /**
+ * Generic callback implementation which will run the
+ * callback in the thread which matches the ordering key
+ */
+ public static abstract class OrderedSafeGenericCallback<T>
+ implements GenericCallback<T> {
+ private final OrderedSafeExecutor executor;
+ private final Object orderingKey;
+
+ /**
+ * @param executor The executor on which to run the callback
+ * @param orderingKey Key used to decide which thread the callback
+ * should run on.
+ */
+ public OrderedSafeGenericCallback(OrderedSafeExecutor executor, Object orderingKey) {
+ this.executor = executor;
+ this.orderingKey = orderingKey;
+ }
+
+ @Override
+ public final void operationComplete(final int rc, final T result) {
+ executor.submitOrdered(orderingKey, new SafeRunnable() {
+ @Override
+ public void safeRun() {
+ safeOperationComplete(rc, result);
+ }
+ });
+ }
+
+ public abstract void safeOperationComplete(int rc, T result);
+ }
}