You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/01 01:56:19 UTC

git commit: Fixes issue with range scanning and swallows exception.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o e5bcbb23b -> abbd76eb4


Fixes issue with range scanning and swallows exception.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/abbd76eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/abbd76eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/abbd76eb

Branch: refs/heads/two-dot-o
Commit: abbd76eb40328f63e79441d404555e85894d86fe
Parents: e5bcbb2
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 17:56:02 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 17:56:02 2014 -0600

----------------------------------------------------------------------
 .../mq/cassandra/io/AbstractSearch.java         | 60 +++++++++++++++-----
 .../mq/cassandra/io/ConsumerTransaction.java    |  2 +-
 2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/abbd76eb/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
index ffda843..0e7dea1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
@@ -30,6 +30,8 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.thrift.InvalidRequestException;
+
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
@@ -37,13 +39,12 @@ import org.apache.usergrid.persistence.exceptions.QueueException;
 import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.UUIDUtils;
 
-import com.fasterxml.uuid.UUIDComparator;
-
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.ColumnSlice;
 import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.beans.Row;
 import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
 import me.prettyprint.hector.api.factory.HFactory;
 import me.prettyprint.hector.api.mutation.Mutator;
 import me.prettyprint.hector.api.query.SliceQuery;
@@ -65,7 +66,6 @@ import static org.apache.usergrid.persistence.cassandra.Serializers.be;
 import static org.apache.usergrid.persistence.cassandra.Serializers.se;
 import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
 import static org.apache.usergrid.utils.NumberUtils.roundLong;
-import static org.apache.usergrid.utils.UUIDUtils.compare;
 import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
 
 
@@ -156,7 +156,7 @@ public abstract class AbstractSearch implements QueueSearch {
 
         UUID finish_uuid = params.reversed ? bounds.getOldest() : bounds.getNewest();
 
-        List<UUID> results = new ArrayList<UUID>( params.limit );
+        List<UUID> results = new ArrayList<>( params.limit );
 
         UUID start = params.startId;
 
@@ -184,7 +184,7 @@ public abstract class AbstractSearch implements QueueSearch {
             current_ts_shard = finish_ts_shard;
         }
 
-        final MessageIdComparator comparator = new MessageIdComparator(params.reversed);
+        final MessageIdComparator comparator = new MessageIdComparator( params.reversed );
 
 
         //should be start < finish
@@ -196,14 +196,14 @@ public abstract class AbstractSearch implements QueueSearch {
         }
 
 
-
-
         UUID lastValue = start;
         boolean firstPage = true;
 
-        while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) && comparator.compare(start, finish_uuid) < 1 ) {
+        while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard )
+                && comparator.compare( start, finish_uuid ) < 1 ) {
 
-            logger.info( "Starting search with start UUID {}, finish UUID {}, and reversed {}", new Object[]{lastValue, finish_uuid, params.reversed });
+            logger.info( "Starting search with start UUID {}, finish UUID {}, and reversed {}",
+                    new Object[] { lastValue, finish_uuid, params.reversed } );
 
 
             SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
@@ -211,7 +211,8 @@ public abstract class AbstractSearch implements QueueSearch {
             q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
             q.setRange( lastValue, finish_uuid, params.reversed, params.limit + 1 );
 
-            final List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
+            final List<HColumn<UUID, ByteBuffer>> cassResults = swallowOrderedExecution(q);
+
 
             for ( int i = 0; i < cassResults.size(); i++ ) {
                 HColumn<UUID, ByteBuffer> column = cassResults.get( i );
@@ -339,12 +340,12 @@ public abstract class AbstractSearch implements QueueSearch {
     }
 
 
-    private static final class MessageIdComparator implements Comparator<UUID> {
+    protected static final class MessageIdComparator implements Comparator<UUID> {
 
         private final int comparator;
 
 
-        private MessageIdComparator( final boolean reversed ) {
+        protected MessageIdComparator( final boolean reversed ) {
 
             this.comparator = reversed ? -1 : 1;
         }
@@ -352,7 +353,40 @@ public abstract class AbstractSearch implements QueueSearch {
 
         @Override
         public int compare( final UUID o1, final UUID o2 ) {
-            return UUIDUtils.compare( o1, o2 )*comparator;
+            return UUIDUtils.compare( o1, o2 ) * comparator;
+        }
+    }
+
+
+    /**
+     * This method intentionally swallows ordered execution issues.  For some reason, our time UUID ordering does
+     * not agree with the Cassandra comparator when our microsecond timestamps get very close together.
+     * @param query
+     * @param <K>
+     * @param <UUID>
+     * @param <V>
+     * @return
+     */
+    protected static <K, UUID, V> List<HColumn<UUID, V>> swallowOrderedExecution( final SliceQuery<K, UUID, V> query ) {
+        try {
+
+            return query.execute().get().getColumns();
+        }
+        catch ( HInvalidRequestException e ) {
+            //invalid request.  Occasionally we get ordering errors when there shouldn't be any; disregard them.
+
+            final Throwable invalidRequestException = e.getCause();
+
+            if ( invalidRequestException instanceof InvalidRequestException
+                    //we had a range error
+                    && ( ( InvalidRequestException ) invalidRequestException ).getWhy().contains(
+                    "range finish must come after start in the order of traversal" )) {
+                return Collections.emptyList();
+            }
+
+            throw e;
         }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/abbd76eb/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 7205dbc..bdf9afd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -337,7 +337,7 @@ public class ConsumerTransaction extends NoTransactionSearch
         q.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
         q.setRange( params.startId, startTimeUUID, false, params.limit + 1 );
 
-        List<HColumn<UUID, UUID>> cassResults = q.execute().get().getColumns();
+        List<HColumn<UUID, UUID>> cassResults = swallowOrderedExecution(q);
 
         List<TransactionPointer> results = new ArrayList<TransactionPointer>( params.limit );