You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 16:52:04 UTC
[28/35] git commit: Adds additional logging to help diagnose queue issue
Adds additional logging to help diagnose queue issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ce83461
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ce83461
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ce83461
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 9ce83461b7d6f4eaaec10b14ed2c50f57b7629bf
Parents: d6fd7dd
Author: Todd Nine <to...@apache.org>
Authored: Tue Sep 30 12:26:01 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Tue Sep 30 12:26:01 2014 -0600
----------------------------------------------------------------------
.../usergrid/mq/cassandra/QueueManagerImpl.java | 18 ++-
.../mq/cassandra/io/AbstractSearch.java | 141 +++++++++----------
.../mq/cassandra/io/ConsumerTransaction.java | 2 +-
3 files changed, 80 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
index 57a6e3b..ae4e4c9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
@@ -192,18 +192,24 @@ public class QueueManagerImpl implements QueueManager {
long shard_ts = roundLong( message.getTimestamp(), QUEUE_SHARD_INTERVAL );
- logger.debug( "Adding message with id '{}' to queue '{}'", message.getUuid(), queueId );
+ final UUID messageUuid = message.getUuid();
+
+
+ logger.debug( "Adding message with id '{}' to queue '{}'", messageUuid, queueId );
+
batch.addInsertion( getQueueShardRowKey( queueId, shard_ts ), QUEUE_INBOX.getColumnFamily(),
- createColumn( message.getUuid(), ByteBuffer.allocate( 0 ), timestamp, ue, be ) );
+ createColumn( messageUuid, ByteBuffer.allocate( 0 ), timestamp, ue, be ) );
- long oldest_ts = Long.MAX_VALUE - getTimestampInMicros( message.getUuid() );
+ long oldest_ts = Long.MAX_VALUE - getTimestampInMicros( messageUuid );
batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
- createColumn( QUEUE_OLDEST, message.getUuid(), oldest_ts, se, ue ) );
+ createColumn( QUEUE_OLDEST, messageUuid, oldest_ts, se, ue ) );
- long newest_ts = getTimestampInMicros( message.getUuid() );
+ long newest_ts = getTimestampInMicros( messageUuid );
batch.addInsertion( bytebuffer( queueId ), QUEUE_PROPERTIES.getColumnFamily(),
- createColumn( QUEUE_NEWEST, message.getUuid(), newest_ts, se, ue ) );
+ createColumn( QUEUE_NEWEST, messageUuid, newest_ts, se, ue ) );
+
+ logger.debug( "Writing UUID {} with oldest timestamp {} and newest with timestamp {}", new Object[]{messageUuid, oldest_ts, newest_ts});
batch.addInsertion( bytebuffer( getQueueId( "/" ) ), QUEUE_SUBSCRIBERS.getColumnFamily(),
createColumn( queuePath, queueId, timestamp, se, ue ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
index c3d2800..b0b5ac7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
@@ -47,7 +48,6 @@ import me.prettyprint.hector.api.query.SliceQuery;
import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-
import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
@@ -59,16 +59,15 @@ import static org.apache.usergrid.mq.cassandra.QueuesCF.CONSUMERS;
import static org.apache.usergrid.mq.cassandra.QueuesCF.MESSAGE_PROPERTIES;
import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_INBOX;
import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_PROPERTIES;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.persistence.cassandra.Serializers.se;
+import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
import static org.apache.usergrid.utils.NumberUtils.roundLong;
-import static org.apache.usergrid.utils.UUIDUtils.MAX_TIME_UUID;
-import static org.apache.usergrid.utils.UUIDUtils.MIN_TIME_UUID;
import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
-import static org.apache.usergrid.persistence.cassandra.Serializers.*;
/** @author tnine */
-public abstract class AbstractSearch implements QueueSearch
-{
+public abstract class AbstractSearch implements QueueSearch {
private static final Logger logger = LoggerFactory.getLogger( AbstractSearch.class );
@@ -78,8 +77,7 @@ public abstract class AbstractSearch implements QueueSearch
/**
*
*/
- public AbstractSearch( Keyspace ko )
- {
+ public AbstractSearch( Keyspace ko ) {
this.ko = ko;
}
@@ -90,13 +88,11 @@ public abstract class AbstractSearch implements QueueSearch
* @param queueId The queueId
* @param consumerId The consumerId
*/
- public UUID getConsumerQueuePosition( UUID queueId, UUID consumerId )
- {
+ public UUID getConsumerQueuePosition( UUID queueId, UUID consumerId ) {
HColumn<UUID, UUID> result =
HFactory.createColumnQuery( ko, ue, ue, ue ).setKey( consumerId ).setName( queueId )
.setColumnFamily( CONSUMERS.getColumnFamily() ).execute().get();
- if ( result != null )
- {
+ if ( result != null ) {
return result.getValue();
}
@@ -105,21 +101,19 @@ public abstract class AbstractSearch implements QueueSearch
/** Load the messages into an array list */
- protected List<Message> loadMessages( Collection<UUID> messageIds, boolean reversed )
- {
+ protected List<Message> loadMessages( Collection<UUID> messageIds, boolean reversed ) {
Rows<UUID, String, ByteBuffer> messageResults =
createMultigetSliceQuery( ko, ue, se, be ).setColumnFamily( MESSAGE_PROPERTIES.getColumnFamily() )
- .setKeys( messageIds ).setRange( null, null, false, ALL_COUNT ).execute().get();
+ .setKeys( messageIds )
+ .setRange( null, null, false, ALL_COUNT ).execute().get();
List<Message> messages = new ArrayList<Message>( messageIds.size() );
- for ( Row<UUID, String, ByteBuffer> row : messageResults )
- {
+ for ( Row<UUID, String, ByteBuffer> row : messageResults ) {
Message message = deserializeMessage( row.getColumnSlice().getColumns() );
- if ( message != null )
- {
+ if ( message != null ) {
messages.add( message );
}
}
@@ -131,13 +125,11 @@ public abstract class AbstractSearch implements QueueSearch
/** Create the results to return from the given messages */
- protected QueueResults createResults( List<Message> messages, String queuePath, UUID queueId, UUID consumerId )
- {
+ protected QueueResults createResults( List<Message> messages, String queuePath, UUID queueId, UUID consumerId ) {
UUID lastId = null;
- if ( messages != null && messages.size() > 0 )
- {
+ if ( messages != null && messages.size() > 0 ) {
lastId = messages.get( messages.size() - 1 ).getUuid();
}
@@ -152,11 +144,9 @@ public abstract class AbstractSearch implements QueueSearch
* @param queueId The queue id to read
* @param bounds The bounds to use when reading
*/
- protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params )
- {
+ protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params ) {
- if ( bounds == null )
- {
+ if ( bounds == null ) {
logger.error( "Necessary queue bounds not found" );
throw new QueueException( "Neccessary queue bounds not found" );
}
@@ -167,19 +157,16 @@ public abstract class AbstractSearch implements QueueSearch
UUID start = params.startId;
- if ( start == null )
- {
+ if ( start == null ) {
start = params.reversed ? bounds.getNewest() : bounds.getOldest();
}
- if ( start == null )
- {
+ if ( start == null ) {
logger.error( "No first message in queue" );
return results;
}
- if ( finish_uuid == null )
- {
+ if ( finish_uuid == null ) {
logger.error( "No last message in queue" );
return results;
}
@@ -189,51 +176,68 @@ public abstract class AbstractSearch implements QueueSearch
long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
long current_ts_shard = start_ts_shard;
- if ( params.reversed )
- {
+
+ if ( params.reversed ) {
current_ts_shard = finish_ts_shard;
}
+ //should be start < finish
+ if ( !params.reversed && UUIDUtils.compare( start, finish_uuid ) > 0 ) {
+ logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finish_uuid );
+ throw new IllegalArgumentException( String.format("You cannot specify a start value of %s after finish value of %s", start, finish_uuid) );
+ }
+
+ // should be finish < start
+ else if ( params.reversed && UUIDUtils.compare( start, finish_uuid ) < 0 ) {
+ logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finish_uuid );
+ throw new IllegalArgumentException( String.format("You cannot specify a start value of %s after finish value of %s", start, finish_uuid) );
+ }
+
+
+
+ UUID lastValue = start;
+ boolean firstPage = true;
+
+ while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) ) {
- while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) )
- {
SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
q.setColumnFamily( QUEUE_INBOX.getColumnFamily() );
q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
- q.setRange( start, finish_uuid, params.reversed, params.limit + 1 );
+ q.setRange( lastValue, finish_uuid, params.reversed, params.limit + 1 );
- List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
+ final List<HColumn<UUID, ByteBuffer>> cassResults = q.execute().get().getColumns();
- for ( int i = 0; i < cassResults.size(); i++ )
- {
+ for ( int i = 0; i < cassResults.size(); i++ ) {
HColumn<UUID, ByteBuffer> column = cassResults.get( i );
+ final UUID columnName = column.getName();
+
// skip the first one, we've already read it
- if ( i == 0 && params.skipFirst && params.startId.equals( column.getName() ) )
- {
+ if ( i == 0 && (firstPage && params.skipFirst && params.startId.equals( columnName ))
+ || (!firstPage && lastValue != null && lastValue.equals(columnName)) ) {
continue;
}
- UUID id = column.getName();
- results.add( id );
+ lastValue = columnName;
+
+ results.add( columnName );
- logger.debug( "Added id '{}' to result set for queue id '{}'", id, queueId );
+ logger.debug( "Added id '{}' to result set for queue id '{}'", start, queueId );
- if ( results.size() >= params.limit )
- {
+ if ( results.size() >= params.limit ) {
return results;
}
+
+ firstPage = false;
}
- if ( params.reversed )
- {
+ if ( params.reversed ) {
current_ts_shard -= QUEUE_SHARD_INTERVAL;
}
- else
- {
+ else {
current_ts_shard += QUEUE_SHARD_INTERVAL;
}
}
@@ -247,23 +251,19 @@ public abstract class AbstractSearch implements QueueSearch
*
* @return The bounds for the queue
*/
- public QueueBounds getQueueBounds( UUID queueId )
- {
- try
- {
+ public QueueBounds getQueueBounds( UUID queueId ) {
+ try {
ColumnSlice<String, UUID> result = HFactory.createSliceQuery( ko, ue, se, ue ).setKey( queueId )
.setColumnNames( QUEUE_NEWEST, QUEUE_OLDEST )
.setColumnFamily( QUEUE_PROPERTIES.getColumnFamily() ).execute()
.get();
if ( result != null && result.getColumnByName( QUEUE_OLDEST ) != null
- && result.getColumnByName( QUEUE_NEWEST ) != null )
- {
+ && result.getColumnByName( QUEUE_NEWEST ) != null ) {
return new QueueBounds( result.getColumnByName( QUEUE_OLDEST ).getValue(),
result.getColumnByName( QUEUE_NEWEST ).getValue() );
}
}
- catch ( Exception e )
- {
+ catch ( Exception e ) {
logger.error( "Error getting oldest queue message ID", e );
}
return null;
@@ -276,11 +276,9 @@ public abstract class AbstractSearch implements QueueSearch
* @param lastReturnedId This is a null safe parameter. If it's null, this won't be written since it means we didn't
* read any messages
*/
- protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
- {
+ protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId ) {
// nothing to do
- if ( lastReturnedId == null )
- {
+ if ( lastReturnedId == null ) {
return;
}
@@ -292,8 +290,7 @@ public abstract class AbstractSearch implements QueueSearch
Mutator<UUID> mutator = CountingMutator.createFlushingMutator( ko, ue );
- if ( logger.isDebugEnabled() )
- {
+ if ( logger.isDebugEnabled() ) {
logger.debug( "Writing last client id pointer of '{}' for queue '{}' and consumer '{}' with timestamp '{}",
new Object[] {
lastReturnedId, queueId, consumerId, colTimestamp
@@ -307,18 +304,15 @@ public abstract class AbstractSearch implements QueueSearch
}
- private class RequestedOrderComparator implements Comparator<Message>
- {
+ private class RequestedOrderComparator implements Comparator<Message> {
private Map<UUID, Integer> indexCache = new HashMap<UUID, Integer>();
- private RequestedOrderComparator( Collection<UUID> ids )
- {
+ private RequestedOrderComparator( Collection<UUID> ids ) {
int i = 0;
- for ( UUID id : ids )
- {
+ for ( UUID id : ids ) {
indexCache.put( id, i );
i++;
}
@@ -331,8 +325,7 @@ public abstract class AbstractSearch implements QueueSearch
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
@Override
- public int compare( Message o1, Message o2 )
- {
+ public int compare( Message o1, Message o2 ) {
int o1Idx = indexCache.get( o1.getUuid() );
int o2Idx = indexCache.get( o2.getUuid() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ce83461/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 7e45b00..6d70924 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -230,7 +230,7 @@ public class ConsumerTransaction extends NoTransactionSearch
List<TransactionPointer> pointers = getConsumerIds( queueId, consumerId, params, startTimeUUID );
- TransactionPointer pointer = null;
+ TransactionPointer pointer;
int lastTransactionIndex = -1;