You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/10/27 22:28:36 UTC
svn commit: r1710892 - in
/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb:
AbstractBDBMessageStore.java StandardEnvironmentFacade.java
replication/ReplicatedEnvironmentFacade.java
Author: orudyy
Date: Tue Oct 27 21:28:36 2015
New Revision: 1710892
URL: http://svn.apache.org/viewvc?rev=1710892&view=rev
Log:
QPID-6809: [Java Broker] Handle LockConflictException during asynchronous recovery
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Oct 27 21:28:36 2015
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Random;
import java.util.UUID;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.je.Cursor;
@@ -101,6 +100,7 @@ public abstract class AbstractBDBMessage
private boolean _limitBusted;
private long _totalStoreSize;
+ private final Random _lockConflictRandom = new Random();
@Override
public void upgradeStoreStructure() throws StoreException
@@ -216,8 +216,6 @@ public abstract class AbstractBDBMessage
{
boolean complete = false;
Transaction tx = null;
-
- Random rand = null;
int attempts = 0;
try
{
@@ -274,30 +272,7 @@ public abstract class AbstractBDBMessage
+ messageId, e);
}
-
- getLogger().warn("Lock timeout exception. Retrying (attempt {} of {} ", (attempts + 1), LOCK_RETRY_ATTEMPTS, e);
-
- if(++attempts < LOCK_RETRY_ATTEMPTS)
- {
- if(rand == null)
- {
- rand = new Random();
- }
-
- try
- {
- Thread.sleep(500l + (long)(500l * rand.nextDouble()));
- }
- catch (InterruptedException e1)
- {
-
- }
- }
- else
- {
- // rethrow the lock conflict exception since we could not solve by retrying
- throw getEnvironmentFacade().handleDatabaseException("Cannot remove messages", e);
- }
+ sleepOrThrowOnLockConflict(attempts++, "Cannot remove messages", e);
}
}
while(!complete);
@@ -446,17 +421,30 @@ public abstract class AbstractBDBMessage
DatabaseEntry value = new DatabaseEntry();
MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ int attempts = 0;
+ boolean completed = false;
+ do
{
- long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = valueBinding.entryToObject(value);
- StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
- if (!handler.handle(message))
+ try
+ {
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ long messageId = LongBinding.entryToLong(key);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
+ }
+ completed = true;
+ }
+ catch (LockConflictException e)
{
- break;
+ sleepOrThrowOnLockConflict(attempts++, "Cannot visit messages", e);
}
}
+ while (!completed);
}
catch (RuntimeException e)
{
@@ -478,6 +466,26 @@ public abstract class AbstractBDBMessage
}
}
+ private void sleepOrThrowOnLockConflict(int attempts, String throwMessage, LockConflictException cause)
+ {
+ if (attempts < LOCK_RETRY_ATTEMPTS)
+ {
+ getLogger().info("Lock conflict exception. Retrying (attempt {} of {})", attempts, LOCK_RETRY_ATTEMPTS);
+ try
+ {
+ Thread.sleep(500l + (long)(500l * _lockConflictRandom.nextDouble()));
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ else
+ {
+ // rethrow the lock conflict exception since we could not solve by retrying
+ throw getEnvironmentFacade().handleDatabaseException(throwMessage, cause);
+ }
+ }
private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade)
{
@@ -1487,26 +1495,45 @@ public abstract class AbstractBDBMessage
QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
- if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ boolean searchCompletedSuccessfully = false;
+ int attempts = 0;
+ boolean completed = false;
+ do
{
- QueueEntryKey entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
- {
- entries.add(entry);
- }
- while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ try
{
- entry = keyBinding.entryToObject(key);
- if(entry.getQueueId().equals(queue.getId()))
+ if (!searchCompletedSuccessfully && (searchCompletedSuccessfully = cursor.getSearchKeyRange(key,value, LockMode.DEFAULT) == OperationStatus.SUCCESS))
{
- entries.add(entry);
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
}
- else
+
+ if (searchCompletedSuccessfully)
{
- break;
+ while(cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ else
+ {
+ break;
+ }
+ }
}
+ completed = true;
+ }
+ catch (LockConflictException e)
+ {
+ sleepOrThrowOnLockConflict(attempts++, "Cannot visit messages", e);
}
}
+ while (!completed);
}
catch (RuntimeException e)
{
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Oct 27 21:28:36 2015
@@ -351,7 +351,7 @@ public class StandardEnvironmentFacade i
{
return e;
}
- return new StoreException("Unexpected exception occurred on store operation", e);
+ return new StoreException(contextMessage, e);
}
@Override
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1710892&r1=1710891&r2=1710892&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Oct 27 21:28:36 2015
@@ -454,7 +454,7 @@ public class ReplicatedEnvironmentFacade
return new ConnectionScopedRuntimeException("Underlying JE environment is being restarted", dbe);
}
}
- return new StoreException("Unexpected exception occurred in replicated environment", dbe);
+ return new StoreException(contextMessage, dbe);
}
private void tryToRestartEnvironment(final DatabaseException dbe)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org