You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 16:52:10 UTC
[34/35] git commit: Fixes issue with range scanning and swallows
exception.
Fixes issue with range scanning and swallows exception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/abbd76eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/abbd76eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/abbd76eb
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: abbd76eb40328f63e79441d404555e85894d86fe
Parents: e5bcbb2
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 17:56:02 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 17:56:02 2014 -0600
----------------------------------------------------------------------
.../mq/cassandra/io/AbstractSearch.java | 60 +++++++++++++++-----
.../mq/cassandra/io/ConsumerTransaction.java | 2 +-
2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/abbd76eb/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
index ffda843..0e7dea1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
@@ -30,6 +30,8 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
@@ -37,13 +39,12 @@ import org.apache.usergrid.persistence.exceptions.QueueException;
import org.apache.usergrid.persistence.hector.CountingMutator;
import org.apache.usergrid.utils.UUIDUtils;
-import com.fasterxml.uuid.UUIDComparator;
-
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.exceptions.HInvalidRequestException;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.SliceQuery;
@@ -65,7 +66,6 @@ import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import static org.apache.usergrid.persistence.cassandra.Serializers.se;
import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
import static org.apache.usergrid.utils.NumberUtils.roundLong;
-import static org.apache.usergrid.utils.UUIDUtils.compare;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
@@ -156,7 +156,7 @@ public abstract class AbstractSearch implements QueueSearch {
UUID finish_uuid = params.reversed ? bounds.getOldest() : bounds.getNewest();
- List<UUID> results = new ArrayList<UUID>( params.limit );
+ List<UUID> results = new ArrayList<>( params.limit );
UUID start = params.startId;
@@ -184,7 +184,7 @@ public abstract class AbstractSearch implements QueueSearch {
current_ts_shard = finish_ts_shard;
}
- final MessageIdComparator comparator = new MessageIdComparator(params.reversed);
+ final MessageIdComparator comparator = new MessageIdComparator( params.reversed );
//should be start < finish
@@ -196,14 +196,14 @@ public abstract class AbstractSearch implements QueueSearch {
}
-
-
UUID lastValue = start;
boolean firstPage = true;
- while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) && comparator.compare(start, finish_uuid) < 1 ) {
+ while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard )
+ && comparator.compare( start, finish_uuid ) < 1 ) {
- logger.info( "Starting search with start UUID {}, finish UUID {}, and reversed {}", new Object[]{lastValue, finish_uuid, params.reversed });
+ logger.info( "Starting search with start UUID {}, finish UUID {}, and reversed {}",
+ new Object[] { lastValue, finish_uuid, params.reversed } );
SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
@@ -211,7 +211,8 @@ public abstract class AbstractSearch implements QueueSearch {
q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
q.setRange( lastValue, finish_uuid, params.reversed, params.limit + 1 );
- final List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
+ final List<HColumn<UUID, ByteBuffer>> cassResults = swallowOrderedExecution(q);
+
for ( int i = 0; i < cassResults.size(); i++ ) {
HColumn<UUID, ByteBuffer> column = cassResults.get( i );
@@ -339,12 +340,12 @@ public abstract class AbstractSearch implements QueueSearch {
}
- private static final class MessageIdComparator implements Comparator<UUID> {
+ protected static final class MessageIdComparator implements Comparator<UUID> {
private final int comparator;
- private MessageIdComparator( final boolean reversed ) {
+ protected MessageIdComparator( final boolean reversed ) {
this.comparator = reversed ? -1 : 1;
}
@@ -352,7 +353,40 @@ public abstract class AbstractSearch implements QueueSearch {
@Override
public int compare( final UUID o1, final UUID o2 ) {
- return UUIDUtils.compare( o1, o2 )*comparator;
+ return UUIDUtils.compare( o1, o2 ) * comparator;
+ }
+ }
+
+
+ /**
+ * Executes the slice query and returns its columns, intentionally swallowing Cassandra's range-ordering
+ * error. Our time UUID ordering can disagree with the Cassandra comparator when the micros get very close.
+ * @param query the slice query to execute
+ * @param <K> the row key type
+ * @param <UUID> the column name type; a type variable declared by this method that shadows any UUID class
+ * @param <V> the column value type
+ * @return the columns returned by the query, or an empty list when the range-ordering error was swallowed
+ */
+ protected static <K, UUID, V> List<HColumn<UUID, V>> swallowOrderedExecution( final SliceQuery<K, UUID, V> query ) {
+ try {
+
+ return query.execute().get().getColumns();
+ }
+ catch ( HInvalidRequestException e ) {
+ // Invalid request: occasionally we get ordering errors when there should be none; disregard only those.
+
+ final Throwable invalidRequestException = e.getCause();
+
+ if ( invalidRequestException instanceof InvalidRequestException
+ // only the specific range-ordering error is swallowed; it is recognised by its message text
+ && ( ( InvalidRequestException ) invalidRequestException ).getWhy().contains(
+ "range finish must come after start in the order of traversal" )) {
+ return Collections.emptyList();
+ }
+
+ throw e;
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/abbd76eb/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 7205dbc..bdf9afd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -337,7 +337,7 @@ public class ConsumerTransaction extends NoTransactionSearch
q.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
q.setRange( params.startId, startTimeUUID, false, params.limit + 1 );
- List<HColumn<UUID, UUID>> cassResults = q.execute().get().getColumns();
+ List<HColumn<UUID, UUID>> cassResults = swallowOrderedExecution(q);
List<TransactionPointer> results = new ArrayList<TransactionPointer>( params.limit );