You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/11/25 19:22:02 UTC

git commit: Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions so that the clients retry the operations instead of giving up. Also retry the problematic getMessage() call which seems to fail at times.

Updated Branches:
  refs/heads/trunk 00cb9a566 -> b0e91d47f


Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions so that the clients retry the operations instead of giving up.  Also retry the problematic getMessage() call which seems to fail at times.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47

Branch: refs/heads/trunk
Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
Parents: 00cb9a5
Author: Hiram Chirino <hi...@hiramchirino.com>
Authored: Mon Nov 25 13:17:58 2013 -0500
Committer: Hiram Chirino <hi...@hiramchirino.com>
Committed: Mon Nov 25 13:17:58 2013 -0500

----------------------------------------------------------------------
 .../activemq/broker/SuppressReplyException.java |  8 +++++++
 .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
 .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
 .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
 4 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
index eb54a12..f2c6502 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
@@ -26,6 +26,14 @@ import java.io.IOException;
  *
  */
 public class SuppressReplyException extends RuntimeException {
+    public SuppressReplyException(Throwable cause) {
+        super(cause);
+    }
+
+    public SuppressReplyException(String reason) {
+        super(reason);
+    }
+
     public SuppressReplyException(String reason, IOException cause) {
         super(reason, cause);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index e467379..00260d9 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
   def getMessage(x: MessageId):Message = {
     val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
     val locator = id.getDataLocator()
-    val msg = client.getMessage(locator)
+    val msg = client.getMessageWithRetry(locator)
     msg.setMessageId(id)
     msg
   }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index 15f7bb0..c0cedce 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
 import org.apache.activemq.leveldb.EntryLocator
 import org.apache.activemq.leveldb.DataLocator
 import org.fusesource.hawtbuf.ByteArrayOutputStream
+import org.apache.activemq.broker.SuppressReplyException
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
           Thread.sleep(100);
         }
       }
-      throw failure;
+      throw new SuppressReplyException(failure);
     }
     try {
       func
@@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
     collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
       val seq = decodeLong(key)
       var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
-      val msg = getMessage(locator)
+      val msg = getMessageWithRetry(locator)
       msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
       msg.getMessageId().setDataLocator(locator)
       msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
@@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
         func(XaAckRecord(collectionKey, seq, ack, sub))
       } else {
         var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
-        val msg = getMessage(locator)
+        val msg = getMessageWithRetry(locator)
         msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
         msg.getMessageId().setDataLocator(locator)
         func(msg)
@@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
     }
   }
 
