You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/09/14 11:53:13 UTC

svn commit: r1384700 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/util/

Author: ivank
Date: Fri Sep 14 09:53:13 2012
New Revision: 1384700

URL: http://svn.apache.org/viewvc?rev=1384700&view=rev
Log:
BOOKKEEPER-403: ReReadMetadataCb is not executed in the thread responsible for that ledger (ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Sep 14 09:53:13 2012
@@ -84,6 +84,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-396: Compilation issue in TestClient.java of BenchMark ( showing this in eclipse) (umamahesh via sijie)
 
+        BOOKKEEPER-403: ReReadMetadataCb is not executed in the thread responsible for that ledger (ivank)
+
       hedwig-protocol:
 
         BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Fri Sep 14 09:53:13 2012
@@ -22,7 +22,7 @@
 package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
  * Encapsulates asynchronous ledger delete operation
  *
  */
-class LedgerDeleteOp implements GenericCallback<Void> {
+class LedgerDeleteOp extends OrderedSafeGenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class);
 
@@ -52,6 +52,7 @@ class LedgerDeleteOp implements GenericC
      *            optional control object
      */
     LedgerDeleteOp(BookKeeper bk, long ledgerId, DeleteCallback cb, Object ctx) {
+        super(bk.mainWorkerPool, ledgerId);
         this.bk = bk;
         this.ledgerId = ledgerId;
         this.cb = cb;
@@ -70,8 +71,8 @@ class LedgerDeleteOp implements GenericC
     /**
      * Implements Delete Callback.
      */
-    public void operationComplete(int rc, Void result) {
+    @Override
+    public void safeOperationComplete(int rc, Void result) {
         cb.deleteComplete(rc, this.ctx);
     }
-
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java Fri Sep 14 09:53:13 2012
@@ -30,6 +30,8 @@ import org.apache.bookkeeper.proto.Booki
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -261,9 +263,10 @@ public class LedgerFragmentReplicator {
                     // will have updated the stat
                     // other operations such as (addEnsemble) would update it
                     // too.
-                    lh.rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                    lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(lh.bk.mainWorkerPool,
+                                                                                     lh.getId()) {
                         @Override
-                        public void operationComplete(int rc,
+                        public void safeOperationComplete(int rc,
                                 LedgerMetadata newMeta) {
                             if (rc != BKException.Code.OK) {
                                 LOG

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Sep 14 09:53:13 2012
@@ -40,6 +40,8 @@ import org.apache.bookkeeper.client.BKEx
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -284,13 +286,18 @@ public class LedgerHandle {
                               + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
                 }
 
-                final class CloseCb implements GenericCallback<Void> {
+                final class CloseCb extends OrderedSafeGenericCallback<Void> {
+                    CloseCb() {
+                        super(bk.mainWorkerPool, ledgerId);
+                    }
+
                     @Override
-                    public void operationComplete(final int rc, Void result) {
+                    public void safeOperationComplete(final int rc, Void result) {
                         if (rc == BKException.Code.MetadataVersionException) {
-                            rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+                                                                                          ledgerId) {
                                 @Override
-                                public void operationComplete(int newrc, LedgerMetadata newMeta) {
+                                public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                     if (newrc != BKException.Code.OK) {
                                         LOG.error("Error reading new metadata from ledger " + ledgerId
                                                   + " when closing, code=" + newrc);
@@ -714,37 +721,32 @@ public class LedgerHandle {
      * reformed ensemble. On MetadataVersionException, will reread latest
      * ledgerMetadata and act upon.
      */
-    private final class ChangeEnsembleCb implements GenericCallback<Void> {
+    private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> {
         private final EnsembleInfo ensembleInfo;
 
         ChangeEnsembleCb(EnsembleInfo ensembleInfo) {
+            super(bk.mainWorkerPool, ledgerId);
             this.ensembleInfo = ensembleInfo;
         }
 
         @Override
-        public void operationComplete(final int rc, Void result) {
-
-            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-                @Override
-                public void safeRun() {
-                    if (rc == BKException.Code.MetadataVersionException) {
-                        rereadMetadata(new ReReadLedgerMetadataCb(rc,
-                                ensembleInfo));
-                        return;
-                    } else if (rc != BKException.Code.OK) {
-                        LOG.error("Could not persist ledger metadata while "
-                                + "changing ensemble to: "
-                                + ensembleInfo.newEnsemble
-                                + " , closing ledger");
-                        handleUnrecoverableErrorDuringAdd(rc);
-                        return;
-                    }
-                    blockAddCompletions.decrementAndGet();
+        public void safeOperationComplete(final int rc, Void result) {
+            if (rc == BKException.Code.MetadataVersionException) {
+                rereadMetadata(new ReReadLedgerMetadataCb(rc,
+                                       ensembleInfo));
+                return;
+            } else if (rc != BKException.Code.OK) {
+                LOG.error("Could not persist ledger metadata while "
+                          + "changing ensemble to: "
+                          + ensembleInfo.newEnsemble
+                          + " , closing ledger");
+                handleUnrecoverableErrorDuringAdd(rc);
+                return;
+            }
+            blockAddCompletions.decrementAndGet();
 
-                    // the failed bookie has been replaced
-                    unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
-                }
-            });
+            // the failed bookie has been replaced
+            unsetSuccessAndSendWriteRequest(ensembleInfo.bookieIndex);
         }
     };
 
@@ -752,18 +754,18 @@ public class LedgerHandle {
      * Callback which is reading the ledgerMetadata present in zk. This will try
      * to resolve the version conflicts.
      */
-    private final class ReReadLedgerMetadataCb implements
-            GenericCallback<LedgerMetadata> {
+    private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> {
         private final int rc;
         private final EnsembleInfo ensembleInfo;
 
         ReReadLedgerMetadataCb(int rc, EnsembleInfo ensembleInfo) {
+            super(bk.mainWorkerPool, ledgerId);
             this.rc = rc;
             this.ensembleInfo = ensembleInfo;
         }
 
         @Override
-        public void operationComplete(int newrc, LedgerMetadata newMeta) {
+        public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
             if (newrc != BKException.Code.OK) {
                 LOG.error("Error reading new metadata from ledger "
                         + "after changing ensemble, code=" + newrc);
@@ -858,13 +860,14 @@ public class LedgerHandle {
 
         metadata.markLedgerInRecovery();
 
-        writeLedgerConfig(new GenericCallback<Void>() {
+        writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
             @Override
-            public void operationComplete(final int rc, Void result) {
+            public void safeOperationComplete(final int rc, Void result) {
                 if (rc == BKException.Code.MetadataVersionException) {
-                    rereadMetadata(new GenericCallback<LedgerMetadata>() {
+                    rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
+                                                                                  ledgerId) {
                         @Override
-                        public void operationComplete(int rc, LedgerMetadata newMeta) {
+                        public void safeOperationComplete(int rc, LedgerMetadata newMeta) {
                             if (rc != BKException.Code.OK) {
                                 cb.operationComplete(rc, null);
                             } else {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Fri Sep 14 09:53:13 2012
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.client.Asyn
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -155,9 +156,9 @@ class LedgerOpenOp implements GenericCal
         }
 
         if (doRecovery) {
-            lh.recover(new GenericCallback<Void>() {
+            lh.recover(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) {
                     @Override
-                    public void operationComplete(int rc, Void result) {
+                    public void safeOperationComplete(int rc, Void result) {
                         if (rc == BKException.Code.OK) {
                             cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
                         } else if (rc == BKException.Code.UnauthorizedAccessException) {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java?rev=1384700&r1=1384699&r2=1384700&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java Fri Sep 14 09:53:13 2012
@@ -23,6 +23,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
 /**
  * This class provides 2 things over the java {@link ScheduledExecutorService}.
  *
@@ -95,4 +97,35 @@ public class OrderedSafeExecutor {
         }
     }
 
+    /**
+     * Generic callback implementation which runs the callback
+     * on the executor thread chosen by the ordering key.
+     */
+    public static abstract class OrderedSafeGenericCallback<T>
+            implements GenericCallback<T> {
+        private final OrderedSafeExecutor executor;
+        private final Object orderingKey;
+
+        /**
+         * @param executor The executor on which to run the callback
+         * @param orderingKey Key used to decide which thread the callback
+         *                    should run on.
+         */
+        public OrderedSafeGenericCallback(OrderedSafeExecutor executor, Object orderingKey) {
+            this.executor = executor;
+            this.orderingKey = orderingKey;
+        }
+
+        @Override
+        public final void operationComplete(final int rc, final T result) {
+            executor.submitOrdered(orderingKey, new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        safeOperationComplete(rc, result);
+                    }
+                });
+        }
+
+        public abstract void safeOperationComplete(int rc, T result);
+    }
 }