You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:53 UTC
[43/96] [abbrv] [partial] Change package namespace to
org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
new file mode 100644
index 0000000..5df1b79
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -0,0 +1,470 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.locking.Lock;
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.locking.exception.UGLockException;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.exceptions.QueueException;
+import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.SliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueClientTransactionKey;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueId;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.CONSUMER_QUEUE_TIMEOUTS;
+
+
+/**
+ * Reads from the queue and starts a transaction
+ *
+ * @author tnine
+ */
+public class ConsumerTransaction extends NoTransactionSearch
+{
+
+ // Class logger
+ private static final Logger logger = LoggerFactory.getLogger( ConsumerTransaction.class );
+ // Largest query limit getResults accepts; a larger limit is rejected with an IllegalArgumentException
+ private static final int MAX_READ = 10000;
+ // Creates the lock getResults takes for an application / queue / consumer combination
+ private final LockManager lockManager;
+ // Application id handed to the lock manager when that lock is created
+ private final UUID applicationId;
+ // Supplies the timestamps (createTimestamp) used on timeout column insertions and deletions
+ protected final CassandraService cass;
+
+ // timeout for acquiring the read lock; passed to tryLock together with TimeUnit.SECONDS
+ private final int lockTimeout;
+
+
+    /**
+     * Creates a transactional queue reader.
+     *
+     * @param applicationId The application id used when the consumer lock is created
+     * @param ko The keyspace, handed to the parent search
+     * @param lockManager The lock manager used to create the consumer lock
+     * @param cass The cassandra service used to create column timestamps
+     * @param lockTimeout How long to wait for the consumer lock, in seconds
+     */
+    public ConsumerTransaction( UUID applicationId, Keyspace ko, LockManager lockManager, CassandraService cass,
+                                int lockTimeout )
+    {
+        super( ko );
+        this.lockTimeout = lockTimeout;
+        this.cass = cass;
+        this.lockManager = lockManager;
+        this.applicationId = applicationId;
+    }
+
+
+    /**
+     * Renews an existing transaction. A new timeout column is written for the same message first, and the
+     * existing timeout column is deleted afterwards.
+     *
+     * @param queuePath The queue path
+     * @param transactionId The id of the transaction to renew
+     * @param query The query params; a default {@link QueueQuery} is used when this is null
+     *
+     * @return The new transaction uuid
+     *
+     * @throws TransactionNotFoundException if no timeout column named transactionId exists for the consumer
+     */
+    public UUID renewTransaction( String queuePath, UUID transactionId, QueueQuery query )
+            throws TransactionNotFoundException
+    {
+        // taken before any I/O so the new expiration is relative to the moment the call was made
+        final long requestTime = System.currentTimeMillis();
+
+        final QueueQuery effectiveQuery = query != null ? query : new QueueQuery();
+
+        final UUID queueId = getQueueId( queuePath );
+        final UUID consumerId = getConsumerId( queueId, effectiveQuery );
+        final ByteBuffer rowKey = getQueueClientTransactionKey( queueId, consumerId );
+
+        // the original transaction has to exist, otherwise there is nothing to extend
+        SliceQuery<ByteBuffer, UUID, UUID> lookup = createSliceQuery( ko, be, ue, ue );
+        lookup.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+        lookup.setKey( rowKey );
+        lookup.setColumnNames( transactionId );
+
+        final HColumn<UUID, UUID> existing = lookup.execute().get().getColumnByName( transactionId );
+
+        if ( existing == null )
+        {
+            throw new TransactionNotFoundException(
+                    String.format( "No transaction with id %s exists", transactionId ) );
+        }
+
+        final UUID previousTransaction = existing.getName();
+        final UUID messageId = existing.getValue();
+
+        // build the replacement expiration and persist it before the old one is removed
+        final UUID renewedTransaction = UUIDUtils.newTimeUUID( requestTime + effectiveQuery.getTimeout() );
+
+        logger.debug( "Writing new timeout at '{}' for message '{}'", renewedTransaction, messageId );
+
+        Mutator<ByteBuffer> writer = createMutator( ko, be );
+        writer.addInsertion( rowKey, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
+                createColumn( renewedTransaction, messageId, cass.createTimestamp(), ue, ue ) );
+        writer.execute();
+
+        deleteTransaction( queueId, consumerId, previousTransaction );
+
+        return renewedTransaction;
+    }
+
+
+    /**
+     * Delete the specified transaction for the consumer resolved from the queue path and query.
+     *
+     * @param queuePath The queue path
+     * @param transactionId The id of the transaction to delete
+     * @param query The query params; a default {@link QueueQuery} is used when this is null
+     */
+    public void deleteTransaction( String queuePath, UUID transactionId, QueueQuery query )
+    {
+        final QueueQuery effectiveQuery = query == null ? new QueueQuery() : query;
+        final UUID queueId = getQueueId( queuePath );
+
+        deleteTransaction( queueId, getConsumerId( queueId, effectiveQuery ), transactionId );
+    }
+
+
+ /** Delete the specified transaction */
+ private void deleteTransaction( UUID queueId, UUID consumerId, UUID transactionId )
+ {
+
+ Mutator<ByteBuffer> mutator = createMutator( ko, be );
+ ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+ mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), transactionId, ue,
+ cass.createTimestamp() );
+
+ mutator.execute();
+ }
+
+
+    /**
+     * Reads the next batch of messages for a consumer and opens a transaction on every message returned.
+     *
+     * <p>The read runs under a lock created for the application, queue and consumer. Transaction pointers
+     * whose timeout is not later than the start of this call are merged back into the batch, a new timeout is
+     * written for each returned message, the merged pointers are deleted and the consumer's pointer is
+     * advanced.</p>
+     *
+     * @param queuePath The queue path
+     * @param query The query params; the limit may not be larger than MAX_READ
+     *
+     * @return The messages that were read; an empty result when the queue has no bounds yet
+     *
+     * @throws IllegalArgumentException if the query limit is larger than MAX_READ
+     * @throws QueueException if the lock is not obtained within the lock timeout, or acquiring or releasing it
+     * fails
+     * @see org.apache.usergrid.mq.cassandra.io.QueueSearch#getResults(java.lang.String,
+     *      org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+
+        UUID queueId = getQueueId( queuePath );
+        UUID consumerId = getConsumerId( queueId, query );
+
+        if ( query.getLimit() > MAX_READ )
+        {
+            throw new IllegalArgumentException( String.format(
+                    "You specified a size of %d, you cannot specify a size larger than %d when using transations",
+                    query.getLimit( DEFAULT_READ ), MAX_READ ) );
+        }
+
+        QueueResults results = null;
+
+        Lock lock = lockManager.createLock( applicationId, queueId.toString(), consumerId.toString() );
+
+        // set only once tryLock has succeeded, so the finally block never releases a lock this call does not hold
+        boolean lockAcquired = false;
+
+        try
+        {
+
+            // only try to get a lock with a timeout, if we can't, bail
+            if ( !lock.tryLock( lockTimeout, TimeUnit.SECONDS ) )
+            {
+                throw new QueueException( "Unable to obtain a lock on queue '" + queuePath + "' after '" + lockTimeout + "'seconds" );
+            }
+
+            lockAcquired = true;
+
+            long startTime = System.currentTimeMillis();
+
+            UUID startTimeUUID = UUIDUtils.newTimeUUID( startTime, 0 );
+
+            QueueBounds bounds = getQueueBounds( queueId );
+
+            // queue has never been written to
+            if ( bounds == null )
+            {
+                return createResults( new ArrayList<Message>( 0 ), queuePath, queueId, consumerId );
+            }
+
+            // with transactional reads, we can't read into the future, set the bounds
+            // to be now
+            bounds = new QueueBounds( bounds.getOldest(), startTimeUUID );
+
+            SearchParam params = getParams( queueId, consumerId, query );
+
+            List<UUID> ids = getQueueRange( queueId, bounds, params );
+
+            // get the timed out transaction pointers for this consumer
+            List<TransactionPointer> pointers = getConsumerIds( queueId, consumerId, params, startTimeUUID );
+
+            int lastTransactionIndex = -1;
+
+            for ( int i = 0; i < pointers.size(); i++ )
+            {
+
+                TransactionPointer pointer = pointers.get( i );
+
+                // NOTE(review): binarySearch relies on UUID's natural ordering; this presumes ids is sorted
+                // consistently with that ordering - confirm against getQueueRange
+                int searchResult = Collections.binarySearch( ids, pointer.expiration );
+
+                // decode the insertion point; a direct hit (non-negative result) is used as the position itself
+                // instead of being turned into a negative index
+                int insertIndex = searchResult >= 0 ? searchResult : -( searchResult + 1 );
+
+                // we're done, this message goes at or past the end of a full result set, no point in
+                // continuing
+                if ( insertIndex >= params.limit )
+                {
+                    break;
+                }
+
+                ids.add( insertIndex, pointer.targetMessage );
+
+                lastTransactionIndex = i;
+            }
+
+            // now that the results are merged, trim them to size
+            if ( ids.size() > params.limit )
+            {
+                ids = ids.subList( 0, params.limit );
+            }
+
+            // load the messages
+            List<Message> messages = loadMessages( ids, params.reversed );
+
+            // write our future timeouts for all these messages
+            writeTransactions( messages, query.getTimeout() + startTime, queueId, consumerId );
+
+            // remove all read transaction pointers
+            deleteTransactionPointers( pointers, lastTransactionIndex + 1, queueId, consumerId );
+
+            // return the results
+            results = createResults( messages, queuePath, queueId, consumerId );
+
+            UUID lastReadTransactionPointer =
+                    lastTransactionIndex == -1 ? null : pointers.get( lastTransactionIndex ).expiration;
+
+            UUID lastId = messages.size() == 0 ? null : messages.get( messages.size() - 1 ).getUuid();
+
+            // our last read id will either be the last read transaction pointer, or
+            // the last read message's uuid, whichever is greater
+            UUID lastReadId = UUIDUtils.max( lastReadTransactionPointer, lastId );
+
+            writeClientPointer( queueId, consumerId, lastReadId );
+        }
+        catch ( UGLockException e )
+        {
+            logger.debug( "Unable to acquire lock", e );
+            throw new QueueException( "Unable to acquire lock", e );
+        }
+        finally
+        {
+            if ( lockAcquired )
+            {
+                try
+                {
+                    lock.unlock();
+                }
+                catch ( UGLockException e )
+                {
+                    logger.debug( "Unable to release lock", e );
+                    throw new QueueException( "Unable to release lock", e );
+                }
+            }
+        }
+
+        return results;
+    }
+
+
+ /**
+ * Get all pending transactions that have timed out
+ *
+ * @param queueId The queue id
+ * @param consumerId The consumer id
+ * @param params The server params
+ * @param startTimeUUID The start time
+ */
+ protected List<TransactionPointer> getConsumerIds( UUID queueId, UUID consumerId, SearchParam params,
+ UUID startTimeUUID )
+ {
+
+ SliceQuery<ByteBuffer, UUID, UUID> q = createSliceQuery( ko, be, ue, ue );
+ q.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+ q.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
+ q.setRange( params.startId, startTimeUUID, false, params.limit + 1 );
+
+ List<HColumn<UUID, UUID>> cassResults = q.execute().get().getColumns();
+
+ List<TransactionPointer> results = new ArrayList<TransactionPointer>( params.limit );
+
+ for ( HColumn<UUID, UUID> column : cassResults )
+ {
+
+ if ( logger.isDebugEnabled() )
+ {
+ logger.debug( "Adding uuid '{}' for original message '{}' to results for queue '{}' and consumer '{}'",
+ new Object[] { column.getName(), column.getValue(), queueId, consumerId } );
+ logger.debug( "Max timeuuid : '{}', Current timeuuid : '{}', comparison '{}'", new Object[] {
+ startTimeUUID, column.getName(), UUIDUtils.compare( startTimeUUID, column.getName() )
+ } );
+ }
+
+ results.add( new TransactionPointer( column.getName(), column.getValue() ) );
+ }
+
+ return results;
+ }
+
+
+    /**
+     * Checks whether the consumer's transaction row holds at least one timeout column.
+     *
+     * @param queueId The queue id
+     * @param consumerId The consumer id
+     *
+     * @return true if at least one column was read, false otherwise
+     */
+    public boolean hasOutstandingTransactions( UUID queueId, UUID consumerId )
+    {
+        SliceQuery<ByteBuffer, UUID, UUID> probe = createSliceQuery( ko, be, ue, ue );
+        probe.setColumnFamily( CONSUMER_QUEUE_TIMEOUTS.getColumnFamily() );
+        probe.setKey( getQueueClientTransactionKey( queueId, consumerId ) );
+
+        // a single column is enough to answer the question
+        probe.setRange( null, null, false, 1 );
+
+        return !probe.execute().get().getColumns().isEmpty();
+    }
+
+
+ /**
+ * Delete all re-read transaction pointers
+ *
+ * @param pointers The list of transaction pointers
+ * @param maxIndex The index to stop at (exclusive)
+ * @param queueId The queue id
+ * @param consumerId The consumer id
+ */
+ protected void deleteTransactionPointers( List<TransactionPointer> pointers, int maxIndex, UUID queueId,
+ UUID consumerId )
+ {
+
+ if ( maxIndex == 0 || pointers.size() == 0 )
+ {
+ return;
+ }
+
+ Mutator<ByteBuffer> mutator = createMutator( ko, be );
+ ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+ for ( int i = 0; i < maxIndex && i < pointers.size(); i++ )
+ {
+ UUID pointer = pointers.get( i ).expiration;
+
+ if ( logger.isDebugEnabled() )
+ {
+ logger.debug( "Removing transaction pointer '{}' for queue '{}' and consumer '{}'", new Object[] {
+ pointer, queueId, consumerId
+ } );
+ }
+
+ mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), pointer, ue, cass.createTimestamp() );
+ }
+
+ mutator.execute();
+ }
+
+
+ /**
+ * Write the transaction timeouts
+ *
+ * @param messages The messages to load
+ * @param futureTimeout The time these message should expire
+ * @param queueId The queue UUId
+ * @param consumerId The consumer Id
+ */
+ protected void writeTransactions( List<Message> messages, final long futureTimeout, UUID queueId, UUID consumerId )
+ {
+
+ Mutator<ByteBuffer> mutator = createMutator( ko, be );
+
+ ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
+
+ int counter = 0;
+
+ long time = cass.createTimestamp();
+
+ for ( Message message : messages )
+ {
+ // note we're not incrementing futureSnapshot on purpose. The uuid
+ // generation should give us a sequenced unique ID for each response, even
+ // if the
+ // time is the same since we increment the counter. If we read more than
+ // 10k messages in a single transaction, our millisecond will roll to the
+ // next due to 10k being the max amount of 1/10 microsecond headroom. Not
+ // possible to avoid this given the way time uuids are encoded.
+ UUID expirationId = UUIDUtils.newTimeUUID( futureTimeout, counter );
+ UUID messageId = message.getUuid();
+
+ logger.debug( "Writing new timeout at '{}' for message '{}'", expirationId, messageId );
+
+ mutator.addInsertion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
+ createColumn( expirationId, messageId, time, ue, ue ) );
+
+ // add the transactionid to the message
+ message.setTransaction( expirationId );
+ counter++;
+ }
+
+ mutator.execute();
+ }
+
+
+    /** Pairs the id of a transaction timeout with the id of the message that timeout belongs to. */
+    private static class TransactionPointer
+    {
+        /** The timeout column name */
+        private final UUID expiration;
+
+        /** The timeout column value, the id of the message */
+        private final UUID targetMessage;
+
+
+        /**
+         * @param expiration The timeout id
+         * @param targetMessage The id of the message the timeout points at
+         */
+        public TransactionPointer( UUID expiration, UUID targetMessage )
+        {
+            this.targetMessage = targetMessage;
+            this.expiration = expiration;
+        }
+
+
+        /** @return Both ids in a readable form */
+        @Override
+        public String toString()
+        {
+            StringBuilder text = new StringBuilder( "TransactionPointer [expiration=" );
+            text.append( expiration ).append( ", targetMessage=" ).append( targetMessage ).append( "]" );
+            return text.toString();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
new file mode 100644
index 0000000..fd5497f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/EndSearch.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.mq.QueueQuery;
+
+import me.prettyprint.hector.api.Keyspace;
+
+
+/**
+ * Reads from the queue in reversed order without starting transactions and without recording a client pointer.
+ *
+ * @author tnine
+ */
+public class EndSearch extends NoTransactionSearch
+{
+
+    /**
+     * @param ko The keyspace, handed to the parent search
+     */
+    public EndSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Builds reversed search params that start at the query's last message id. The first result is skipped
+     * only when such an id is present.
+     *
+     * @see org.apache.usergrid.mq.cassandra.io.NoTransactionSearch#getParams(java.util.UUID,
+     *      java.util.UUID, org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+    {
+        final UUID resumeFrom = query.getLastMessageId();
+        final boolean skipFirst = resumeFrom != null;
+
+        return new SearchParam( resumeFrom, true, skipFirst, query.getLimit( DEFAULT_READ ) );
+    }
+
+
+    /**
+     * Does nothing; searches from the end do not record a client pointer.
+     */
+    @Override
+    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
+    {
+        // no op for searches from the end
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
new file mode 100644
index 0000000..42332a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/FilterSearch.java
@@ -0,0 +1,258 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueryProcessor;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.apache.usergrid.mq.QueryProcessor.QuerySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static org.apache.usergrid.mq.Queue.getQueueId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.DEFAULT_SEARCH_COUNT;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.QUEUE_SHARD_INTERVAL;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.CompositeUtils.setEqualityFlag;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.NumberUtils.roundLong;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMillis;
+
+
+/**
+ * Searches in the queue without transactions
+ *
+ * @author tnine
+ */
+public class FilterSearch extends NoTransactionSearch
+{
+
+ // Class logger; searchQueueRange uses it to report missing queue bounds
+ private static final Logger logger = LoggerFactory.getLogger( FilterSearch.class );
+
+
+ /**
+ * Creates a filter search.
+ *
+ * @param ko The keyspace, handed to the parent search
+ */
+ public FilterSearch( Keyspace ko )
+ {
+ super( ko );
+ }
+
+
+    /**
+     * Runs every slice of the query against the property index, keeps only the ids matched by all slices,
+     * trims that set to the query limit and loads the messages.
+     *
+     * @param queuePath The queue path
+     * @param query The query; its slices, limit, last message id and reversed flag are used
+     *
+     * @return The matching messages; an empty result when the query produces no slices
+     *
+     * @see org.apache.usergrid.mq.cassandra.io.QueueSearch#getResults(java.lang.String,
+     *      org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+
+        QueryProcessor qp = new QueryProcessor( query );
+        List<QuerySlice> slices = qp.getSlices();
+
+        long limit = query.getLimit();
+
+        UUID queueId = getQueueId( queuePath );
+        UUID consumerId = getConsumerId( queueId, query );
+        QueueBounds bounds = getQueueBounds( queueId );
+
+        UUIDComparator comparator = query.isReversed() ? new ReverseUUIDComparator() : new UUIDComparator();
+
+        SortedSet<UUID> merged = null;
+
+        if ( slices != null )
+        {
+            for ( QuerySlice slice : slices )
+            {
+                SortedSet<UUID> results =
+                        searchQueueRange( ko, queueId, bounds, slice, query.getLastMessageId(), query.isReversed(),
+                                comparator );
+
+                if ( merged == null )
+                {
+                    // the first slice seeds the set, every later slice narrows it
+                    merged = results;
+                }
+                else
+                {
+                    merged.retainAll( results );
+                }
+            }
+        }
+
+        // without any slice nothing was searched; fall back to an empty set instead of failing on null below
+        if ( merged == null )
+        {
+            merged = new TreeSet<UUID>( comparator );
+        }
+
+        // now trim. Not efficient, but when indexing is updated, seeking will
+        // change, so I'm not worried about this.
+        if ( merged.size() > limit )
+        {
+            Iterator<UUID> current = merged.iterator();
+            UUID max = null;
+
+            // walk to the element just past the limit; headSet excludes it, leaving exactly limit elements
+            for ( int i = 0; i <= limit && current.hasNext(); i++ )
+            {
+                max = current.next();
+            }
+
+            merged = merged.headSet( max );
+        }
+
+        List<Message> messages = loadMessages( merged, query.isReversed() );
+
+        QueueResults results = createResults( messages, queuePath, queueId, consumerId );
+
+        return results;
+    }
+
+
+ /**
+ * Collects the message ids found in the property index for one query slice, walking the index one time shard
+ * at a time and paging within each shard.
+ *
+ * @param ko The keyspace the index is read from
+ * @param queueId The queue id; part of the index row key
+ * @param bounds The queue bounds; when null an error is logged and an empty set is returned
+ * @param slice The slice supplying the property name and the optional cursor, start and finish values
+ * @param last When not null, replaces the start bound
+ * @param reversed Selects which bound is treated as start and which as finish
+ * @param comparator Orders the returned set
+ *
+ * @return The ids found, restricted to the range from the start id (inclusive) to the finish id (exclusive)
+ * in comparator order
+ */
+ public SortedSet<UUID> searchQueueRange( Keyspace ko, UUID queueId, QueueBounds bounds, QuerySlice slice, UUID last,
+ boolean reversed, UUIDComparator comparator )
+ {
+
+ TreeSet<UUID> uuid_set = new TreeSet<UUID>( comparator );
+
+ if ( bounds == null )
+ {
+ logger.error( "Necessary queue bounds not found" );
+ return uuid_set;
+ }
+
+ // reversed searches run from the newest bound towards the oldest, forward searches the other way round
+ UUID start_uuid = reversed ? bounds.getNewest() : bounds.getOldest();
+
+ UUID finish_uuid = reversed ? bounds.getOldest() : bounds.getNewest();
+
+ // resume from the caller supplied id when there is one
+ if ( last != null )
+ {
+ start_uuid = last;
+ }
+
+
+ // NOTE(review): only finish_uuid is null-checked; start_uuid is passed to getTimestampInMillis below
+ // without a check - confirm it can never be null here
+ if ( finish_uuid == null )
+ {
+ logger.error( "No last message in queue" );
+ return uuid_set;
+ }
+
+ // shard timestamps of the two ends, rounded with QUEUE_SHARD_INTERVAL
+ long start_ts_shard = roundLong( getTimestampInMillis( start_uuid ), QUEUE_SHARD_INTERVAL );
+
+ long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );
+
+ long current_ts_shard = start_ts_shard;
+
+ if ( reversed )
+ {
+ current_ts_shard = finish_ts_shard;
+ }
+
+ // lower end of the column range: the slice cursor wins over the slice start value
+ ByteBuffer start = null;
+ if ( slice.getCursor() != null )
+ {
+ start = slice.getCursor();
+ }
+ else if ( slice.getStart() != null )
+ {
+ DynamicComposite s = new DynamicComposite( slice.getStart().getCode(), slice.getStart().getValue() );
+ // an exclusive start is expressed through the composite equality flag
+ if ( !slice.getStart().isInclusive() )
+ {
+ setEqualityFlag( s, ComponentEquality.GREATER_THAN_EQUAL );
+ }
+ start = s.serialize();
+ }
+
+ // upper end of the column range, taken from the slice finish value
+ ByteBuffer finish = null;
+ if ( slice.getFinish() != null )
+ {
+ DynamicComposite f = new DynamicComposite( slice.getFinish().getCode(), slice.getFinish().getValue() );
+ // an inclusive finish is expressed through the composite equality flag
+ if ( slice.getFinish().isInclusive() )
+ {
+ setEqualityFlag( f, ComponentEquality.GREATER_THAN_EQUAL );
+ }
+ finish = f.serialize();
+ }
+
+ // NOTE(review): in the reversed case start_ts_shard comes from the newest bound and finish_ts_shard from
+ // the oldest, so this condition appears to hold only when both fall into the same shard - confirm that
+ // reversed searches across several shards behave as intended
+ while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) && ( uuid_set.size()
+ < DEFAULT_SEARCH_COUNT ) )
+ {
+
+ // page through the current shard, DEFAULT_SEARCH_COUNT columns at a time
+ while ( true )
+ {
+ List<HColumn<ByteBuffer, ByteBuffer>> results =
+ createSliceQuery( ko, be, be, be ).setColumnFamily( PROPERTY_INDEX.getColumnFamily() )
+ .setKey( bytebuffer( key( queueId, current_ts_shard, slice.getPropertyName() ) ) )
+ .setRange( start, finish, false, DEFAULT_SEARCH_COUNT ).execute().get().getColumns();
+
+ for ( HColumn<ByteBuffer, ByteBuffer> column : results )
+ {
+ // the third component of the composite column name is read as the message uuid
+ DynamicComposite c = DynamicComposite.fromByteBuffer( column.getName().duplicate() );
+ UUID uuid = c.get( 2, ue );
+
+ uuid_set.add( uuid );
+ }
+
+ // a short page means the shard is exhausted
+ if ( results.size() < DEFAULT_SEARCH_COUNT )
+ {
+ break;
+ }
+
+ // continue from the last column read
+ // NOTE(review): start is not reset when moving on to the next shard - confirm this is intended
+ start = results.get( results.size() - 1 ).getName().duplicate();
+ }
+
+ if ( reversed )
+ {
+ current_ts_shard -= QUEUE_SHARD_INTERVAL;
+ }
+ else
+ {
+ current_ts_shard += QUEUE_SHARD_INTERVAL;
+ }
+ }
+
+ // trim the results; headSet excludes finish_uuid itself, tailSet includes start_uuid
+ return uuid_set.headSet( finish_uuid ).tailSet( start_uuid );
+ }
+
+
+    /** Orders uuids in the opposite direction of {@link UUIDComparator} by negating the parent's result. */
+    private static class ReverseUUIDComparator extends UUIDComparator
+    {
+
+        /**
+         * @see com.fasterxml.uuid.UUIDComparator#compare(java.util.UUID, java.util.UUID)
+         */
+        @Override
+        public int compare( UUID u1, UUID u2 )
+        {
+            final int forward = super.compare( u1, u2 );
+            return -forward;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
new file mode 100644
index 0000000..5ca2eb4
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/NoTransactionSearch.java
@@ -0,0 +1,131 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.Keyspace;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
+import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueId;
+
+
+/**
+ * Reads from the queue without starting transactions.
+ *
+ * @author tnine
+ */
+public class NoTransactionSearch extends AbstractSearch
+{
+
+ // Class logger
+ private static final Logger logger = LoggerFactory.getLogger( NoTransactionSearch.class );
+
+ // Value passed to QueueQuery.getLimit(int) when search params are built; presumably the limit that applies
+ // when the query does not specify one - TODO confirm against QueueQuery
+ protected static final int DEFAULT_READ = 1;
+
+
+ /**
+ * Creates a non transactional search.
+ *
+ * @param ko The keyspace, handed to the parent search
+ */
+ public NoTransactionSearch( Keyspace ko )
+ {
+ super( ko );
+ }
+
+
+    /**
+     * Reads the next messages for the consumer without starting a transaction, then records the last returned
+     * id through {@code writeClientPointer}.
+     *
+     * @param queuePath The queue path
+     * @param query The query params
+     *
+     * @return The messages that were read
+     *
+     * @see org.apache.usergrid.mq.cassandra.io.QueueSearch#getResults(java.lang.String,
+     *      org.apache.usergrid.mq.QueueQuery)
+     */
+    @Override
+    public QueueResults getResults( String queuePath, QueueQuery query )
+    {
+        final UUID queueId = getQueueId( queuePath );
+        final UUID consumerId = getConsumerId( queueId, query );
+        final QueueBounds bounds = getQueueBounds( queueId );
+        final SearchParam params = getParams( queueId, consumerId, query );
+
+        // resolve the ids first, then turn them into messages
+        final List<Message> messages =
+                loadMessages( getIds( queueId, consumerId, bounds, params ), params.reversed );
+
+        final QueueResults results = createResults( messages, queuePath, queueId, consumerId );
+
+        writeClientPointer( queueId, consumerId, results.getLast() );
+
+        return results;
+    }
+
+
+ /** Get information for reading no transactionally from the query. Subclasses can override this behavior */
+ protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+ {
+ UUID lastReadMessageId = getConsumerQueuePosition( queueId, consumerId );
+
+ if ( logger.isDebugEnabled() )
+ {
+ logger.debug( "Last message id is '{}' for queueId '{}' and clientId '{}'",
+ new Object[] { lastReadMessageId, queueId, consumerId } );
+ }
+
+ return new SearchParam( lastReadMessageId, false, lastReadMessageId != null, query.getLimit( DEFAULT_READ ) );
+ }
+
+
+ /** Get the list of ids we should load and return to the client. Message ids should be in order on return */
+ protected List<UUID> getIds( UUID queueId, UUID consumerId, QueueBounds bounds, SearchParam params )
+ {
+ return getQueueRange( queueId, bounds, params );
+ }
+
+
+    /** Immutable set of values that describe a single read of the queue. */
+    protected static class SearchParam
+    {
+
+        /** The uuid to start seeking from */
+        protected final UUID startId;
+
+        /** true if we should seek from high to low */
+        protected final boolean reversed;
+
+        /** The number of results to include */
+        protected final int limit;
+
+        /** true if the first result should be skipped. Useful for paging from a previous result */
+        protected final boolean skipFirst;
+
+
+        /**
+         * @param startId The uuid to start seeking from
+         * @param reversed true to seek from high to low
+         * @param skipFirst true to skip the first result
+         * @param count The number of results to include; stored as the limit
+         */
+        public SearchParam( UUID startId, boolean reversed, boolean skipFirst, int count )
+        {
+            this.limit = count;
+            this.skipFirst = skipFirst;
+            this.reversed = reversed;
+            this.startId = startId;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
new file mode 100644
index 0000000..6e7cf4b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueBounds.java
@@ -0,0 +1,91 @@
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+
+/** Immutable pair of the oldest and the newest id of a queue. Either id may be null. */
+public class QueueBounds
+{
+
+    private final UUID oldest;
+    private final UUID newest;
+
+
+    /**
+     * @param oldest The oldest id
+     * @param newest The newest id
+     */
+    public QueueBounds( UUID oldest, UUID newest )
+    {
+        this.newest = newest;
+        this.oldest = oldest;
+    }
+
+
+    /** @return The oldest id */
+    public UUID getOldest()
+    {
+        return oldest;
+    }
+
+
+    /** @return The newest id */
+    public UUID getNewest()
+    {
+        return newest;
+    }
+
+
+    /** Combines the hashes of newest and oldest, in that order, with the multiplier 31; null counts as 0. */
+    @Override
+    public int hashCode()
+    {
+        final int seeded = 31 + hashOf( newest );
+        return ( 31 * seeded ) + hashOf( oldest );
+    }
+
+
+    /** Two bounds are equal when they are of the same class and both ids match, nulls included. */
+    @Override
+    public boolean equals( Object obj )
+    {
+        if ( this == obj )
+        {
+            return true;
+        }
+        if ( obj == null || getClass() != obj.getClass() )
+        {
+            return false;
+        }
+
+        final QueueBounds that = ( QueueBounds ) obj;
+
+        return sameId( newest, that.newest ) && sameId( oldest, that.oldest );
+    }
+
+
+    @Override
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder( "QueueBounds [oldest=" );
+        text.append( oldest ).append( ", newest=" ).append( newest ).append( "]" );
+        return text.toString();
+    }
+
+
+    /** Hash of the id, with null mapped to 0 */
+    private static int hashOf( UUID id )
+    {
+        return id == null ? 0 : id.hashCode();
+    }
+
+
+    /** Null safe equality of two ids */
+    private static boolean sameId( UUID left, UUID right )
+    {
+        return left == null ? right == null : left.equals( right );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
new file mode 100644
index 0000000..c12afe6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/QueueSearch.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.mq.QueueResults;
+
+
+/**
+ * Contract for a search that can be run against a queue.
+ *
+ * @author tnine
+ */
+public interface QueueSearch
+{
+
+    /**
+     * Get the results for this queue search.
+     *
+     * @param queuePath the path naming the queue to search
+     * @param query the query to evaluate against that queue
+     * @return the results produced for the query
+     */
+    QueueResults getResults( String queuePath, QueueQuery query );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
new file mode 100644
index 0000000..370f841
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/StartSearch.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra.io;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.mq.QueueQuery;
+
+import me.prettyprint.hector.api.Keyspace;
+
+
+/**
+ * Reads from the queue without starting transactions.
+ *
+ * <p>This {@link NoTransactionSearch} derives its search parameters from the query's last message id and
+ * never records a client pointer.
+ *
+ * @author tnine
+ */
+public class StartSearch extends NoTransactionSearch
+{
+
+    /**
+     * @param ko the keyspace, handed straight to the superclass constructor
+     */
+    public StartSearch( Keyspace ko )
+    {
+        super( ko );
+    }
+
+
+    /**
+     * Builds the search parameters from the query alone; {@code queueId} and {@code consumerId} are not
+     * consulted.
+     *
+     * <p>The third {@code SearchParam} argument is true only when the query carries a last message id.
+     * NOTE(review): the meaning of the second and third arguments is defined by {@code SearchParam}, which is
+     * not visible here; confirm before relying on it.
+     *
+     * @param queueId unused by this implementation
+     * @param consumerId unused by this implementation
+     * @param query supplies the last message id and the limit (defaulting to {@code DEFAULT_READ})
+     * @return the parameters for the search
+     */
+    @Override
+    protected SearchParam getParams( UUID queueId, UUID consumerId, QueueQuery query )
+    {
+        final UUID startId = query.getLastMessageId();
+        final boolean hasStartId = startId != null;
+
+        return new SearchParam( startId, false, hasStartId, query.getLimit( DEFAULT_READ ) );
+    }
+
+
+    /**
+     * Deliberately does nothing: searches from the start do not persist a client pointer.
+     */
+    @Override
+    protected void writeClientPointer( UUID queueId, UUID consumerId, UUID lastReturnedId )
+    {
+        //no op for searches from the start
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
new file mode 100644
index 0000000..91d9da2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AbstractEntity.java
@@ -0,0 +1,342 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+
+
+/**
+ * The abstract superclass implementation of the Entity interface.
+ *
+ * <p>The uuid, created and modified values are plain fields. Dynamic properties are kept in a
+ * {@link TreeMap} ordered by {@link String#CASE_INSENSITIVE_ORDER}, so their keys are matched without regard
+ * to case. Access to a property by name is delegated to {@code Schema.getDefaultSchema()}.
+ *
+ * <p>Equality and hashing use the uuid only; ordering ({@link #compareTo(Entity)}) uses the uuid timestamp
+ * when the uuid has one.
+ *
+ * @author edanuff
+ */
+@XmlRootElement
+public abstract class AbstractEntity implements Entity {
+
+    protected UUID uuid;
+
+    protected Long created;
+
+    protected Long modified;
+
+    /** Dynamic properties, keyed case-insensitively. */
+    protected Map<String, Object> dynamic_properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+
+    /** Dynamic sets, keyed case-insensitively. Not touched by any method of this class. */
+    protected Map<String, Set<Object>> dynamic_sets = new TreeMap<String, Set<Object>>( String.CASE_INSENSITIVE_ORDER );
+
+
+    /** @return the uuid field; may be null if it was never set */
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getUuid() {
+        return uuid;
+    }
+
+
+    @Override
+    public void setUuid( UUID uuid ) {
+        this.uuid = uuid;
+    }
+
+
+    /** @return the entity type the default schema associates with this object's runtime class */
+    @Override
+    @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+    public String getType() {
+        return Schema.getDefaultSchema().getEntityType( this.getClass() );
+    }
+
+
+    /**
+     * Intentionally a no-op in this class: {@link #getType()} derives the type from the runtime class, so the
+     * supplied value is ignored.
+     */
+    @Override
+    public void setType( String type ) {
+    }
+
+
+    @Override
+    @EntityProperty(indexed = true, required = true, mutable = false)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Long getCreated() {
+        return created;
+    }
+
+
+    /**
+     * Sets the created value.
+     *
+     * @param created the value to store; null is replaced by {@link System#currentTimeMillis()}
+     */
+    @Override
+    public void setCreated( Long created ) {
+        if ( created == null ) {
+            created = System.currentTimeMillis();
+        }
+        this.created = created;
+    }
+
+
+    @Override
+    @EntityProperty(indexed = true, required = true, mutable = true)
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Long getModified() {
+        return modified;
+    }
+
+
+    /**
+     * Sets the modified value.
+     *
+     * @param modified the value to store; null is replaced by {@link System#currentTimeMillis()}
+     */
+    @Override
+    public void setModified( Long modified ) {
+        if ( modified == null ) {
+            modified = System.currentTimeMillis();
+        }
+        this.modified = modified;
+    }
+
+
+    /**
+     * Returns the value of the {@code PROPERTY_NAME} property as a string.
+     *
+     * @return the name; a UUID value is converted with {@code toString()}, any other value is cast to String
+     * @throws ClassCastException if the stored value is neither a UUID nor a String
+     */
+    @Override
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getName() {
+        Object value = getProperty( PROPERTY_NAME );
+
+        if ( value instanceof UUID ) {
+            // fixes existing data that uses UUID in USERGRID-2099
+            return value.toString();
+        }
+
+        return ( String ) value;
+    }
+
+
+    /** @return the properties of this entity as reported by the default schema */
+    @Override
+    @JsonIgnore
+    public Map<String, Object> getProperties() {
+        return Schema.getDefaultSchema().getEntityProperties( this );
+    }
+
+
+    /** Reads a single property through the default schema. */
+    @Override
+    public final Object getProperty( String propertyName ) {
+        return Schema.getDefaultSchema().getEntityProperty( this, propertyName );
+    }
+
+
+    /** Writes a single property through the default schema. */
+    @Override
+    public final void setProperty( String propertyName, Object propertyValue ) {
+        Schema.getDefaultSchema().setEntityProperty( this, propertyName, propertyValue );
+    }
+
+
+    /**
+     * Replaces the dynamic properties: the dynamic property map is reset to a fresh empty map and the given
+     * entries are then applied with {@link #addProperties(Map)}.
+     *
+     * @param properties the entries to apply; null leaves the entity with an empty dynamic property map
+     */
+    @Override
+    public void setProperties( Map<String, Object> properties ) {
+        dynamic_properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+        addProperties( properties );
+    }
+
+
+    /**
+     * Applies every entry of the given map with {@link #setProperty(String, Object)}.
+     *
+     * @param properties the entries to apply; null is ignored
+     */
+    @Override
+    public void addProperties( Map<String, Object> properties ) {
+        if ( properties == null ) {
+            return;
+        }
+        for ( Entry<String, Object> entry : properties.entrySet() ) {
+            setProperty( entry.getKey(), entry.getValue() );
+        }
+    }
+
+
+    /** @return the value stored under {@code key} in the "metadata" dataset, or null */
+    @Override
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Object getMetadata( String key ) {
+        return getDataset( "metadata", key );
+    }
+
+
+    @Override
+    public void setMetadata( String key, Object value ) {
+        setDataset( "metadata", key, value );
+    }
+
+
+    @Override
+    public void mergeMetadata( Map<String, Object> new_metadata ) {
+        mergeDataset( "metadata", new_metadata );
+    }
+
+
+    @Override
+    public void clearMetadata() {
+        clearDataset( "metadata" );
+    }
+
+
+    /**
+     * Looks up one entry of a dataset, i.e. a map stored as a dynamic property.
+     *
+     * @param property name of the dynamic property holding the dataset
+     * @param key key inside the dataset
+     * @return the stored value, or null when the property is absent, is not a map, or has no such key
+     */
+    public <T> T getDataset( String property, String key ) {
+        Object md = dynamic_properties.get( property );
+        if ( md == null ) {
+            return null;
+        }
+        if ( !( md instanceof Map<?, ?> ) ) {
+            return null;
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        return metadata.get( key );
+    }
+
+
+    /**
+     * Stores one entry in a dataset. If the dynamic property is absent or is not a map it is replaced by a
+     * new {@link HashMap} first.
+     *
+     * @param property name of the dynamic property holding the dataset
+     * @param key key inside the dataset; a null key makes the call a no-op
+     * @param value the value to store
+     */
+    public <T> void setDataset( String property, String key, T value ) {
+        if ( key == null ) {
+            return;
+        }
+        Object md = dynamic_properties.get( property );
+        if ( !( md instanceof Map<?, ?> ) ) {
+            md = new HashMap<String, T>();
+            dynamic_properties.put( property, md );
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        metadata.put( key, value );
+    }
+
+
+    /**
+     * Copies all entries of the given map into a dataset. If the dynamic property is absent or is not a map
+     * it is replaced by a new {@link HashMap} first.
+     *
+     * @param property name of the dynamic property holding the dataset
+     * @param new_metadata the entries to merge; null is ignored and leaves the entity untouched
+     */
+    public <T> void mergeDataset( String property, Map<String, T> new_metadata ) {
+        // Guard before touching state: putAll( null ) would throw after an empty dataset had been installed.
+        if ( new_metadata == null ) {
+            return;
+        }
+        Object md = dynamic_properties.get( property );
+        if ( !( md instanceof Map<?, ?> ) ) {
+            md = new HashMap<String, T>();
+            dynamic_properties.put( property, md );
+        }
+        @SuppressWarnings("unchecked") Map<String, T> metadata = ( Map<String, T> ) md;
+        metadata.putAll( new_metadata );
+    }
+
+
+    /** Removes the whole dataset stored under the given dynamic property. */
+    public void clearDataset( String property ) {
+        dynamic_properties.remove( property );
+    }
+
+
+    /** @return the list stored under {@code key} in the "collections" dataset, or null */
+    @Override
+    public List<Entity> getCollections( String key ) {
+        return getDataset( "collections", key );
+    }
+
+
+    @Override
+    public void setCollections( String key, List<Entity> results ) {
+        setDataset( "collections", key, results );
+    }
+
+
+    /** @return the list stored under {@code key} in the "connections" dataset, or null */
+    @Override
+    public List<Entity> getConnections( String key ) {
+        return getDataset( "connections", key );
+    }
+
+
+    @Override
+    public void setConnections( String key, List<Entity> results ) {
+        setDataset( "connections", key, results );
+    }
+
+
+    @Override
+    public String toString() {
+        return "Entity(" + getProperties() + ")";
+    }
+
+
+    /** Puts the value straight into the dynamic property map, bypassing the schema. */
+    @Override
+    @JsonAnySetter
+    public void setDynamicProperty( String key, Object value ) {
+        dynamic_properties.put( key, value );
+    }
+
+
+    /** @return the live dynamic property map (not a copy) */
+    @Override
+    @JsonAnyGetter
+    public Map<String, Object> getDynamicProperties() {
+        return dynamic_properties;
+    }
+
+
+    /**
+     * Orders entities by the timestamp of their uuids; when a uuid has no timestamp the natural UUID order is
+     * used instead.
+     *
+     * @param o the entity to compare with; null sorts before this entity
+     * @return a negative number, zero or a positive number
+     * @throws NullPointerException if either entity has a null uuid
+     */
+    @Override
+    public final int compareTo( Entity o ) {
+        if ( o == null ) {
+            return 1;
+        }
+        try {
+            long t1 = getUuid().timestamp();
+            long t2 = o.getUuid().timestamp();
+            return ( t1 < t2 ) ? -1 : ( t1 == t2 ) ? 0 : 1;
+        }
+        catch ( UnsupportedOperationException ignored ) {
+            // UUID.timestamp() is only defined for version 1 (time-based) UUIDs; fall through to the
+            // natural UUID ordering below.
+        }
+        return getUuid().compareTo( o.getUuid() );
+    }
+
+
+    /**
+     * Creates a new entity through {@code EntityFactory.newEntity} for this entity's uuid and type and
+     * copies this entity's properties onto it.
+     */
+    @Override
+    public Entity toTypedEntity() {
+        Entity entity = EntityFactory.newEntity( getUuid(), getType() );
+        entity.setProperties( getProperties() );
+        return entity;
+    }
+
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ( ( uuid == null ) ? 0 : uuid.hashCode() );
+        return result;
+    }
+
+
+    /* (non-Javadoc)
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals( Object obj ) {
+        if ( this == obj ) {
+            return true;
+        }
+        if ( obj == null ) {
+            return false;
+        }
+        if ( getClass() != obj.getClass() ) {
+            return false;
+        }
+        AbstractEntity other = ( AbstractEntity ) obj;
+        if ( uuid == null ) {
+            if ( other.uuid != null ) {
+                return false;
+            }
+        }
+        else if ( !uuid.equals( other.uuid ) ) {
+            return false;
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
new file mode 100644
index 0000000..b82c551
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounter.java
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+/**
+ * Mutable holder pairing a timestamp with a counter value.
+ */
+public class AggregateCounter {
+
+    private long timestamp;
+    private long value;
+
+
+    /**
+     * @param timestamp the initial timestamp
+     * @param value the initial counter value
+     */
+    public AggregateCounter( long timestamp, long value ) {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+
+    /** @return the current timestamp */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+
+    /** @param timestamp the new timestamp */
+    public void setTimestamp( long timestamp ) {
+        this.timestamp = timestamp;
+    }
+
+
+    /** @return the current counter value */
+    public long getValue() {
+        return this.value;
+    }
+
+
+    /** @param value the new counter value */
+    public void setValue( long value ) {
+        this.value = value;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
new file mode 100644
index 0000000..c14d043
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AggregateCounterSet.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+/**
+ * Mutable bean grouping a list of {@link AggregateCounter} values under a name and category, together with
+ * optional user, group and queue ids. Null properties are left out when the bean is serialized by Jackson.
+ */
+public class AggregateCounterSet {
+
+    private String name;
+    private UUID user;
+    private UUID group;
+    private UUID queue;
+    private String category;
+    private List<AggregateCounter> values;
+
+
+    /**
+     * Creates a set identified by user and group; the queue id stays null.
+     */
+    public AggregateCounterSet( String name, UUID user, UUID group, String category, List<AggregateCounter> values ) {
+        this.name = name;
+        this.category = category;
+        this.values = values;
+        this.user = user;
+        this.group = group;
+    }
+
+
+    /**
+     * Creates a set identified by queue; the user and group ids stay null. The queue id is assigned through
+     * {@link #setQueue(UUID)}.
+     */
+    public AggregateCounterSet( String name, UUID queue, String category, List<AggregateCounter> values ) {
+        this.name = name;
+        setQueue( queue );
+        this.category = category;
+        this.values = values;
+    }
+
+
+    /** @return the user id, or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getUser() {
+        return this.user;
+    }
+
+
+    public void setUser( UUID user ) {
+        this.user = user;
+    }
+
+
+    /** @return the group id, or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getGroup() {
+        return this.group;
+    }
+
+
+    public void setGroup( UUID group ) {
+        this.group = group;
+    }
+
+
+    /** @return the category, or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getCategory() {
+        return this.category;
+    }
+
+
+    public void setCategory( String category ) {
+        this.category = category;
+    }
+
+
+    /** @return the name, or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getName() {
+        return this.name;
+    }
+
+
+    public void setName( String name ) {
+        this.name = name;
+    }
+
+
+    /** @return the list of counters exactly as it was supplied (not copied), or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public List<AggregateCounter> getValues() {
+        return this.values;
+    }
+
+
+    public void setValues( List<AggregateCounter> values ) {
+        this.values = values;
+    }
+
+
+    /** @return the queue id, or null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getQueue() {
+        return this.queue;
+    }
+
+
+    public void setQueue( UUID queue ) {
+        this.queue = queue;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
new file mode 100644
index 0000000..aceaf81
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/AssociatedEntityRef.java
@@ -0,0 +1,21 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+/**
+ * Marker sub-interface of {@link EntityRef}; it declares no members of its own.
+ */
+public interface AssociatedEntityRef extends EntityRef {
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
new file mode 100644
index 0000000..4a560b7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CollectionRef.java
@@ -0,0 +1,26 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+/**
+ * An {@link AssociatedEntityRef} that additionally exposes an owner entity, a collection name and an item
+ * reference.
+ */
+public interface CollectionRef extends AssociatedEntityRef {
+
+    /** @return the reference to the owner entity */
+    EntityRef getOwnerEntity();
+
+    /** @return the collection name */
+    String getCollectionName();
+
+    /** @return the reference to the item */
+    EntityRef getItemRef();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
new file mode 100644
index 0000000..c551ac7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectedEntityRef.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+/**
+ * An {@link EntityRef} that also carries a connection type.
+ */
+public interface ConnectedEntityRef extends EntityRef {
+
+    /** @return the connection type */
+    String getConnectionType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
new file mode 100644
index 0000000..9a5340c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/ConnectionRef.java
@@ -0,0 +1,34 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+
+
+/**
+ * Connection tuple: exposes the connecting entity, the connected entity and a list of paired connections.
+ *
+ * @author edanuff
+ */
+public interface ConnectionRef extends ConnectedEntityRef, AssociatedEntityRef {
+
+    /** @return the reference to the connecting entity */
+    EntityRef getConnectingEntity();
+
+    /** @return the paired connections */
+    List<ConnectedEntityRef> getPairedConnections();
+
+    /** @return the reference to the connected entity */
+    ConnectedEntityRef getConnectedEntity();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
new file mode 100644
index 0000000..ea4e017
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterQuery.java
@@ -0,0 +1,313 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.usergrid.persistence.Query.CounterFilterPredicate;
+import org.apache.usergrid.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.utils.ClassUtils.cast;
+import static org.apache.usergrid.utils.ListUtils.firstBoolean;
+import static org.apache.usergrid.utils.ListUtils.firstInteger;
+import static org.apache.usergrid.utils.ListUtils.firstLong;
+import static org.apache.usergrid.utils.ListUtils.isEmpty;
+import static org.apache.usergrid.utils.MapUtils.toMapList;
+
+
+/**
+ * Mutable description of a counter query: result limit, start and finish time, padding flag, resolution,
+ * category names and counter filters.
+ *
+ * <p>Each property has a plain setter and a fluent {@code withXxx} variant that returns this query. Queries
+ * can also be built from request-style parameters ({@link #fromQueryParams(Map)}) or from a JSON object
+ * ({@link #fromJsonString(String)}).
+ */
+public class CounterQuery {
+
+    public static final Logger logger = LoggerFactory.getLogger( CounterQuery.class );
+
+    /** Limit reported by {@link #getLimit()} when no positive limit has been set. */
+    public static final int DEFAULT_MAX_RESULTS = 10;
+
+    private int limit = 0;
+    boolean limitSet = false;
+
+    private Long startTime;
+    private Long finishTime;
+    private boolean pad;
+    private CounterResolution resolution = CounterResolution.ALL;
+    private List<String> categories;
+    private List<CounterFilterPredicate> counterFilters;
+
+
+    /** Creates a query with no limit set, no time range, no padding and resolution {@code ALL}. */
+    public CounterQuery() {
+    }
+
+
+    /**
+     * Copy constructor. The category and filter lists are copied into new lists; all other values are
+     * copied as they are.
+     *
+     * @param q the query to copy; null produces the same state as the no-argument constructor
+     */
+    public CounterQuery( CounterQuery q ) {
+        if ( q != null ) {
+            limit = q.limit;
+            limitSet = q.limitSet;
+            startTime = q.startTime;
+            finishTime = q.finishTime;
+            resolution = q.resolution;
+            pad = q.pad;
+            categories = q.categories != null ? new ArrayList<String>( q.categories ) : null;
+            counterFilters =
+                    q.counterFilters != null ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
+        }
+    }
+
+
+    /** @return the given query, or a new empty query when it is null */
+    public static CounterQuery newQueryIfNull( CounterQuery query ) {
+        if ( query == null ) {
+            query = new CounterQuery();
+        }
+        return query;
+    }
+
+
+    /**
+     * Parses the JSON text and, when it yields a map, converts it with {@link #fromQueryParams(Map)}.
+     *
+     * @param json the JSON text to parse
+     * @return the resulting query, or null when the parsed value is not a map or contains no recognised
+     * parameter
+     */
+    public static CounterQuery fromJsonString( String json ) {
+        Object o = JsonUtils.parse( json );
+        if ( o instanceof Map ) {
+            @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, List<String>> params =
+                    cast( toMapList( ( Map ) o ) );
+            return fromQueryParams( params );
+        }
+        return null;
+    }
+
+
+    /**
+     * Builds a query from multi-valued parameters. The recognised keys are {@code limit},
+     * {@code start_time}, {@code end_time}, {@code resolution}, {@code category}, {@code counter} and
+     * {@code pad}.
+     *
+     * @param params the parameters; may be null
+     * @return a query carrying every parameter that produced a value, or null when {@code params} is null
+     * or none of the recognised keys produced a value
+     */
+    public static CounterQuery fromQueryParams( Map<String, List<String>> params ) {
+
+        // No parameters at all is treated like a map without any recognised key.
+        if ( params == null ) {
+            return null;
+        }
+
+        Integer limit = firstInteger( params.get( "limit" ) );
+        Long startTime = firstLong( params.get( "start_time" ) );
+        Long finishTime = firstLong( params.get( "end_time" ) );
+
+        CounterResolution resolution = null;
+        List<String> l = params.get( "resolution" );
+        if ( !isEmpty( l ) ) {
+            resolution = CounterResolution.fromString( l.get( 0 ) );
+        }
+
+        List<String> categories = params.get( "category" );
+
+        List<CounterFilterPredicate> counterFilters = null;
+        l = params.get( "counter" );
+        if ( !isEmpty( l ) ) {
+            counterFilters = CounterFilterPredicate.fromList( l );
+        }
+
+        Boolean pad = firstBoolean( params.get( "pad" ) );
+
+        // The query object is only created once the first usable parameter is found.
+        CounterQuery q = null;
+
+        if ( limit != null ) {
+            q = newQueryIfNull( q );
+            q.setLimit( limit );
+        }
+
+        if ( startTime != null ) {
+            q = newQueryIfNull( q );
+            q.setStartTime( startTime );
+        }
+
+        if ( finishTime != null ) {
+            q = newQueryIfNull( q );
+            q.setFinishTime( finishTime );
+        }
+
+        if ( resolution != null ) {
+            q = newQueryIfNull( q );
+            q.setResolution( resolution );
+        }
+
+        if ( categories != null ) {
+            q = newQueryIfNull( q );
+            q.setCategories( categories );
+        }
+
+        if ( counterFilters != null ) {
+            q = newQueryIfNull( q );
+            q.setCounterFilters( counterFilters );
+        }
+
+        if ( pad != null ) {
+            q = newQueryIfNull( q );
+            q.setPad( pad );
+        }
+
+        return q;
+    }
+
+
+    /** @return the limit, falling back to {@link #DEFAULT_MAX_RESULTS} when no positive limit is set */
+    public int getLimit() {
+        return getLimit( DEFAULT_MAX_RESULTS );
+    }
+
+
+    /**
+     * @param defaultMax fallback used when no positive limit is set
+     * @return the limit when it is positive; otherwise {@code defaultMax} when that is positive, else
+     * {@link #DEFAULT_MAX_RESULTS}
+     */
+    public int getLimit( int defaultMax ) {
+        if ( limit <= 0 ) {
+            if ( defaultMax > 0 ) {
+                return defaultMax;
+            }
+            else {
+                return DEFAULT_MAX_RESULTS;
+            }
+        }
+        return limit;
+    }
+
+
+    /** Stores the limit and marks it as explicitly set. */
+    public void setLimit( int limit ) {
+        limitSet = true;
+        this.limit = limit;
+    }
+
+
+    /** Fluent variant of {@link #setLimit(int)}. */
+    public CounterQuery withLimit( int limit ) {
+        limitSet = true;
+        this.limit = limit;
+        return this;
+    }
+
+
+    /** @return true once a limit has been assigned, or copied from a query that had one */
+    public boolean isLimitSet() {
+        return limitSet;
+    }
+
+
+    public Long getStartTime() {
+        return startTime;
+    }
+
+
+    public void setStartTime( Long startTime ) {
+        this.startTime = startTime;
+    }
+
+
+    public CounterQuery withStartTime( Long startTime ) {
+        this.startTime = startTime;
+        return this;
+    }
+
+
+    public Long getFinishTime() {
+        return finishTime;
+    }
+
+
+    public void setFinishTime( Long finishTime ) {
+        this.finishTime = finishTime;
+    }
+
+
+    public CounterQuery withFinishTime( Long finishTime ) {
+        this.finishTime = finishTime;
+        return this;
+    }
+
+
+    public boolean isPad() {
+        return pad;
+    }
+
+
+    public void setPad( boolean pad ) {
+        this.pad = pad;
+    }
+
+
+    public CounterQuery withPad( boolean pad ) {
+        this.pad = pad;
+        return this;
+    }
+
+
+    public void setResolution( CounterResolution resolution ) {
+        this.resolution = resolution;
+    }
+
+
+    public CounterResolution getResolution() {
+        return resolution;
+    }
+
+
+    public CounterQuery withResolution( CounterResolution resolution ) {
+        this.resolution = resolution;
+        return this;
+    }
+
+
+    /** @return the category list as stored (not copied); null when none has been set */
+    public List<String> getCategories() {
+        return categories;
+    }
+
+
+    /** Appends a category, creating the list on first use. */
+    public CounterQuery addCategory( String category ) {
+        if ( categories == null ) {
+            categories = new ArrayList<String>();
+        }
+        categories.add( category );
+        return this;
+    }
+
+
+    /** Stores the given list itself; no copy is made. */
+    public void setCategories( List<String> categories ) {
+        this.categories = categories;
+    }
+
+
+    public CounterQuery withCategories( List<String> categories ) {
+        this.categories = categories;
+        return this;
+    }
+
+
+    /** @return the filter list as stored (not copied); null when none has been set */
+    public List<CounterFilterPredicate> getCounterFilters() {
+        return counterFilters;
+    }
+
+
+    /**
+     * Parses the text with {@code CounterFilterPredicate.fromString} and appends the result.
+     *
+     * @param counter the filter text
+     * @return this query; unchanged when the text does not produce a predicate
+     */
+    public CounterQuery addCounterFilter( String counter ) {
+        CounterFilterPredicate p = CounterFilterPredicate.fromString( counter );
+        if ( p == null ) {
+            return this;
+        }
+        if ( counterFilters == null ) {
+            counterFilters = new ArrayList<CounterFilterPredicate>();
+        }
+        counterFilters.add( p );
+        return this;
+    }
+
+
+    /** Stores the given list itself; no copy is made. */
+    public void setCounterFilters( List<CounterFilterPredicate> counterFilters ) {
+        this.counterFilters = counterFilters;
+    }
+
+
+    public CounterQuery withCounterFilters( List<CounterFilterPredicate> counterFilters ) {
+        this.counterFilters = counterFilters;
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
new file mode 100644
index 0000000..b944a3b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CounterResolution.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+public enum CounterResolution {
+ ALL( 0 ), MINUTE( 1 ), FIVE_MINUTES( 5 ), HALF_HOUR( 30 ), HOUR( 60 ), SIX_HOUR( 60 * 6 ), HALF_DAY( 60 * 12 ),
+ DAY( 60 * 24 ), WEEK( 60 * 24 * 7 ), MONTH( 60 * 24 * ( 365 / 12 ) );
+
+ private final long interval;
+
+
+ CounterResolution( long minutes ) {
+ interval = minutes * 60 * 1000;
+ }
+
+
+ public long interval() {
+ return interval;
+ }
+
+
+ public long round( long timestamp ) {
+ if ( interval == 0 ) {
+ return 1;
+ }
+ return ( timestamp / interval ) * interval;
+ }
+
+
+ public long next( long timestamp ) {
+ return round( timestamp ) + interval;
+ }
+
+
+ public static CounterResolution fromOrdinal( int i ) {
+ if ( ( i < 0 ) || ( i >= CounterResolution.values().length ) ) {
+ throw new IndexOutOfBoundsException( "Invalid ordinal" );
+ }
+ return CounterResolution.values()[i];
+ }
+
+
+ public static CounterResolution fromMinutes( int m ) {
+ m = m * 60 * 1000;
+ for ( int i = CounterResolution.values().length - 1; i >= 0; i-- ) {
+ if ( CounterResolution.values()[i].interval <= m ) {
+ return CounterResolution.values()[i];
+ }
+ }
+ return ALL;
+ }
+
+
+ public static CounterResolution fromString( String s ) {
+ if ( s == null ) {
+ return ALL;
+ }
+ try {
+ return CounterResolution.valueOf( s.toUpperCase() );
+ }
+ catch ( IllegalArgumentException e ) {
+ }
+ try {
+ return fromMinutes( Integer.valueOf( s ) );
+ }
+ catch ( NumberFormatException e ) {
+ }
+ return ALL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java b/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
new file mode 100644
index 0000000..1065a76
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/CredentialsInfo.java
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+ /**
+  * Describes one stored credential: recoverable/encrypted flags, cipher and hash metadata, key and secret
+  * strings, a creation timestamp and a case-insensitive map of additional properties.
+  * <p>
+  * The natural ordering sorts instances by creation time, newest first.
+  */
+ @XmlRootElement
+ public class CredentialsInfo implements Comparable<CredentialsInfo> {
+
+     boolean recoverable;
+     boolean encrypted;
+     String cipher;
+     String key;
+     String secret;
+     String hashType;
+
+     /** Creation time in epoch milliseconds, captured by the constructor. */
+     Long created;
+
+     /**
+      * A list of crypto algorithms to apply to unencrypted input for comparison. Note that cipher and hashtype should be
+      * deprecated
+      */
+     private String[] cryptoChain;
+
+     /** Additional properties; the map compares keys case-insensitively. */
+     protected Map<String, Object> properties = new TreeMap<String, Object>( String.CASE_INSENSITIVE_ORDER );
+
+
+     /** Creates an instance stamped with the current system time. */
+     public CredentialsInfo() {
+         created = System.currentTimeMillis();
+     }
+
+
+     /** @return the recoverable flag */
+     public boolean getRecoverable() {
+         return recoverable;
+     }
+
+
+     /** @param recoverable the recoverable flag to set */
+     public void setRecoverable( boolean recoverable ) {
+         this.recoverable = recoverable;
+     }
+
+
+     /** @return the encrypted flag */
+     public boolean getEncrypted() {
+         return encrypted;
+     }
+
+
+     /** @param encrypted the encrypted flag to set */
+     public void setEncrypted( boolean encrypted ) {
+         this.encrypted = encrypted;
+     }
+
+
+     /** @return the cipher; left out of the JSON form when {@code null} */
+     @JsonSerialize(include = Inclusion.NON_NULL)
+     public String getCipher() {
+         return cipher;
+     }
+
+
+     /** @param cipher the cipher to set */
+     public void setCipher( String cipher ) {
+         this.cipher = cipher;
+     }
+
+
+     /** @return the key; left out of the JSON form when {@code null} */
+     @JsonSerialize(include = Inclusion.NON_NULL)
+     public String getKey() {
+         return key;
+     }
+
+
+     /** @param key the key to set */
+     public void setKey( String key ) {
+         this.key = key;
+     }
+
+
+     /** @return the secret; left out of the JSON form when {@code null} */
+     @JsonSerialize(include = Inclusion.NON_NULL)
+     public String getSecret() {
+         return secret;
+     }
+
+
+     /** @param secret the secret to set */
+     public void setSecret( String secret ) {
+         this.secret = secret;
+     }
+
+
+     /**
+      * Null-tolerant accessor for the secret of a credentials object.
+      *
+      * @param credentials the credentials to read; may be {@code null}
+      * @return {@code credentials.getSecret()}, or {@code null} when {@code credentials} is {@code null}
+      */
+     public static String getCredentialsSecret( CredentialsInfo credentials ) {
+         if ( credentials == null ) {
+             return null;
+         }
+         return credentials.getSecret();
+     }
+
+
+     /** @return the additional-property map itself (not a copy); used as the Jackson any-getter */
+     @JsonAnyGetter
+     public Map<String, Object> getProperties() {
+         return properties;
+     }
+
+
+     /**
+      * Stores an additional property; used as the Jackson any-setter.
+      *
+      * @param key the property name, matched case-insensitively against existing entries
+      * @param value the value to store
+      */
+     @JsonAnySetter
+     public void setProperty( String key, Object value ) {
+         properties.put( key, value );
+     }
+
+
+     /**
+      * @param key the property name, matched case-insensitively
+      * @return the stored value, or {@code null} when the map holds no value for that name
+      */
+     public Object getProperty( String key ) {
+         return properties.get( key );
+     }
+
+
+     /** @return the hashType */
+     public String getHashType() {
+         return hashType;
+     }
+
+
+     /**
+      * Used for handling legacy passwords encrypted in md5 or similar.
+      *
+      * @param hashType the hashType to set
+      */
+     public void setHashType( String hashType ) {
+         this.hashType = hashType;
+     }
+
+
+     /** @return the cryptoChain; the stored array itself, not a copy */
+     public String[] getCryptoChain() {
+         return cryptoChain;
+     }
+
+
+     /** @param cryptoChain the cryptoChain to set; the array reference is stored directly */
+     public void setCryptoChain( String[] cryptoChain ) {
+         this.cryptoChain = cryptoChain;
+     }
+
+
+     /** @return the creation time in epoch milliseconds */
+     public Long getCreated() {
+         return created;
+     }
+
+
+     /**
+      * Orders credentials by creation time, newest first. A missing ({@code null}) timestamp sorts before any
+      * present one, and two missing timestamps compare as equal.
+      *
+      * @param o the credentials to compare against
+      * @return a negative value, zero or a positive value as this instance sorts before, with or after {@code o}
+      * @throws NullPointerException if {@code o} is {@code null}
+      */
+     @Override
+     public int compareTo( CredentialsInfo o ) {
+         Long mine = created;
+         Long theirs = o.created;
+         // Handle nulls explicitly and symmetrically: comparing boxed Longs with == tests reference identity,
+         // and Long.compareTo(null) throws, which previously happened when only this.created was null.
+         if ( mine == null ) {
+             return theirs == null ? 0 : -1;
+         }
+         if ( theirs == null ) {
+             return 1;
+         }
+         // Reversed operands give the descending (newest first) order.
+         return theirs.compareTo( mine );
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
new file mode 100644
index 0000000..ea6df03
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/DynamicEntity.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.apache.usergrid.utils.UUIDUtils;
+
+
+/**
+ * Dynamic entities can represent any entity type whether specified in the Schema or not.
+ *
+ * @author edanuff
+ */
+@XmlRootElement
+public class DynamicEntity extends AbstractEntity {
+
+ @EntityProperty(indexed = true, fulltextIndexed = false, required = false, mutable = true, aliasProperty = true,
+ basic = true)
+ protected String name;
+
+ protected String type;
+
+
+ /**
+ *
+ */
+ public DynamicEntity() {
+ // setId(UUIDUtils.newTimeUUID());
+ }
+
+
+ /**
+ * @param id
+ */
+ public DynamicEntity( UUID id ) {
+ setUuid( id );
+ }
+
+
+ /**
+ * @param type
+ */
+ public DynamicEntity( String type ) {
+ setUuid( UUIDUtils.newTimeUUID() );
+ setType( type );
+ }
+
+
+ /**
+ * @param id
+ * @param type
+ */
+ public DynamicEntity( String type, UUID id ) {
+ setUuid( id );
+ setType( type );
+ }
+
+
+ /**
+ * @param id
+ * @param type
+ */
+ public DynamicEntity( String type, UUID id, Map<String, Object> propertyMap ) {
+ setUuid( id );
+ setType( type );
+ setProperties( propertyMap );
+ }
+
+
+ @Override
+ @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+ public String getType() {
+ return type;
+ }
+
+
+ @Override
+ public void setType( String type ) {
+ this.type = type;
+ }
+
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+
+ public void setName( String name ) {
+ this.name = name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
new file mode 100644
index 0000000..42df9e0
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Entity.java
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonPropertyOrder;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_URI;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+
+
+ /**
+  * Entities are the base object type in the service.
+  * <p>
+  * When written as JSON the uuid, type, uri and name properties are ordered first (see the
+  * {@code JsonPropertyOrder} annotation). The full property map is hidden from Jackson, which instead reads and
+  * writes the dynamic properties through the any-getter/any-setter pair declared at the end of this interface.
+  */
+ @XmlRootElement
+ @JsonPropertyOrder({ PROPERTY_UUID, PROPERTY_TYPE, PROPERTY_URI, PROPERTY_NAME })
+ public interface Entity extends EntityRef, Comparable<Entity> {
+
+     /** @return the UUID of this entity */
+     @Override
+     @EntityProperty(required = true, mutable = false, basic = true, indexed = false)
+     UUID getUuid();
+
+     /** @param id the UUID to assign to this entity */
+     void setUuid( UUID id );
+
+     /** @return the type name of this entity */
+     @Override
+     @EntityProperty(required = true, mutable = false, basic = true, indexed = true)
+     String getType();
+
+     /** @param type the type name to assign to this entity */
+     void setType( String type );
+
+     /** @return the name of this entity */
+     String getName();
+
+     /** @return the created value of this entity; NOTE(review): presumably an epoch-millisecond timestamp -- confirm */
+     @EntityProperty(indexed = true, required = true, mutable = false)
+     Long getCreated();
+
+     /** @param created the created value to assign */
+     void setCreated( Long created );
+
+     /** @return the modified value of this entity; NOTE(review): presumably an epoch-millisecond timestamp -- confirm */
+     @EntityProperty(indexed = true, required = true, mutable = true)
+     Long getModified();
+
+     /** @param modified the modified value to assign */
+     void setModified( Long modified );
+
+     /** @return the properties of this entity as a map; not serialized by Jackson */
+     @JsonIgnore
+     Map<String, Object> getProperties();
+
+     /** @param properties the properties to set on this entity */
+     void setProperties( Map<String, Object> properties );
+
+     /** @param properties the properties to add to this entity */
+     void addProperties( Map<String, Object> properties );
+
+     /**
+      * @param propertyName the name of the property to read
+      * @return the value held for that property
+      */
+     Object getProperty( String propertyName );
+
+     /**
+      * @param propertyName the name of the property to write
+      * @param propertyValue the value to store for it
+      */
+     void setProperty( String propertyName, Object propertyValue );
+
+     @Override
+     int compareTo( Entity o );
+
+     /** @return an {@code Entity} view of this object; the concrete result is up to the implementation */
+     Entity toTypedEntity();
+
+     /**
+      * @param key the metadata key to read
+      * @return the metadata value held for that key
+      */
+     Object getMetadata( String key );
+
+     /**
+      * @param key the metadata key to write
+      * @param value the value to store for it
+      */
+     void setMetadata( String key, Object value );
+
+     /** @param metadata the metadata entries to merge into this entity */
+     void mergeMetadata( Map<String, Object> metadata );
+
+     /** Clears the metadata of this entity. */
+     void clearMetadata();
+
+     /**
+      * @param key the collection key to read
+      * @return the entities held for that key
+      */
+     List<Entity> getCollections( String key );
+
+     /**
+      * @param name the collection name to write
+      * @param results the entities to store for it
+      */
+     void setCollections( String name, List<Entity> results );
+
+     /**
+      * @param key the connection key to read
+      * @return the entities held for that key
+      */
+     List<Entity> getConnections( String key );
+
+     /**
+      * @param name the connection name to write
+      * @param results the entities to store for it
+      */
+     void setConnections( String name, List<Entity> results );
+
+     /**
+      * Receives every JSON property that Jackson cannot bind to a declared setter.
+      *
+      * @param key the property name
+      * @param value the property value
+      */
+     @JsonAnySetter
+     void setDynamicProperty( String key, Object value );
+
+     /** @return the map whose entries Jackson writes out as additional JSON properties */
+     @JsonAnyGetter
+     Map<String, Object> getDynamicProperties();
+ }