+  def getMessageWithRetry(locator:AnyRef):Message = {
+    var retry = 0
+    var rc = getMessage(locator);
+    while( rc == null ) {
+      if( retry > 10 )
+        return null;
+      Thread.sleep(retry*10)
+      rc = getMessage(locator);
+      retry+=1
+    }
+    if( retry > 0 ) {
+      info("Recovered from 'failed getMessage' on retry: "+retry)
+    }
+    rc
+  }
+  
   def getMessage(locator:AnyRef):Message = {
     assert(locator!=null)
     val buffer = locator match {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 322656f..e4c7a02 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -17,7 +17,7 @@
 
 package org.apache.activemq.leveldb
 
-import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
+import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext}
 import org.apache.activemq.command._
 import org.apache.activemq.openwire.OpenWireFormat
 import org.apache.activemq.usage.SystemUsage
@@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
 
   def check_running = {
     if( this.isStopped ) {
-      throw new IOException("Store has been stopped")
+      throw new SuppressReplyException("Store has been stopped")
     }
   }
 
@@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
   def verify_running = {
     if( isStopping || isStopped ) {
       try {
-        throw new IOException("Not running")
+        throw new SuppressReplyException("Not running")
       } catch {
         case e:IOException =>
           if( broker_service!=null ) {


Re: git commit: Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions so that the clients retry the operations instead of giving up. Also retry the problematic getMessage() call which seems to fail at times.

Posted by Hiram Chirino <hi...@hiramchirino.com>.
Thx for the heads up.. Fixed now.

On Fri, Nov 29, 2013 at 7:44 AM, Gary Tully <ga...@gmail.com> wrote:
> Hiram, there is a regression in
> org.apache.activemq.store.LevelDBStorePerDestinationTest
> seems nothing is going to terminate the connection in this case.
> Skipped the test as it was hanging the ci builds.
> see: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a
>
> On 25 November 2013 18:22,  <ch...@apache.org> wrote:
>> Updated Branches:
>>   refs/heads/trunk 00cb9a566 -> b0e91d47f
>>
>>
>> Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so that the clients retry try the operations instead of giving up.  Also retry the problemantic getMessage() call which seems to fail at times.
>>
>> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
>> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
>> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47
>>
>> Branch: refs/heads/trunk
>> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
>> Parents: 00cb9a5
>> Author: Hiram Chirino <hi...@hiramchirino.com>
>> Authored: Mon Nov 25 13:17:58 2013 -0500
>> Committer: Hiram Chirino <hi...@hiramchirino.com>
>> Committed: Mon Nov 25 13:17:58 2013 -0500
>>
>> ----------------------------------------------------------------------
>>  .../activemq/broker/SuppressReplyException.java |  8 +++++++
>>  .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
>>  .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
>>  .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
>>  4 files changed, 32 insertions(+), 7 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> ----------------------------------------------------------------------
>> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> index eb54a12..f2c6502 100644
>> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
>> @@ -26,6 +26,14 @@ import java.io.IOException;
>>   *
>>   */
>>  public class SuppressReplyException extends RuntimeException {
>> +    public SuppressReplyException(Throwable cause) {
>> +        super(cause);
>> +    }
>> +
>> +    public SuppressReplyException(String reason) {
>> +        super(reason);
>> +    }
>> +
>>      public SuppressReplyException(String reason, IOException cause) {
>>          super(reason, cause);
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> ----------------------------------------------------------------------
>> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> index e467379..00260d9 100644
>> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
>> @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
>>    def getMessage(x: MessageId):Message = {
>>      val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
>>      val locator = id.getDataLocator()
>> -    val msg = client.getMessage(locator)
>> +    val msg = client.getMessageWithRetry(locator)
>>      msg.setMessageId(id)
>>      msg
>>    }
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> ----------------------------------------------------------------------
>> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> index 15f7bb0..c0cedce 100755
>> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
>> @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
>>  import org.apache.activemq.leveldb.EntryLocator
>>  import org.apache.activemq.leveldb.DataLocator
>>  import org.fusesource.hawtbuf.ByteArrayOutputStream
>> +import org.apache.activemq.broker.SuppressReplyException
>>
>>  /**
>>   * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
>> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
>>            Thread.sleep(100);
>>          }
>>        }
>> -      throw failure;
>> +      throw new SuppressReplyException(failure);
>>      }
>>      try {
>>        func
>> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
>>      collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
>>        val seq = decodeLong(key)
>>        var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
>> -      val msg = getMessage(locator)
>> +      val msg = getMessageWithRetry(locator)
>>        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>>        msg.getMessageId().setDataLocator(locator)
>>        msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
>> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
>>          func(XaAckRecord(collectionKey, seq, ack, sub))
>>        } else {
>>          var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
>> -        val msg = getMessage(locator)
>> +        val msg = getMessageWithRetry(locator)
>>          msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>>          msg.getMessageId().setDataLocator(locator)
>>          func(msg)
>> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
>>      }
>>    }
>>
>> +  def getMessageWithRetry(locator:AnyRef):Message = {
>> +    var retry = 0
>> +    var rc = getMessage(locator);
>> +    while( rc == null ) {
>> +      if( retry > 10 )
>> +        return null;
>> +      Thread.sleep(retry*10)
>> +      rc = getMessage(locator);
>> +      retry+=1
>> +    }
>> +    if( retry > 0 ) {
>> +      info("Recovered from 'failed getMessage' on retry: "+retry)
>> +    }
>> +    rc
>> +  }
>> +
>>    def getMessage(locator:AnyRef):Message = {
>>      assert(locator!=null)
>>      val buffer = locator match {
>>
>> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> ----------------------------------------------------------------------
>> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> index 322656f..e4c7a02 100644
>> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
>> @@ -17,7 +17,7 @@
>>
>>  package org.apache.activemq.leveldb
>>
>> -import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
>> +import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext}
>>  import org.apache.activemq.command._
>>  import org.apache.activemq.openwire.OpenWireFormat
>>  import org.apache.activemq.usage.SystemUsage
>> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
>>
>>    def check_running = {
>>      if( this.isStopped ) {
>> -      throw new IOException("Store has been stopped")
>> +      throw new SuppressReplyException("Store has been stopped")
>>      }
>>    }
>>
>> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
>>    def verify_running = {
>>      if( isStopping || isStopped ) {
>>        try {
>> -        throw new IOException("Not running")
>> +        throw new SuppressReplyException("Not running")
>>        } catch {
>>          case e:IOException =>
>>            if( broker_service!=null ) {
>>
>
>
>
> --
> http://redhat.com
> http://blog.garytully.com



-- 
Hiram Chirino

Engineering | Red Hat, Inc.

hchirino@redhat.com | fusesource.com | redhat.com

skype: hiramchirino | twitter: @hiramchirino

blog: Hiram Chirino's Bit Mojo

Re: git commit: Have the leveldb store throw SuppressReplyExceptions instead of IOExceptions so that the clients retry the operations instead of giving up. Also retry the problematic getMessage() call which seems to fail at times.

Posted by Gary Tully <ga...@gmail.com>.
Hiram, there is a regression in
org.apache.activemq.store.LevelDBStorePerDestinationTest;
it seems nothing is going to terminate the connection in this case.
I skipped the test as it was hanging the CI builds.
see: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=b9f0783a

On 25 November 2013 18:22,  <ch...@apache.org> wrote:
> Updated Branches:
>   refs/heads/trunk 00cb9a566 -> b0e91d47f
>
>
> Have the leveldb store thorw SuppressReplyExceptions instead of IOExceptions so that the clients retry try the operations instead of giving up.  Also retry the problemantic getMessage() call which seems to fail at times.
>
> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0e91d47
> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0e91d47
> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0e91d47
>
> Branch: refs/heads/trunk
> Commit: b0e91d47f5fced59c89a34d993f4d87c7986b04b
> Parents: 00cb9a5
> Author: Hiram Chirino <hi...@hiramchirino.com>
> Authored: Mon Nov 25 13:17:58 2013 -0500
> Committer: Hiram Chirino <hi...@hiramchirino.com>
> Committed: Mon Nov 25 13:17:58 2013 -0500
>
> ----------------------------------------------------------------------
>  .../activemq/broker/SuppressReplyException.java |  8 +++++++
>  .../org/apache/activemq/leveldb/DBManager.scala |  2 +-
>  .../apache/activemq/leveldb/LevelDBClient.scala | 23 +++++++++++++++++---
>  .../apache/activemq/leveldb/LevelDBStore.scala  |  6 ++---
>  4 files changed, 32 insertions(+), 7 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> index eb54a12..f2c6502 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
> @@ -26,6 +26,14 @@ import java.io.IOException;
>   *
>   */
>  public class SuppressReplyException extends RuntimeException {
> +    public SuppressReplyException(Throwable cause) {
> +        super(cause);
> +    }
> +
> +    public SuppressReplyException(String reason) {
> +        super(reason);
> +    }
> +
>      public SuppressReplyException(String reason, IOException cause) {
>          super(reason, cause);
>      }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> index e467379..00260d9 100644
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
> @@ -860,7 +860,7 @@ class DBManager(val parent:LevelDBStore) {
>    def getMessage(x: MessageId):Message = {
>      val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x)
>      val locator = id.getDataLocator()
> -    val msg = client.getMessage(locator)
> +    val msg = client.getMessageWithRetry(locator)
>      msg.setMessageId(id)
>      msg
>    }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> index 15f7bb0..c0cedce 100755
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
> @@ -52,6 +52,7 @@ import org.apache.activemq.leveldb.MessageRecord
>  import org.apache.activemq.leveldb.EntryLocator
>  import org.apache.activemq.leveldb.DataLocator
>  import org.fusesource.hawtbuf.ByteArrayOutputStream
> +import org.apache.activemq.broker.SuppressReplyException
>
>  /**
>   * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
> @@ -545,7 +546,7 @@ class LevelDBClient(store: LevelDBStore) {
>            Thread.sleep(100);
>          }
>        }
> -      throw failure;
> +      throw new SuppressReplyException(failure);
>      }
>      try {
>        func
> @@ -1244,7 +1245,7 @@ class LevelDBClient(store: LevelDBStore) {
>      collectionCursor(collectionKey, encodeLong(seq)) { (key, value) =>
>        val seq = decodeLong(key)
>        var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
> -      val msg = getMessage(locator)
> +      val msg = getMessageWithRetry(locator)
>        msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>        msg.getMessageId().setDataLocator(locator)
>        msg.setRedeliveryCounter(decodeQueueEntryMeta(value))
> @@ -1270,7 +1271,7 @@ class LevelDBClient(store: LevelDBStore) {
>          func(XaAckRecord(collectionKey, seq, ack, sub))
>        } else {
>          var locator = DataLocator(store, value.getValueLocation, value.getValueLength)
> -        val msg = getMessage(locator)
> +        val msg = getMessageWithRetry(locator)
>          msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq))
>          msg.getMessageId().setDataLocator(locator)
>          func(msg)
> @@ -1287,6 +1288,22 @@ class LevelDBClient(store: LevelDBStore) {
>      }
>    }
>
> +  def getMessageWithRetry(locator:AnyRef):Message = {
> +    var retry = 0
> +    var rc = getMessage(locator);
> +    while( rc == null ) {
> +      if( retry > 10 )
> +        return null;
> +      Thread.sleep(retry*10)
> +      rc = getMessage(locator);
> +      retry+=1
> +    }
> +    if( retry > 0 ) {
> +      info("Recovered from 'failed getMessage' on retry: "+retry)
> +    }
> +    rc
> +  }
> +
>    def getMessage(locator:AnyRef):Message = {
>      assert(locator!=null)
>      val buffer = locator match {
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/b0e91d47/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> ----------------------------------------------------------------------
> diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> index 322656f..e4c7a02 100644
> --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
> @@ -17,7 +17,7 @@
>
>  package org.apache.activemq.leveldb
>
> -import org.apache.activemq.broker.{LockableServiceSupport, BrokerServiceAware, ConnectionContext}
> +import org.apache.activemq.broker.{SuppressReplyException, LockableServiceSupport, BrokerServiceAware, ConnectionContext}
>  import org.apache.activemq.command._
>  import org.apache.activemq.openwire.OpenWireFormat
>  import org.apache.activemq.usage.SystemUsage
> @@ -186,7 +186,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
>
>    def check_running = {
>      if( this.isStopped ) {
> -      throw new IOException("Store has been stopped")
> +      throw new SuppressReplyException("Store has been stopped")
>      }
>    }
>
> @@ -437,7 +437,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
>    def verify_running = {
>      if( isStopping || isStopped ) {
>        try {
> -        throw new IOException("Not running")
> +        throw new SuppressReplyException("Not running")
>        } catch {
>          case e:IOException =>
>            if( broker_service!=null ) {
>



-- 
http://redhat.com
http://blog.garytully.com