You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 16:52:09 UTC

[33/35] git commit: Added guard for extending UUID on failed transactions beyond the max of the queue.

Added guard for extending UUID on failed transactions beyond the max of the queue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e5bcbb23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e5bcbb23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e5bcbb23

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: e5bcbb23be66d12e428d817fd8b0b495793a939c
Parents: f6a79d0
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 15:47:54 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 15:47:54 2014 -0600

----------------------------------------------------------------------
 .../mq/cassandra/io/ConsumerTransaction.java         | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e5bcbb23/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 6d70924..7205dbc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -208,9 +208,9 @@ public class ConsumerTransaction extends NoTransactionSearch
 
             long startTime = System.currentTimeMillis();
 
-            UUID startTimeUUID = UUIDUtils.newTimeUUID( startTime, 0 );
+            UUID startTimeUUID = UUIDUtils.newTimeUUID( startTime, 0 );   //this exact moment in time + clockseq + node
 
-            QueueBounds bounds = getQueueBounds( queueId );
+            QueueBounds bounds = getQueueBounds( queueId );  //first write in time, most recent write in time
 
             //queue has never been written to
             if ( bounds == null )
@@ -224,6 +224,14 @@ public class ConsumerTransaction extends NoTransactionSearch
 
             SearchParam params = getParams( queueId, consumerId, query );
 
+            //if startId is later than "now" (startTimeUUID), we disregard it and reset to now because the
+            //cursor has advanced beyond the end of the queue
+            if( params.startId != null && UUIDUtils.compare( params.startId, startTimeUUID ) > 0){
+                logger.warn( "Our cursor has advanced beyond the end of the queue due to transactions.  Was {}, resetting to {}", params.startId, startTimeUUID );
+                params = new SearchParam( startTimeUUID, params.reversed, false, params.limit );
+            }
+
+
             List<UUID> ids = getQueueRange( queueId, bounds, params );
 
             // get a list of ids from the consumer.
@@ -285,6 +293,9 @@ public class ConsumerTransaction extends NoTransactionSearch
             // last read messages uuid, whichever is greater
             UUID lastReadId = UUIDUtils.max( lastReadTransactionPointer, lastId );
 
+            //we can only store a pointer up to the newest id in the queue, beyond that we'll cause errors
+            lastReadId = UUIDUtils.min( lastReadId, bounds.getNewest() );
+
             writeClientPointer( queueId, consumerId, lastReadId );
         }
         catch ( UGLockException e )