You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 16:52:04 UTC

[28/35] git commit: Adds additional logging to help diagnose queue issue

Adds additional logging to help diagnose queue issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ce83461
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ce83461
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ce83461

Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 9ce83461b7d6f4eaaec10b14ed2c50f57b7629bf
Parents: d6fd7dd
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 12:26:01 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 12:26:01 2014 -0600

----------------------------------------------------------------------
 .../usergrid/mq/cassandra/QueueManagerImpl.java |  18 ++-
 .../mq/cassandra/io/AbstractSearch.java         | 141 +++++++++----------
 .../mq/cassandra/io/ConsumerTransaction.java    |   2 +-
 3 files changed, 80 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
index 57a6e3b..ae4e4c9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
@@ -192,18 +192,24 @@ public class QueueManagerImpl implements QueueManager {
 
         long shard_ts = roundLong( message.getTimestamp(), QUEUE_SHARD_INTERVAL );
 
-        logger.debug( "Adding message with id '{}' to queue '{}'", message.getUuid(), queueId );
+        final UUID messageUuid = message.getUuid();
+
+
+        logger.debug( "Adding message with id '{}' to queue '{}'", messageUuid, queueId );
+
 
         batch.addInsertion( getQueueShardRowKey( queueId, shard_ts ), QUEUE_INBOX.getColumnFamily(),
-                createColumn( message.getUuid(), ByteBuffer.allocate( 0 ), timestamp, ue, be ) );
+                createColumn( messageUuid, ByteBuffer.allocate( 0 ), timestamp, ue, be ) );
 
-        long oldest_ts = Long.MAX_VALUE - getTimestampInMicros( message.getUuid() );
+        long oldest_ts = Long.MAX_VALUE - getTimestampInMicros( messageUuid );
         batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
-                createColumn( QUEUE_OLDEST, message.getUuid(), oldest_ts, se, ue ) );
+                createColumn( QUEUE_OLDEST, messageUuid, oldest_ts, se, ue ) );
 
-        long newest_ts = getTimestampInMicros( message.getUuid() );
+        long newest_ts = getTimestampInMicros( messageUuid );
         batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
