You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:55 UTC

[45/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
new file mode 100644
index 0000000..4f43be9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
@@ -0,0 +1,305 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.utils.ConversionUtils;
+
+import static java.util.UUID.nameUUIDFromBytes;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.usergrid.utils.ConversionUtils.getLong;
+import static org.apache.usergrid.utils.ListUtils.first;
+import static org.apache.usergrid.utils.UUIDUtils.isUUID;
+import static org.apache.usergrid.utils.UUIDUtils.tryGetUUID;
+
+
+public class QueueQuery extends Query {
+
+    UUID consumerId;
+    long lastTimestamp;
+    UUID lastMessageId;
+    QueuePosition position = null;
+    boolean _synchronized;
+    boolean update = true;
+    long timeout;
+
+
+    public QueueQuery() {
+    }
+
+
+    public QueueQuery( Query q ) {
+        super( q );
+    }
+
+
+    /**
+     * Copy constructor. Base query state is copied by {@code super( q )}; when {@code q} is non-null every
+     * queue-specific field is copied as well, including the timeout.
+     *
+     * @param q the query to copy; when null only {@code super( q )} runs and the queue-specific fields keep
+     * their defaults
+     */
+    public QueueQuery( QueueQuery q ) {
+        super( q );
+        if ( q != null ) {
+            consumerId = q.consumerId;
+            lastTimestamp = q.lastTimestamp;
+            lastMessageId = q.lastMessageId;
+            position = q.position;
+            _synchronized = q._synchronized;
+            update = q.update;
+            // Assign the field directly: setTimeout() would also force _synchronized to true,
+            // which would corrupt the flag just copied above.
+            timeout = q.timeout;
+        }
+    }
+
+
+    /** Returns {@code query} itself when it is non-null, otherwise a new default-constructed QueueQuery. */
+    public static QueueQuery newQueryIfNull( QueueQuery query ) {
+        return ( query != null ) ? query : new QueueQuery();
+    }
+
+
+    /**
+     * Builds a QueueQuery from a query-language string by delegating the parsing to {@code Query.fromQL}.
+     *
+     * @param ql the query string, may be null
+     * @return a QueueQuery wrapping the parsed query, or null when {@code ql} is null or parsing yields null
+     */
+    public static QueueQuery fromQL( String ql ) {
+        if ( ql == null ) {
+            return null;
+        }
+        Query parsed = Query.fromQL( ql );
+        return ( parsed == null ) ? null : new QueueQuery( parsed );
+    }
+
+
+    /**
+     * Builds a QueueQuery from request-style parameters.
+     *
+     * The base query comes from {@code Query.fromQueryParams}. The queue-specific keys "consumer", "last",
+     * "time", "pos", "update", "synchronized" and "timeout" are then applied in that order; a QueueQuery is
+     * created lazily as soon as one of them requires it.
+     *
+     * @param params the parameter map; it is dereferenced without a null check
+     * @return the populated query, or null when the base query is null and no queue-specific key triggered
+     * the creation of one
+     */
+    public static QueueQuery fromQueryParams( Map<String, List<String>> params ) {
+
+        QueueQuery query = null;
+
+        Query q = Query.fromQueryParams( params );
+        if ( q != null ) {
+            query = new QueueQuery( q );
+        }
+
+        // presumably first() yields the first list element or null for a missing key -- TODO confirm
+        String consumer = first( params.get( "consumer" ) );
+
+        if ( consumer != null ) {
+            query = newQueryIfNull( query );
+            query.setConsumerId( getConsumerId( consumer ) );
+        }
+
+        // "last" only takes effect when tryGetUUID returns a non-null UUID
+        UUID last = tryGetUUID( first( params.get( "last" ) ) );
+        if ( last != null ) {
+            query = newQueryIfNull( query );
+            query.setLastMessageId( last );
+        }
+
+        // The mere presence of "time" creates the query, but only a positive value is stored
+        if ( params.containsKey( "time" ) ) {
+            query = newQueryIfNull( query );
+            long t = getLong( first( params.get( "time" ) ) );
+            if ( t > 0 ) {
+                query.setLastTimestamp( t );
+            }
+        }
+        // Likewise "pos" creates the query, but the position is only set when find() returns non-null
+        if ( params.containsKey( "pos" ) ) {
+            query = newQueryIfNull( query );
+            QueuePosition pos = QueuePosition.find( first( params.get( "pos" ) ) );
+            if ( pos != null ) {
+                query.setPosition( pos );
+            }
+        }
+
+        if ( params.containsKey( "update" ) ) {
+            query = newQueryIfNull( query );
+            query.setUpdate( ConversionUtils.getBoolean( first( params.get( "update" ) ) ) );
+        }
+
+        if ( params.containsKey( "synchronized" ) ) {
+            query = newQueryIfNull( query );
+            query.setSynchronized( ConversionUtils.getBoolean( first( params.get( "synchronized" ) ) ) );
+        }
+
+        // Order matters: setTimeout() also sets synchronized to true, so a "timeout" parameter
+        // overrides whatever the "synchronized" parameter above selected.
+        if ( params.containsKey( "timeout" ) ) {
+            query = newQueryIfNull( query );
+            query.setTimeout( ConversionUtils.getLong( first( params.get( "timeout" ) ) ) );
+        }
+
+        // A named consumer defaults the position to CONSUMER unless "pos" already set one
+        if ( ( query != null ) && ( consumer != null ) ) {
+            query.setPositionIfUnset( QueuePosition.CONSUMER );
+        }
+
+        return query;
+    }
+
+
+    public UUID getConsumerId() {
+        return consumerId;
+    }
+
+
+    public void setConsumerId( UUID consumerId ) {
+        this.consumerId = consumerId;
+    }
+
+
+    public QueueQuery withConsumerId( UUID consumerId ) {
+        this.consumerId = consumerId;
+        setPositionIfUnset( QueuePosition.CONSUMER );
+        return this;
+    }
+
+
+    public QueueQuery withConsumer( String consumer ) {
+        consumerId = getConsumerId( consumer );
+        setPositionIfUnset( QueuePosition.CONSUMER );
+        return this;
+    }
+
+
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
+
+
+    public void setLastTimestamp( long lastTimestamp ) {
+        this.lastTimestamp = lastTimestamp;
+    }
+
+
+    public QueueQuery withLastTimestamp( long lastTimestamp ) {
+        this.lastTimestamp = lastTimestamp;
+        return this;
+    }
+
+
+    public UUID getLastMessageId() {
+        return lastMessageId;
+    }
+
+
+    public void setLastMessageId( UUID lastMessageId ) {
+        this.lastMessageId = lastMessageId;
+    }
+
+
+    public QueueQuery withLastMessageId( UUID lastMessageId ) {
+        this.lastMessageId = lastMessageId;
+        return this;
+    }
+
+
+    /** @return the explicitly set position, or {@code QueuePosition.LAST} when no position has been set */
+    public QueuePosition getPosition() {
+        return ( position == null ) ? QueuePosition.LAST : position;
+    }
+
+
+    public boolean isPositionSet() {
+        return position != null;
+    }
+
+
+    public void setPosition( QueuePosition position ) {
+        this.position = position;
+    }
+
+
+    public void setPositionIfUnset( QueuePosition position ) {
+        if ( this.position == null ) {
+            this.position = position;
+        }
+    }
+
+
+    public QueueQuery withPosition( QueuePosition position ) {
+        this.position = position;
+        return this;
+    }
+
+
+    public QueueQuery withPositionInUnset( QueuePosition position ) {
+        if ( this.position == null ) {
+            this.position = position;
+        }
+        return this;
+    }
+
+
+    public static UUID getConsumerId( String consumer ) {
+        if ( consumer == null ) {
+            return null;
+        }
+        if ( isUUID( consumer ) ) {
+            return UUID.fromString( consumer );
+        }
+        else if ( isNotBlank( consumer ) ) {
+            return nameUUIDFromBytes( ( "consumer:" + consumer ).getBytes() );
+        }
+        return null;
+    }
+
+
+    public boolean isSynchronized() {
+        return _synchronized;
+    }
+
+
+    public void setSynchronized( boolean _synchronized ) {
+        this._synchronized = _synchronized;
+    }
+
+
+    public QueueQuery withSynchronized( boolean _synchronized ) {
+        this._synchronized = _synchronized;
+        return this;
+    }
+
+
+    public boolean isUpdate() {
+        return update;
+    }
+
+
+    public void setUpdate( boolean update ) {
+        this.update = update;
+    }
+
+
+    public QueueQuery withUpdate( boolean update ) {
+        this.update = update;
+        return this;
+    }
+
+
+    /** @return the timeout */
+    public long getTimeout() {
+        return timeout;
+    }
+
+
+    /**
+     * Sets the timeout and, as a side effect, switches this query to synchronized mode by calling
+     * {@code setSynchronized( true )}.
+     *
+     * @param timeout the timeout to set (unit is not visible here -- TODO confirm against callers)
+     */
+    public void setTimeout( long timeout ) {
+        this.timeout = timeout;
+        setSynchronized( true );
+    }
+
+
+    public QueueQuery withTimeout( long timeout ) {
+        setTimeout( timeout );
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
new file mode 100644
index 0000000..c9567c3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
@@ -0,0 +1,128 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+@XmlRootElement
+public class QueueResults {
+
+    private String path;
+    private UUID queue;
+    private List<Message> messages = new ArrayList<Message>();
+    private UUID last;
+    private UUID consumer;
+
+
+    public QueueResults() {
+
+    }
+
+
+    public QueueResults( Message message ) {
+        if ( message != null ) {
+            messages.add( message );
+        }
+    }
+
+
+    public QueueResults( List<Message> messages ) {
+        if ( messages != null ) {
+            this.messages = messages;
+        }
+    }
+
+
+    /**
+     * Creates a fully populated result set.
+     *
+     * @param path the queue path
+     * @param queue the queue id
+     * @param messages the messages to expose; when null the default empty list is kept
+     * @param last the value returned by {@code getLast()}
+     * @param consumer the value returned by {@code getConsumer()}
+     */
+    public QueueResults( String path, UUID queue, List<Message> messages, UUID last, UUID consumer ) {
+        this.path = path;
+        this.queue = queue;
+        // Never store null: size() dereferences the list, and the list constructor and
+        // setMessages() already apply the same guard.
+        if ( messages != null ) {
+            this.messages = messages;
+        }
+        this.last = last;
+        this.consumer = consumer;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getPath() {
+        return path;
+    }
+
+
+    public void setPath( String path ) {
+        this.path = path;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getQueue() {
+        return queue;
+    }
+
+
+    public void setQueue( UUID queue ) {
+        this.queue = queue;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public List<Message> getMessages() {
+        return messages;
+    }
+
+
+    public void setMessages( List<Message> messages ) {
+        if ( messages == null ) {
+            messages = new ArrayList<Message>();
+        }
+        this.messages = messages;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getLast() {
+        return last;
+    }
+
+
+    public void setLast( UUID last ) {
+        this.last = last;
+    }
+
+
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getConsumer() {
+        return consumer;
+    }
+
+
+    public void setConsumer( UUID consumer ) {
+        this.consumer = consumer;
+    }
+
+
+    public int size() {
+        return messages.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
new file mode 100644
index 0000000..974b7ed
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
@@ -0,0 +1,192 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+@XmlRootElement
+public class QueueSet {
+
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+    boolean more;
+
+
+    public QueueSet() {
+
+    }
+
+
+    public List<QueueInfo> getQueues() {
+        return queues;
+    }
+
+
+    public void setQueues( List<QueueInfo> queues ) {
+        if ( queues == null ) {
+            queues = new ArrayList<QueueInfo>();
+        }
+        this.queues = queues;
+    }
+
+
+    public QueueSet addQueue( String queuePath, UUID queueId ) {
+        QueueInfo queue = new QueueInfo( queuePath, queueId );
+        queues.add( queue );
+        return this;
+    }
+
+
+    public boolean isMore() {
+        return more;
+    }
+
+
+    public void setMore( boolean more ) {
+        this.more = more;
+    }
+
+
+    public boolean hasMore() {
+        return more;
+    }
+
+
+    public int size() {
+        return queues.size();
+    }
+
+
+    @XmlRootElement
+    public static class QueueInfo {
+
+        String path;
+        UUID uuid;
+
+
+        public QueueInfo() {
+        }
+
+
+        public QueueInfo( String path, UUID uuid ) {
+            this.path = path;
+            this.uuid = uuid;
+        }
+
+
+        public String getPath() {
+            return path;
+        }
+
+
+        public void setPath( String path ) {
+            this.path = path;
+        }
+
+
+        public UUID getUuid() {
+            return uuid;
+        }
+
+
+        public void setUuid( UUID uuid ) {
+            this.uuid = uuid;
+        }
+
+
+        /** Hash over path and uuid, with null fields contributing 0; consistent with {@code equals}. */
+        @Override
+        public int hashCode() {
+            int pathHash = ( path == null ) ? 0 : path.hashCode();
+            int uuidHash = ( uuid == null ) ? 0 : uuid.hashCode();
+            return ( 31 * ( 31 + pathHash ) ) + uuidHash;
+        }
+
+
+        @Override
+        public boolean equals( Object obj ) {
+            if ( this == obj ) {
+                return true;
+            }
+            if ( obj == null ) {
+                return false;
+            }
+            if ( getClass() != obj.getClass() ) {
+                return false;
+            }
+            QueueInfo other = ( QueueInfo ) obj;
+            if ( path == null ) {
+                if ( other.path != null ) {
+                    return false;
+                }
+            }
+            else if ( !path.equals( other.path ) ) {
+                return false;
+            }
+            if ( uuid == null ) {
+                if ( other.uuid != null ) {
+                    return false;
+                }
+            }
+            else if ( !uuid.equals( other.uuid ) ) {
+                return false;
+            }
+            return true;
+        }
+
+
+        @Override
+        public String toString() {
+            return "QueueInfo [path=" + path + ", uuid=" + uuid + "]";
+        }
+    }
+
+
+    /** Unimplemented stub: currently does nothing. */
+    public void setCursorToLastResult() {
+        // TODO Auto-generated method stub
+
+    }
+
+
+    /**
+     * Unimplemented stub: always returns null.
+     *
+     * @return null
+     */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Object getCursor() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+
+    /**
+     * Intersects this set with {@code r}: afterwards this set holds exactly those entries of {@code r}
+     * that were also present here, in the order (and with any duplicates) of {@code r}'s list.
+     *
+     * @param r the set to intersect with; must not be null
+     */
+    public void and( QueueSet r ) {
+        Set<QueueInfo> current = new HashSet<QueueInfo>( queues );
+        List<QueueInfo> shared = new ArrayList<QueueInfo>( r.getQueues() );
+        // Keep only the entries this set already contained
+        shared.retainAll( current );
+        queues = shared;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
new file mode 100644
index 0000000..2439597
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
@@ -0,0 +1,277 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.Queue;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.usergrid.mq.Message.MESSAGE_ID;
+import static org.apache.usergrid.mq.Message.MESSAGE_PROPERTIES;
+import static org.apache.usergrid.mq.Message.MESSAGE_TYPE;
+import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
+import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
+import static org.apache.usergrid.mq.Queue.QUEUE_PROPERTIES;
+import static org.apache.usergrid.mq.QueuePosition.CONSUMER;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.ConversionUtils.getLong;
+import static org.apache.usergrid.utils.ConversionUtils.object;
+
+
+public class CassandraMQUtils {
+
+    public static final Logger logger = LoggerFactory.getLogger( CassandraMQUtils.class );
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+    /** Logger for batch operations */
+    private static final Logger batch_logger =
+            LoggerFactory.getLogger( CassandraMQUtils.class.getPackage().getName() + ".BATCH" );
+
+
+    /**
+     * Writes one batch operation to the batch logger at INFO level; does nothing when INFO is disabled.
+     *
+     * @param operation the operation label
+     * @param columnFamily the column family involved
+     * @param key the row key
+     * @param columnName the column name
+     * @param columnValue the column value
+     * @param timestamp accepted for symmetry with callers but not included in the log line
+     */
+    public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
+                                          Object columnValue, long timestamp ) {
+
+        if ( !batch_logger.isInfoEnabled() ) {
+            return;
+        }
+        Object[] logArgs = { operation, columnFamily, key, columnName, columnValue };
+        batch_logger.info( "{} cf={} key={} name={} value={}", logArgs );
+    }
+
+
+    /**
+     * Encodes a message's properties as a column map. A null property value is stored as a null column
+     * value; the MESSAGE_TYPE and MESSAGE_ID properties are encoded with {@code bytebuffer()}; every other
+     * property is encoded as JSON via {@code JsonUtils.toByteBuffer}.
+     *
+     * @param message the message to encode, may be null
+     * @return the column map, or null when {@code message} is null
+     */
+    public static Map<ByteBuffer, ByteBuffer> serializeMessage( Message message ) {
+        if ( message == null ) {
+            return null;
+        }
+        Map<ByteBuffer, ByteBuffer> columns = new HashMap<ByteBuffer, ByteBuffer>();
+        for ( Entry<String, Object> property : message.getProperties().entrySet() ) {
+            String name = property.getKey();
+            Object value = property.getValue();
+            ByteBuffer columnName = bytebuffer( name );
+            ByteBuffer columnValue;
+            if ( value == null ) {
+                columnValue = null;
+            }
+            else if ( MESSAGE_TYPE.equals( name ) || MESSAGE_ID.equals( name ) ) {
+                columnValue = bytebuffer( value );
+            }
+            else {
+                columnValue = JsonUtils.toByteBuffer( value );
+            }
+            columns.put( columnName, columnValue );
+        }
+        return columns;
+    }
+
+
+    public static Mutator<ByteBuffer> addMessageToMutator( Mutator<ByteBuffer> m, Message message, long timestamp ) {
+
+        Map<ByteBuffer, ByteBuffer> columns = serializeMessage( message );
+
+        if ( columns == null ) {
+            return m;
+        }
+
+        for ( Map.Entry<ByteBuffer, ByteBuffer> column_entry : columns.entrySet() ) {
+            if ( ( column_entry.getValue() != null ) && column_entry.getValue().hasRemaining() ) {
+                HColumn<ByteBuffer, ByteBuffer> column =
+                        createColumn( column_entry.getKey(), column_entry.getValue(), timestamp, be, be );
+                m.addInsertion( bytebuffer( message.getUuid() ), QueuesCF.MESSAGE_PROPERTIES.toString(), column );
+            }
+            else {
+                m.addDeletion( bytebuffer( message.getUuid() ), QueuesCF.MESSAGE_PROPERTIES.toString(),
+                        column_entry.getKey(), be, timestamp );
+            }
+        }
+
+        return m;
+    }
+
+
+    /**
+     * Rebuilds a message from its stored columns. MESSAGE_TYPE and MESSAGE_ID columns are decoded with
+     * {@code object()} using the type registered in MESSAGE_PROPERTIES; all other columns are decoded as
+     * JSON.
+     *
+     * @param columns the stored columns; must not be null
+     * @return the message, or null when no columns were supplied
+     */
+    public static Message deserializeMessage( List<HColumn<String, ByteBuffer>> columns ) {
+        Map<String, Object> properties = new HashMap<String, Object>();
+        for ( HColumn<String, ByteBuffer> column : columns ) {
+            String name = column.getName();
+            Object value;
+            if ( MESSAGE_TYPE.equals( name ) || MESSAGE_ID.equals( name ) ) {
+                value = object( MESSAGE_PROPERTIES.get( name ), column.getValue() );
+            }
+            else {
+                value = JsonUtils.fromByteBuffer( column.getValue() );
+            }
+            properties.put( name, value );
+        }
+
+        if ( properties.isEmpty() ) {
+            return null;
+        }
+        return new Message( properties );
+    }
+
+
+    public static Map<ByteBuffer, ByteBuffer> serializeQueue( Queue queue ) {
+        if ( queue == null ) {
+            return null;
+        }
+        Map<ByteBuffer, ByteBuffer> columns = new HashMap<ByteBuffer, ByteBuffer>();
+        for ( Entry<String, Object> property : queue.getProperties().entrySet() ) {
+            if ( property.getValue() == null ) {
+                continue;
+            }
+            if ( Queue.QUEUE_ID.equals( property.getKey() ) || QUEUE_NEWEST.equals( property.getKey() ) || QUEUE_OLDEST
+                    .equals( property.getKey() ) ) {
+                continue;
+            }
+            if ( QUEUE_PROPERTIES.containsKey( property.getKey() ) ) {
+                columns.put( bytebuffer( property.getKey() ), bytebuffer( property.getValue() ) );
+            }
+            else {
+                columns.put( bytebuffer( property.getKey() ), JsonUtils.toByteBuffer( property.getValue() ) );
+            }
+        }
+        return columns;
+    }
+
+
+    /**
+     * Rebuilds a queue from its stored columns. Columns whose name is registered in QUEUE_PROPERTIES are
+     * decoded with {@code object()} using the registered type; all other columns are decoded as JSON.
+     *
+     * @param columns the stored columns; must not be null
+     * @return the queue, or null when no columns were supplied
+     */
+    public static Queue deserializeQueue( List<HColumn<String, ByteBuffer>> columns ) {
+        Map<String, Object> properties = new HashMap<String, Object>();
+        for ( HColumn<String, ByteBuffer> column : columns ) {
+            String name = column.getName();
+            Object value;
+            if ( QUEUE_PROPERTIES.containsKey( name ) ) {
+                value = object( QUEUE_PROPERTIES.get( name ), column.getValue() );
+            }
+            else {
+                value = JsonUtils.fromByteBuffer( column.getValue() );
+            }
+            properties.put( name, value );
+        }
+
+        if ( properties.isEmpty() ) {
+            return null;
+        }
+        return new Queue( properties );
+    }
+
+
+    public static Mutator<ByteBuffer> addQueueToMutator( Mutator<ByteBuffer> m, Queue queue, long timestamp ) {
+
+        Map<ByteBuffer, ByteBuffer> columns = serializeQueue( queue );
+
+        if ( columns == null ) {
+            return m;
+        }
+
+        for ( Map.Entry<ByteBuffer, ByteBuffer> column_entry : columns.entrySet() ) {
+            if ( ( column_entry.getValue() != null ) && column_entry.getValue().hasRemaining() ) {
+                HColumn<ByteBuffer, ByteBuffer> column =
+                        createColumn( column_entry.getKey(), column_entry.getValue(), timestamp, be, be );
+                m.addInsertion( bytebuffer( queue.getUuid() ), QueuesCF.QUEUE_PROPERTIES.toString(), column );
+            }
+            else {
+                m.addDeletion( bytebuffer( queue.getUuid() ), QueuesCF.QUEUE_PROPERTIES.toString(),
+                        column_entry.getKey(), be, timestamp );
+            }
+        }
+
+        return m;
+    }
+
+
+    /**
+     * Builds a 24-byte row key: the 16 bytes of {@code uuid} (most significant bits first) followed by the
+     * 8 bytes of {@code ts}.
+     *
+     * @param uuid the id forming the first 16 bytes; must not be null
+     * @param ts the long forming the last 8 bytes
+     * @return the key buffer, positioned at 0
+     */
+    public static ByteBuffer getQueueShardRowKey( UUID uuid, long ts ) {
+        ByteBuffer key = ByteBuffer.allocate( 24 );
+        key.putLong( uuid.getMostSignificantBits() ).putLong( uuid.getLeastSignificantBits() ).putLong( ts );
+        key.rewind();
+        return key;
+    }
+
+
+    /**
+     * Builds a 32-byte row key in the format queueId+clientId: the 16 bytes of {@code queueId} followed by
+     * the 16 bytes of {@code clientId}, each with the most significant bits first.
+     *
+     * @param queueId the queue id; must not be null
+     * @param clientId the client id; must not be null
+     * @return the key buffer, positioned at 0
+     */
+    public static ByteBuffer getQueueClientTransactionKey( UUID queueId, UUID clientId ) {
+        ByteBuffer key = ByteBuffer.allocate( 32 );
+        key.putLong( queueId.getMostSignificantBits() ).putLong( queueId.getLeastSignificantBits() );
+        key.putLong( clientId.getMostSignificantBits() ).putLong( clientId.getLeastSignificantBits() );
+        key.rewind();
+        return key;
+    }
+
+
+    public static UUID getUUIDFromRowKey( ByteBuffer bytes ) {
+        return ConversionUtils.uuid( bytes );
+    }
+
+
+    /**
+     * Reads the long stored after the 16-byte UUID prefix of a row key, i.e. the {@code ts} component of a
+     * key laid out as in {@code getQueueShardRowKey}.
+     *
+     * @param bytes the row key; needs at least 24 bytes remaining from its current position. Its position
+     * is not modified.
+     * @return the long found at offset 16 relative to the buffer's current position
+     * @throws IndexOutOfBoundsException when fewer than 24 bytes remain
+     */
+    public static long getLongFromRowKey( ByteBuffer bytes ) {
+        // slice() rebases index 0 onto the current position, so the absolute read below is relative to
+        // where the key starts. The previous code called the statically imported getLong( 16 ), which
+        // converted the literal 16 instead of reading from the buffer.
+        return bytes.slice().getLong( 16 );
+    }
+
+
+    /**
+     * Get the queueId from the path. The path is normalized first; when normalization yields null the
+     * root path "/" is used instead.
+     *
+     * @param path the queue path
+     * @return the id that {@code Queue.getQueueId} derives from the normalized path
+     */
+    public static UUID getQueueId( String path ) {
+        String queuePath = Queue.normalizeQueuePath( path );
+        if ( queuePath == null ) {
+            queuePath = "/";
+        }
+
+        // The message previously named QueueManagerFactoryImpl.getFromQueue, which is not this method
+        logger.info( "CassandraMQUtils.getQueueId: {}", queuePath );
+
+        return Queue.getQueueId( queuePath );
+    }
+
+
+    /**
+     * Get the consumer id to use for a query against the given queue.
+     *
+     * @param queueId the queue id, returned as the consumer id unless the query position is CONSUMER
+     * @param query the query; must not be null
+     * @return {@code queueId} when the query position is not CONSUMER; otherwise the query's consumer id,
+     * or a new time UUID when the query has none
+     */
+    public static UUID getConsumerId( UUID queueId, QueueQuery query ) {
+        if ( query.getPosition() != CONSUMER ) {
+            return queueId;
+        }
+        UUID consumerId = query.getConsumerId();
+        return ( consumerId != null ) ? consumerId : UUIDUtils.newTimeUUID();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
new file mode 100644
index 0000000..7184516
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
@@ -0,0 +1,119 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.usergrid.mq.Message.MESSAGE_PROPERTIES;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.indexValueCode;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValue;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValueOrJson;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.DICTIONARY_MESSAGE_INDEXES;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.IndexUtils.getKeyValueList;
+
+
+/**
+ * Prepares the index entries for the properties of a single {@link Message} and adds the corresponding insertions
+ * to a mutation batch.
+ */
+public class MessageIndexUpdate {
+
+    public static final boolean FULLTEXT = false;
+
+    final Message message;
+
+    /** Index entries per property name; {@code null} when the message is not indexed. */
+    final Map<String, List<Map.Entry<String, Object>>> propertyEntryList;
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+
+    /**
+     * Builds the per-property index entry lists for an indexed message.
+     *
+     * Properties whose key is present in {@code MESSAGE_PROPERTIES}, or whose value is rejected by
+     * {@code validIndexableValueOrJson}, are skipped. For a message that is not indexed no entries are built.
+     *
+     * @param message the message to index
+     */
+    public MessageIndexUpdate( Message message ) {
+        this.message = message;
+
+        if ( !message.isIndexed() ) {
+            propertyEntryList = null;
+            return;
+        }
+
+        propertyEntryList = new HashMap<String, List<Map.Entry<String, Object>>>();
+
+        for ( Map.Entry<String, Object> property : message.getProperties().entrySet() ) {
+            final String name = property.getKey();
+            final Object value = property.getValue();
+
+            if ( MESSAGE_PROPERTIES.containsKey( name ) || !validIndexableValueOrJson( value ) ) {
+                continue;
+            }
+
+            final List<Map.Entry<String, Object>> entries = getKeyValueList( name, value, FULLTEXT );
+            propertyEntryList.put( name, entries );
+        }
+    }
+
+
+    /**
+     * Adds the index insertions for this message to the given batch; does nothing for a non-indexed message.
+     *
+     * For every index entry with a valid indexable value, one column is added to the property index row keyed by
+     * (queueId, shard_ts, entry key) and the entry key is recorded in the queue's message-index dictionary row.
+     * Each property name is recorded in that dictionary row as well.
+     *
+     * @param batch the mutator collecting the insertions
+     * @param queueId the id of the queue the message belongs to
+     * @param shard_ts the shard timestamp that forms part of the property index row key
+     * @param timestamp the clock value given to every created column
+     */
+    public void addToMutation( Mutator<ByteBuffer> batch, UUID queueId, long shard_ts, long timestamp ) {
+
+        if ( propertyEntryList == null ) {
+            return;
+        }
+
+        for ( Entry<String, List<Entry<String, Object>>> property : propertyEntryList.entrySet() ) {
+
+            for ( Map.Entry<String, Object> indexEntry : property.getValue() ) {
+
+                if ( !validIndexableValue( indexEntry.getValue() ) ) {
+                    continue;
+                }
+
+                final String entryKey = indexEntry.getKey();
+                final Object entryValue = indexEntry.getValue();
+
+                // Index column: (value type code, value, message uuid) with an empty column value.
+                final ByteBuffer indexRowKey = bytebuffer( key( queueId, shard_ts, entryKey ) );
+                final DynamicComposite indexColumnName =
+                        new DynamicComposite( indexValueCode( entryValue ), entryValue, message.getUuid() );
+                batch.addInsertion( indexRowKey, PROPERTY_INDEX.getColumnFamily(),
+                        createColumn( indexColumnName, ByteBuffer.allocate( 0 ), timestamp, dce, be ) );
+
+                // Record the entry key in the dictionary of indexed names.
+                final ByteBuffer entryDictionaryRowKey = bytebuffer( key( queueId, DICTIONARY_MESSAGE_INDEXES ) );
+                batch.addInsertion( entryDictionaryRowKey, QUEUE_DICTIONARIES.getColumnFamily(),
+                        createColumn( entryKey, ByteBuffer.allocate( 0 ), timestamp, se, be ) );
+            }
+
+            // Record the property name itself in the same dictionary row.
+            final ByteBuffer propertyDictionaryRowKey = bytebuffer( key( queueId, DICTIONARY_MESSAGE_INDEXES ) );
+            batch.addInsertion( propertyDictionaryRowKey, QUEUE_DICTIONARIES.getColumnFamily(),
+                    createColumn( property.getKey(), ByteBuffer.allocate( 0 ), timestamp, se, be ) );
+        }
+    }
+
+
+    /** @return the message this update was created for */
+    public Message getMessage() {
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
new file mode 100644
index 0000000..b71e190
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
@@ -0,0 +1,347 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonNode;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.nio.ByteBuffer.wrap;
+import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+
+
+/**
+ * Holds the state of one queue index update (the mutation batch, the queue, the entry being updated and the
+ * previous/new index entries) and offers static helpers that normalize, classify and compare indexable values.
+ */
+public class QueueIndexUpdate {
+
+    /** Type codes returned by {@link #indexValueCode(Object)}. */
+    public static final byte VALUE_CODE_BYTES = 0;
+    public static final byte VALUE_CODE_UTF8 = 1;
+    public static final byte VALUE_CODE_UUID = 2;
+    public static final byte VALUE_CODE_INT = 3;
+    public static final byte VALUE_CODE_MAX = 127;
+
+    /** Maximum number of characters of a string value that is kept for indexing. */
+    public static int INDEX_STRING_VALUE_LENGTH = 1024;
+
+    private Mutator<ByteBuffer> batch;
+    private String queuePath;
+    private UUID queueId;
+    private String entryName;
+    private Object entryValue;
+    private final List<QueueIndexEntry> prevEntries = new ArrayList<QueueIndexEntry>();
+    private final List<QueueIndexEntry> newEntries = new ArrayList<QueueIndexEntry>();
+    private final Set<String> indexesSet = new LinkedHashSet<String>();
+    private long timestamp;
+    private final UUID timestampUuid;
+
+
+    /**
+     * Creates an update for one entry of a queue.
+     *
+     * @param batch the mutator associated with this update
+     * @param queuePath the path of the queue
+     * @param queueId the id of the queue
+     * @param entryName the name of the entry being updated
+     * @param entryValue the value of the entry being updated
+     * @param timestampUuid the UUID from which the microsecond timestamp is derived and which is attached to
+     * new index entries
+     */
+    public QueueIndexUpdate( Mutator<ByteBuffer> batch, String queuePath, UUID queueId, String entryName,
+                             Object entryValue, UUID timestampUuid ) {
+        this.batch = batch;
+        this.queuePath = queuePath;
+        this.queueId = queueId;
+        this.entryName = entryName;
+        this.entryValue = entryValue;
+        timestamp = getTimestampInMicros( timestampUuid );
+        this.timestampUuid = timestampUuid;
+    }
+
+
+    public Mutator<ByteBuffer> getBatch() {
+        return batch;
+    }
+
+
+    public void setBatch( Mutator<ByteBuffer> batch ) {
+        this.batch = batch;
+    }
+
+
+    public String getQueuePath() {
+        return queuePath;
+    }
+
+
+    public void setQueuePath( String queuePath ) {
+        this.queuePath = queuePath;
+    }
+
+
+    public UUID getQueueId() {
+        return queueId;
+    }
+
+
+    public void setQueueId( UUID queueId ) {
+        this.queueId = queueId;
+    }
+
+
+    public String getEntryName() {
+        return entryName;
+    }
+
+
+    public void setEntryName( String entryName ) {
+        this.entryName = entryName;
+    }
+
+
+    public Object getEntryValue() {
+        return entryValue;
+    }
+
+
+    public void setEntryValue( Object entryValue ) {
+        this.entryValue = entryValue;
+    }
+
+
+    /** @return the timestamp, initially derived from the timestamp UUID via {@code getTimestampInMicros} */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+
+    public void setTimestamp( long timestamp ) {
+        this.timestamp = timestamp;
+    }
+
+
+    public UUID getTimestampUuid() {
+        return timestampUuid;
+    }
+
+
+    /** @return the live list of previous index entries (not a copy) */
+    public List<QueueIndexEntry> getPrevEntries() {
+        return prevEntries;
+    }
+
+
+    /** Records a previous index entry carrying its own timestamp UUID. */
+    public void addPrevEntry( String path, Object value, UUID timestamp ) {
+        QueueIndexEntry entry = new QueueIndexEntry( path, value, timestamp );
+        prevEntries.add( entry );
+    }
+
+
+    /** @return the live list of new index entries (not a copy) */
+    public List<QueueIndexEntry> getNewEntries() {
+        return newEntries;
+    }
+
+
+    /** Records a new index entry stamped with this update's timestamp UUID. */
+    public void addNewEntry( String path, Object value ) {
+        QueueIndexEntry entry = new QueueIndexEntry( path, value, timestampUuid );
+        newEntries.add( entry );
+    }
+
+
+    /** @return the live, insertion-ordered set of index names (not a copy) */
+    public Set<String> getIndexesSet() {
+        return indexesSet;
+    }
+
+
+    public void addIndex( String index ) {
+        indexesSet.add( index );
+    }
+
+
+    /**
+     * One index entry: a path, its value, the value's type code and a timestamp UUID. This is an inner (non-static)
+     * class because {@link #getIndexComposite()} reads the queue id and path of the enclosing update.
+     */
+    public class QueueIndexEntry {
+        private final byte code;
+        private String path;
+        private final Object value;
+        private final UUID timestampUuid;
+
+
+        public QueueIndexEntry( String path, Object value, UUID timestampUuid ) {
+            this.path = path;
+            this.value = value;
+            code = indexValueCode( value );
+            this.timestampUuid = timestampUuid;
+        }
+
+
+        public String getPath() {
+            return path;
+        }
+
+
+        public void setPath( String path ) {
+            this.path = path;
+        }
+
+
+        public Object getValue() {
+            return value;
+        }
+
+
+        public byte getValueCode() {
+            return code;
+        }
+
+
+        public UUID getTimestampUuid() {
+            return timestampUuid;
+        }
+
+
+        /** @return a composite of (type code, value, queue id, queue path, timestamp UUID) */
+        public DynamicComposite getIndexComposite() {
+            return new DynamicComposite( code, value, getQueueId(), getQueuePath(), timestampUuid );
+        }
+    }
+
+
+    /** Trims, lower-cases and truncates a string to at most {@link #INDEX_STRING_VALUE_LENGTH} characters. */
+    private static String prepStringForIndex( String str ) {
+        // NOTE(review): toLowerCase() uses the JVM default locale; confirm whether a fixed locale is wanted here.
+        str = str.trim().toLowerCase();
+        str = str.substring( 0, Math.min( INDEX_STRING_VALUE_LENGTH, str.length() ) );
+        return str;
+    }
+
+
+    /**
+     * Normalizes a value into the form used for indexing.
+     *
+     * Strings are trimmed, lower-cased and truncated; UUIDs, BigIntegers and ByteBuffers are returned unchanged;
+     * other Numbers, Booleans (1/0) and Dates (epoch milliseconds) become BigIntegers; byte arrays are wrapped in
+     * a ByteBuffer. Any other object is converted with {@code toJsonNode} and, if the result is a value node, is
+     * mapped by the same rules.
+     *
+     * @param obj the value to normalize; may be {@code null}
+     * @return the normalized value, or {@code null} if {@code obj} is {@code null} or cannot be indexed
+     */
+    public static Object toIndexableValue( Object obj ) {
+        if ( obj == null ) {
+            return null;
+        }
+
+        if ( obj instanceof String ) {
+            return prepStringForIndex( ( String ) obj );
+        }
+
+        // UUIDs and BigIntegers are already in indexable form and pass through unchanged
+        if ( ( obj instanceof UUID ) || ( obj instanceof BigInteger ) ) {
+            return obj;
+        }
+
+        // For any other numeric values, turn them into a long
+        // and make them BigIntegers (fractional parts are dropped by longValue())
+        if ( obj instanceof Number ) {
+            return BigInteger.valueOf( ( ( Number ) obj ).longValue() );
+        }
+
+        if ( obj instanceof Boolean ) {
+            return BigInteger.valueOf( ( ( Boolean ) obj ) ? 1L : 0L );
+        }
+
+        if ( obj instanceof Date ) {
+            return BigInteger.valueOf( ( ( Date ) obj ).getTime() );
+        }
+
+        if ( obj instanceof byte[] ) {
+            return wrap( ( byte[] ) obj );
+        }
+
+        if ( obj instanceof ByteBuffer ) {
+            return obj;
+        }
+
+        JsonNode json = toJsonNode( obj );
+        if ( ( json != null ) && json.isValueNode() ) {
+            if ( json.isBigInteger() ) {
+                return json.getBigIntegerValue();
+            }
+            else if ( json.isNumber() || json.isBoolean() ) {
+                return BigInteger.valueOf( json.getValueAsLong() );
+            }
+            else if ( json.isTextual() ) {
+                return prepStringForIndex( json.getTextValue() );
+            }
+            else if ( json.isBinary() ) {
+                try {
+                    return wrap( json.getBinaryValue() );
+                }
+                catch ( IOException ignored ) {
+                    // Deliberately best-effort: binary content that cannot be read is treated as
+                    // non-indexable and falls through to the null return below.
+                }
+            }
+        }
+
+        return null;
+    }
+
+
+    /** @return {@code true} if {@link #toIndexableValue(Object)} yields a non-null value for {@code obj} */
+    public static boolean validIndexableValue( Object obj ) {
+        return toIndexableValue( obj ) != null;
+    }
+
+
+    /**
+     * @return {@code true} if {@code obj} is a Map, List or JsonNode, or if {@link #toIndexableValue(Object)}
+     * yields a non-null value for it
+     */
+    public static boolean validIndexableValueOrJson( Object obj ) {
+        if ( ( obj instanceof Map ) || ( obj instanceof List ) || ( obj instanceof JsonNode ) ) {
+            return true;
+        }
+        return toIndexableValue( obj ) != null;
+    }
+
+
+    /**
+     * Classifies a value by the type of its normalized form.
+     *
+     * @param obj the value to classify; normalized with {@link #toIndexableValue(Object)} first
+     * @return {@link #VALUE_CODE_UTF8} for strings, {@link #VALUE_CODE_UUID} for UUIDs, {@link #VALUE_CODE_INT}
+     * for numbers, and {@link #VALUE_CODE_BYTES} for everything else (including {@code null})
+     */
+    public static byte indexValueCode( Object obj ) {
+        obj = toIndexableValue( obj );
+        if ( obj instanceof String ) {
+            return VALUE_CODE_UTF8;
+        }
+        else if ( obj instanceof UUID ) {
+            return VALUE_CODE_UUID;
+        }
+        else if ( obj instanceof BigInteger ) {
+            return VALUE_CODE_INT;
+        }
+        else if ( obj instanceof Number ) {
+            return VALUE_CODE_INT;
+        }
+        else {
+            return VALUE_CODE_BYTES;
+        }
+    }
+
+
+    /**
+     * Compares two values after normalizing both with {@link #toIndexableValue(Object)}.
+     *
+     * A value that normalizes to {@code null} sorts before any other value. Values with different type codes are
+     * ordered by type code. Values with the same type code are compared with {@code UUIDComparator.staticCompare}
+     * for UUIDs and with {@code compareTo} for other Comparable values; anything else compares as equal.
+     *
+     * @param o1 the first value
+     * @param o2 the second value
+     * @return a negative number, zero or a positive number as {@code o1} sorts before, with or after {@code o2}
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static int compareIndexedValues( Object o1, Object o2 ) {
+        o1 = toIndexableValue( o1 );
+        o2 = toIndexableValue( o2 );
+        if ( ( o1 == null ) && ( o2 == null ) ) {
+            return 0;
+        }
+        else if ( o1 == null ) {
+            return -1;
+        }
+        else if ( o2 == null ) {
+            return 1;
+        }
+        int c1 = indexValueCode( o1 );
+        int c2 = indexValueCode( o2 );
+        if ( c1 == c2 ) {
+            if ( o1 instanceof UUID ) {
+                // The comparison result used to be discarded here, so all UUIDs compared as equal.
+                return UUIDComparator.staticCompare( ( UUID ) o1, ( UUID ) o2 );
+            }
+            else if ( o1 instanceof Comparable ) {
+                return ( ( Comparable ) o1 ).compareTo( o2 );
+            }
+        }
+        // Codes are small non-negative bytes, so the subtraction cannot overflow.
+        return c1 - c2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
new file mode 100644
index 0000000..e312e68
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.mq.QueueManager;
+import org.apache.usergrid.mq.QueueManagerFactory;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.CounterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+
+
+/**
+ * {@link QueueManagerFactory} that hands out {@link QueueManagerImpl} instances initialized with the collaborators
+ * this factory was constructed with.
+ */
+public class QueueManagerFactoryImpl implements QueueManagerFactory {
+
+    public static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class );
+
+    public static String IMPLEMENTATION_DESCRIPTION = "Cassandra Queue Manager Factory 1.0";
+
+    private final CassandraService cass;
+    private final CounterUtils counterUtils;
+    private final LockManager lockManager;
+    private final int lockTimeout;
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+
+    /**
+     * Creates a factory whose queue managers all share the given collaborators.
+     *
+     * @param cass the Cassandra service passed to each queue manager
+     * @param counterUtils the CounterUtils passed to each queue manager
+     * @param lockManager the lock manager passed to each queue manager
+     * @param lockTimeout the lock timeout passed to each queue manager
+     */
+    public QueueManagerFactoryImpl( CassandraService cass, CounterUtils counterUtils, LockManager lockManager,
+                                    int lockTimeout ) {
+        this.cass = cass;
+        this.counterUtils = counterUtils;
+        this.lockManager = lockManager;
+        this.lockTimeout = lockTimeout;
+    }
+
+
+    /** @return the current value of {@link #IMPLEMENTATION_DESCRIPTION} */
+    @Override
+    public String getImpementationDescription() throws Exception {
+        return IMPLEMENTATION_DESCRIPTION;
+    }
+
+
+    /**
+     * Creates and initializes a new queue manager for an application.
+     *
+     * @param applicationId the id of the application the queue manager is initialized for
+     * @return a newly constructed {@link QueueManagerImpl}; a fresh instance is created on every call
+     */
+    @Override
+    public QueueManager getQueueManager( UUID applicationId ) {
+        final QueueManagerImpl manager = new QueueManagerImpl();
+        manager.init( cass, counterUtils, lockManager, applicationId, lockTimeout );
+        return manager;
+    }
+}