You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/10/27 22:28:36 UTC

svn commit: r1710892 - in /qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb: AbstractBDBMessageStore.java StandardEnvironmentFacade.java replication/ReplicatedEnvironmentFacade.java

Author: orudyy
Date: Tue Oct 27 21:28:36 2015
New Revision: 1710892

URL: http://svn.apache.org/viewvc?rev=1710892&view=rev
Log:
QPID-6809: [Java Broker] Handle LockConflictException during asynchronous recovery

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Oct 27 21:28:36 2015
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.sleepycat.bind.tuple.LongBinding;
 import com.sleepycat.je.Cursor;
@@ -101,6 +100,7 @@ public abstract class AbstractBDBMessage
 
     private boolean _limitBusted;
     private long _totalStoreSize;
+    private final Random _lockConflictRandom = new Random();
 
     @Override
     public void upgradeStoreStructure() throws StoreException
@@ -216,8 +216,6 @@ public abstract class AbstractBDBMessage
     {
         boolean complete = false;
         Transaction tx = null;
-
-        Random rand = null;
         int attempts = 0;
         try
         {
@@ -274,30 +272,7 @@ public abstract class AbstractBDBMessage
                                                                              + messageId, e);
                     }
 
-
-                    getLogger().warn("Lock timeout exception. Retrying (attempt {} of {} ", (attempts + 1), LOCK_RETRY_ATTEMPTS,  e);
-
-                    if(++attempts < LOCK_RETRY_ATTEMPTS)
-                    {
-                        if(rand == null)
-                        {
-                            rand = new Random();
-                        }
-
-                        try
-                        {
-                            Thread.sleep(500l + (long)(500l * rand.nextDouble()));
-                        }
-                        catch (InterruptedException e1)
-                        {
-
-                        }
-                    }
-                    else
-                    {
-                        // rethrow the lock conflict exception since we could not solve by retrying
-                        throw getEnvironmentFacade().handleDatabaseException("Cannot remove messages", e);
-                    }
+                    sleepOrThrowOnLockConflict(attempts++, "Cannot remove messages", e);
                 }
             }
             while(!complete);
@@ -446,17 +421,30 @@ public abstract class AbstractBDBMessage
             DatabaseEntry value = new DatabaseEntry();
             MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
 
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            int attempts = 0;
+            boolean completed = false;
+            do
             {
-                long messageId = LongBinding.entryToLong(key);
-                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
-                if (!handler.handle(message))
+                try
+                {
+                    while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+                    {
+                        long messageId = LongBinding.entryToLong(key);
+                        StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+                        StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+                        if (!handler.handle(message))
+                        {
+                            break;
+                        }
+                    }
+                    completed = true;
+                }
+                catch (LockConflictException e)
                 {
-                    break;
+                    sleepOrThrowOnLockConflict(attempts++, "Cannot visit messages", e);
                 }
             }
+            while (!completed);
         }
         catch (RuntimeException e)
         {
@@ -478,6 +466,26 @@ public abstract class AbstractBDBMessage
         }
     }
 
+    /** Backs off for 500-1000ms ahead of a retry while attempts (zero-based) is below LOCK_RETRY_ATTEMPTS;
+     *  otherwise throws what the environment facade builds from throwMessage and cause. */
+    private void sleepOrThrowOnLockConflict(int attempts, String throwMessage, LockConflictException cause)
+    {
+        if (attempts >= LOCK_RETRY_ATTEMPTS)
+        {
+            // rethrow the lock conflict exception since we could not solve by retrying
+            throw getEnvironmentFacade().handleDatabaseException(throwMessage, cause);
+        }
+        // callers pass a zero-based counter; log it one-based so the first retry reads "attempt 1 of N"
+        getLogger().info("Lock conflict exception. Retrying (attempt {} of {})", attempts + 1, LOCK_RETRY_ATTEMPTS);
+        try
+        {
+            Thread.sleep(500L + (long)(500L * _lockConflictRandom.nextDouble()));
+        }
+        catch (InterruptedException ie)
+        {
+            Thread.currentThread().interrupt(); // keep the interrupt status visible to the caller
+        }
+    }
 
     private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade)
     {
@@ -1487,26 +1495,45 @@ public abstract class AbstractBDBMessage
                 QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
                 keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
 
-                if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+                boolean searchCompletedSuccessfully = false;
+                int attempts = 0;
+                boolean completed = false;
+                do
                 {
-                    QueueEntryKey entry = keyBinding.entryToObject(key);
-                    if(entry.getQueueId().equals(queue.getId()))
-                    {
-                        entries.add(entry);
-                    }
-                    while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+                    try
                     {
-                        entry = keyBinding.entryToObject(key);
-                        if(entry.getQueueId().equals(queue.getId()))
+                        if (!searchCompletedSuccessfully && (searchCompletedSuccessfully = cursor.getSearchKeyRange(key,value, LockMode.DEFAULT) == OperationStatus.SUCCESS))
                         {
-                            entries.add(entry);
+                            QueueEntryKey entry = keyBinding.entryToObject(key);
+                            if(entry.getQueueId().equals(queue.getId()))
+                            {
+                                entries.add(entry);
+                            }
                         }
-                        else
+
+                        if (searchCompletedSuccessfully)
                         {
-                            break;
+                            while(cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+                            {
+                                QueueEntryKey entry = keyBinding.entryToObject(key);
+                                if(entry.getQueueId().equals(queue.getId()))
+                                {
+                                    entries.add(entry);
+                                }
+                                else
+                                {
+                                    break;
+                                }
+                            }
                         }
+                        completed = true;
+                    }
+                    catch (LockConflictException e)
+                    {
+                        sleepOrThrowOnLockConflict(attempts++, "Cannot visit messages", e);
                     }
                 }
+                while (!completed);
             }
             catch (RuntimeException e)
             {

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Oct 27 21:28:36 2015
@@ -351,7 +351,7 @@ public class StandardEnvironmentFacade i
         {
             return e;
         }
-        return new StoreException("Unexpected exception occurred on store operation", e);
+        return new StoreException(contextMessage, e);
     }
 
     @Override

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Oct 27 21:28:36 2015
@@ -454,7 +454,7 @@ public class ReplicatedEnvironmentFacade
                 return new ConnectionScopedRuntimeException("Underlying JE environment is being restarted", dbe);
             }
         }
-        return new StoreException("Unexpected exception occurred in replicated environment", dbe);
+        return new StoreException(contextMessage, dbe);
     }
 
     private void tryToRestartEnvironment(final DatabaseException dbe)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org