-                createColumn( QUEUE_NEWEST, message.getUuid(), newest_ts, se, ue ) );
+                createColumn( QUEUE_NEWEST, messageUuid, newest_ts, se, ue ) );
+
+        logger.debug( "Writing UUID {} with oldest timestamp {} and newest with timestamp {}", new Object[]{messageUuid, oldest_ts, newest_ts});
 
         batch.addInsertion( bytebuffer( getQueueId( "/" ) ), QUEUE_SUBSCRIBERS.getColumnFamily(),
                 createColumn( queuePath, queueId, timestamp, se, ue ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
index c3d2800..b0b5ac7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
@@ -47,7 +48,6 @@ import me.prettyprint.hector.api.query.SliceQuery;
 
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
 import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-
 import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
 import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
@@ -59,16 +59,15 @@ import static org.apache.usergrid.mq.cassandra.QueuesCF.CONSUMERS;
 import static org.apache.usergrid.mq.cassandra.QueuesCF.MESSAGE_PROPERTIES;
 import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_INBOX;
 import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_PROPERTIES;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.persistence.cassandra.Serializers.se;
+import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
 import static org.apache.usergrid.utils.NumberUtils.roundLong;
-import static org.apache.usergrid.utils.UUIDUtils.MAX_TIME_UUID;
-import static org.apache.usergrid.utils.UUIDUtils.MIN_TIME_UUID;
 import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
-import static org.apache.usergrid.persistence.cassandra.Serializers.*;
 
 
 /** @author tnine */
-public abstract class AbstractSearch implements QueueSearch
-{
+public abstract class AbstractSearch implements QueueSearch {
 
     private static final Logger logger = LoggerFactory.getLogger( AbstractSearch.class );
 
@@ -78,8 +77,7 @@ public abstract class AbstractSearch implements QueueSearch
     /**
      *
      */
-    public AbstractSearch( Keyspace ko )
-    {
+    public AbstractSearch( Keyspace ko ) {
         this.ko = ko;
     }
 
@@ -90,13 +88,11 @@ public abstract class AbstractSearch implements QueueSearch
      * @param queueId The queueId
      * @param consumerId The consumerId
      */
-    public UUID getConsumerQueuePosition( UUID queueId, UUID consumerId )
-    {
+    public UUID getConsumerQueuePosition( UUID queueId, UUID consumerId ) {
         HColumn<UUID, UUID> result =
                 HFactory.createColumnQuery( ko, ue, ue, ue ).setKey( consumerId ).setName( queueId )
                         .setColumnFamily( CONSUMERS.getColumnFamily() ).execute().get();
-        if ( result != null )
-        {
+        if ( result != null ) {
             return result.getValue();
         }
 
@@ -105,21 +101,19 @@ public abstract class AbstractSearch implements QueueSearch
 
 
     /** Load the messages into an array list */
-    protected List<Message> loadMessages( Collection<UUID> messageIds, boolean reversed )
-    {
+    protected List<Message> loadMessages( Collection<UUID> messageIds, boolean reversed ) {
 
         Rows<UUID, String, ByteBuffer> messageResults =
                 createMultigetSliceQuery( ko, ue, se, be ).setColumnFamily( MESSAGE_PROPERTIES.getColumnFamily() )
-                        .setKeys( messageIds ).setRange( null, null, false, ALL_COUNT ).execute().get();
+                                                          .setKeys( messageIds )
+                                                          .setRange( null, null, false, ALL_COUNT ).execute().get();
 
         List<Message> messages = new ArrayList<Message>( messageIds.size() );
 
-        for ( Row<UUID, String, ByteBuffer> row : messageResults )
-        {
+        for ( Row<UUID, String, ByteBuffer> row : messageResults ) {
             Message message = deserializeMessage( row.getColumnSlice().getColumns() );
 
-            if ( message != null )
-            {
+            if ( message != null ) {
                 messages.add( message );
             }
         }
@@ -131,13 +125,11 @@ public abstract class AbstractSearch implements QueueSearch
 
 
     /** Create the results to return from the given messages */
-    protected QueueResults createResults( List<Message> messages, String queuePath, UUID queueId, UUID consumerId )
-    {
+    protected QueueResults createResults( List<Message> messages, String queuePath, UUID queueId, UUID consumerId ) {
 
         UUID lastId = null;
 
-        if ( messages != null && messages.size() > 0 )
-        {
+        if ( messages != null && messages.size() > 0 ) {
             lastId = messages.get( messages.size() - 1 ).getUuid();
         }
 
@@ -152,11 +144,9 @@ public abstract class AbstractSearch implements QueueSearch
      * @param queueId The queue id to read
      * @param bounds The bounds to use when reading
      */
-    protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params )
-    {
+    protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params ) {
 
-        if ( bounds == null )
-        {
+        if ( bounds == null ) {
             logger.error( "Necessary queue bounds not found" );
             throw new QueueException( "Neccessary queue bounds not found" );
         }
@@ -167,19 +157,16 @@ public abstract class AbstractSearch implements QueueSearch
 
         UUID start = params.startId;
 
-        if ( start == null )
-        {
+        if ( start == null ) {
             start = params.reversed ? bounds.getNewest() : bounds.getOldest();
         }
 
-        if ( start == null )
-        {
+        if ( start == null ) {
             logger.error( "No first message in queue" );
             return results;
         }
 
-        if ( finish_uuid == null )
-        {
+        if ( finish_uuid == null ) {
             logger.error( "No last message in queue" );
             return results;
         }
@@ -189,51 +176,68 @@ public abstract class AbstractSearch implements QueueSearch
         long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
 
         long current_ts_shard = start_ts_shard;
-        if ( params.reversed )
-        {
+
+        if ( params.reversed ) {
             current_ts_shard = finish_ts_shard;
         }
 
 
+        //should be start < finish
+        if ( !params.reversed && UUIDUtils.compare( start, finish_uuid ) > 0 ) {
+            logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finish_uuid );
+            throw new IllegalArgumentException( String.format("You cannot specify a start value of %s after finish value of %s", start, finish_uuid) );
+        }
+
+        // should be finish < start
+        else if ( params.reversed && UUIDUtils.compare( start, finish_uuid ) < 0 ) {
+            logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finish_uuid );
+            throw new IllegalArgumentException( String.format("You cannot specify a start value of %s after finish value of %s", start, finish_uuid) );
+        }
+
+
+
+        UUID lastValue = start;
+        boolean firstPage = true;
+
+        while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) ) {
 
-        while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) )
-        {
 
             SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
             q.setColumnFamily( QUEUE_INBOX.getColumnFamily() );
             q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
-            q.setRange( start, finish_uuid, params.reversed, params.limit + 1 );
+            q.setRange( lastValue, finish_uuid, params.reversed, params.limit + 1 );
 
-            List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
+            final List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
 
-            for ( int i = 0; i < cassResults.size(); i++ )
-            {
+            for ( int i = 0; i < cassResults.size(); i++ ) {
                 HColumn<UUID, ByteBuffer> column = cassResults.get( i );
 
+                final UUID columnName = column.getName();
+
                 // skip the first one, we've already read it
-                if ( i == 0 && params.skipFirst && params.startId.equals( column.getName() ) )
-                {
+                if ( i == 0 && (firstPage && params.skipFirst && params.startId.equals( columnName ))
+                        || (!firstPage &&  lastValue != null && lastValue.equals(columnName)) ) {
                     continue;
                 }
 
-                UUID id = column.getName();
 
-                results.add( id );
+                lastValue = columnName;
+
+                results.add( columnName );
 
-                logger.debug( "Added id '{}' to result set for queue id '{}'", id, queueId );
+                logger.debug( "Added id '{}' to result set for queue id '{}'", start, queueId );
 
-                if ( results.size() >= params.limit )
-                {
+                if ( results.size() >= params.limit ) {
                     return results;
                 }
+
+                firstPage = false;
             }
 
-            if ( params.reversed )
-            {
+            if ( params.reversed ) {
                 current_ts_shard -= QUEUE_SHARD_INTERVAL;
             }
-            else
-            {
+            else {
                 current_ts_shard += QUEUE_SHARD_INTERVAL;
             }
         }
@@ -247,23 +251,19 @@ public abstract class AbstractSearch implements QueueSearch
      *
      * @return The bounds for the queue
      */
-    public QueueBounds getQueueBounds( UUID queueId )
-    {
-        try
-        {
+    public QueueBounds getQueueBounds( UUID queueId ) {
+        try {
             ColumnSlice<String, UUID> result = HFactory.createSliceQuery( ko, ue, se, ue ).setKey( queueId )
                                                        .setColumnNames( QUEUE_NEWEST, QUEUE_OLDEST )
                                                        .setColumnFamily( QUEUE_PROPERTIES.getColumnFamily() ).execute()
                                                        .get();
             if ( result != null && result.getColumnByName( QUEUE_OLDEST ) != null
-                    && result.getColumnByName( QUEUE_NEWEST ) != null )
-            {
+                    && result.getColumnByName( QUEUE_NEWEST ) != null ) {
                 return new QueueBounds( result.getColumnByName( QUEUE_OLDEST ).getValue(),
                         result.getColumnByName( QUEUE_NEWEST ).getValue() );
             }
         }
-        catch ( Exception e )
-        {
+        catch ( Exception e ) {
             logger.error( "Error getting oldest queue message ID", e );
         }
         return null;
@@ -276,11 +276,9 @@ public abstract class AbstractSearch implements QueueSearch
      * @param lastReturnedId This is a null safe parameter. If it's null, this won't be written since it means we didn't
      * read any messages
      */
-    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
-    {
+    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId ) {
         // nothing to do
-        if ( lastReturnedId == null )
-        {
+        if ( lastReturnedId == null ) {
             return;
         }
 
@@ -292,8 +290,7 @@ public abstract class AbstractSearch implements QueueSearch
 
         Mutator<UUID> mutator = CountingMutator.createFlushingMutator( ko, ue );
 
-        if ( logger.isDebugEnabled() )
-        {
+        if ( logger.isDebugEnabled() ) {
             logger.debug( "Writing last client id pointer of '{}' for queue '{}' and consumer '{}' with timestamp '{}",
                     new Object[] {
                             lastReturnedId, queueId, consumerId, colTimestamp
@@ -307,18 +304,15 @@ public abstract class AbstractSearch implements QueueSearch
     }
 
 
-    private class RequestedOrderComparator implements Comparator<Message>
-    {
+    private class RequestedOrderComparator implements Comparator<Message> {
 
         private Map<UUID, Integer> indexCache = new HashMap<UUID, Integer>();
 
 
-        private RequestedOrderComparator( Collection<UUID> ids )
-        {
+        private RequestedOrderComparator( Collection<UUID> ids ) {
             int i = 0;
 
-            for ( UUID id : ids )
-            {
+            for ( UUID id : ids ) {
                 indexCache.put( id, i );
                 i++;
             }
@@ -331,8 +325,7 @@ public abstract class AbstractSearch implements QueueSearch
          * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
          */
         @Override
-        public int compare( Message o1, Message o2 )
-        {
+        public int compare( Message o1, Message o2 ) {
             int o1Idx = indexCache.get( o1.getUuid() );
 
             int o2Idx = indexCache.get( o2.getUuid() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 7e45b00..6d70924 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -230,7 +230,7 @@ public class ConsumerTransaction extends NoTransactionSearch
 
             List<TransactionPointer> pointers = getConsumerIds( queueId, consumerId, params, startTimeUUID );
 
-            TransactionPointer pointer = null;
+            TransactionPointer pointer;
 
             int lastTransactionIndex = -1;