You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:55 UTC
[45/96] [abbrv] [partial] Change package namespace to
org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
new file mode 100644
index 0000000..4f43be9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueQuery.java
@@ -0,0 +1,305 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.utils.ConversionUtils;
+
+import static java.util.UUID.nameUUIDFromBytes;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.usergrid.utils.ConversionUtils.getLong;
+import static org.apache.usergrid.utils.ListUtils.first;
+import static org.apache.usergrid.utils.UUIDUtils.isUUID;
+import static org.apache.usergrid.utils.UUIDUtils.tryGetUUID;
+
+
+public class QueueQuery extends Query {
+
+ UUID consumerId;
+ long lastTimestamp;
+ UUID lastMessageId;
+ QueuePosition position = null;
+ boolean _synchronized;
+ boolean update = true;
+ long timeout;
+
+
+ public QueueQuery() {
+ }
+
+
+ public QueueQuery( Query q ) {
+ super( q );
+ }
+
+
+ public QueueQuery( QueueQuery q ) {
+ super( q );
+ if ( q != null ) {
+ consumerId = q.consumerId;
+ lastTimestamp = q.lastTimestamp;
+ lastMessageId = q.lastMessageId;
+ position = q.position;
+ _synchronized = q._synchronized;
+ update = q.update;
+ }
+ }
+
+
+ public static QueueQuery newQueryIfNull( QueueQuery query ) {
+ if ( query == null ) {
+ query = new QueueQuery();
+ }
+ return query;
+ }
+
+
+ public static QueueQuery fromQL( String ql ) {
+ if ( ql == null ) {
+ return null;
+ }
+ QueueQuery query = null;
+ Query q = Query.fromQL( ql );
+ if ( q != null ) {
+ query = new QueueQuery( q );
+ }
+ return query;
+ }
+
+
+ public static QueueQuery fromQueryParams( Map<String, List<String>> params ) {
+
+ QueueQuery query = null;
+
+ Query q = Query.fromQueryParams( params );
+ if ( q != null ) {
+ query = new QueueQuery( q );
+ }
+
+ String consumer = first( params.get( "consumer" ) );
+
+ if ( consumer != null ) {
+ query = newQueryIfNull( query );
+ query.setConsumerId( getConsumerId( consumer ) );
+ }
+
+ UUID last = tryGetUUID( first( params.get( "last" ) ) );
+ if ( last != null ) {
+ query = newQueryIfNull( query );
+ query.setLastMessageId( last );
+ }
+
+ if ( params.containsKey( "time" ) ) {
+ query = newQueryIfNull( query );
+ long t = getLong( first( params.get( "time" ) ) );
+ if ( t > 0 ) {
+ query.setLastTimestamp( t );
+ }
+ }
+ if ( params.containsKey( "pos" ) ) {
+ query = newQueryIfNull( query );
+ QueuePosition pos = QueuePosition.find( first( params.get( "pos" ) ) );
+ if ( pos != null ) {
+ query.setPosition( pos );
+ }
+ }
+
+ if ( params.containsKey( "update" ) ) {
+ query = newQueryIfNull( query );
+ query.setUpdate( ConversionUtils.getBoolean( first( params.get( "update" ) ) ) );
+ }
+
+ if ( params.containsKey( "synchronized" ) ) {
+ query = newQueryIfNull( query );
+ query.setSynchronized( ConversionUtils.getBoolean( first( params.get( "synchronized" ) ) ) );
+ }
+
+ if ( params.containsKey( "timeout" ) ) {
+ query = newQueryIfNull( query );
+ query.setTimeout( ConversionUtils.getLong( first( params.get( "timeout" ) ) ) );
+ }
+
+ if ( ( query != null ) && ( consumer != null ) ) {
+ query.setPositionIfUnset( QueuePosition.CONSUMER );
+ }
+
+ return query;
+ }
+
+
+ public UUID getConsumerId() {
+ return consumerId;
+ }
+
+
+ public void setConsumerId( UUID consumerId ) {
+ this.consumerId = consumerId;
+ }
+
+
+ public QueueQuery withConsumerId( UUID consumerId ) {
+ this.consumerId = consumerId;
+ setPositionIfUnset( QueuePosition.CONSUMER );
+ return this;
+ }
+
+
+ public QueueQuery withConsumer( String consumer ) {
+ consumerId = getConsumerId( consumer );
+ setPositionIfUnset( QueuePosition.CONSUMER );
+ return this;
+ }
+
+
+ public long getLastTimestamp() {
+ return lastTimestamp;
+ }
+
+
+ public void setLastTimestamp( long lastTimestamp ) {
+ this.lastTimestamp = lastTimestamp;
+ }
+
+
+ public QueueQuery withLastTimestamp( long lastTimestamp ) {
+ this.lastTimestamp = lastTimestamp;
+ return this;
+ }
+
+
+ public UUID getLastMessageId() {
+ return lastMessageId;
+ }
+
+
+ public void setLastMessageId( UUID lastMessageId ) {
+ this.lastMessageId = lastMessageId;
+ }
+
+
+ public QueueQuery withLastMessageId( UUID lastMessageId ) {
+ this.lastMessageId = lastMessageId;
+ return this;
+ }
+
+
+ public QueuePosition getPosition() {
+ if ( position != null ) {
+ return position;
+ }
+ return QueuePosition.LAST;
+ }
+
+
+ public boolean isPositionSet() {
+ return position != null;
+ }
+
+
+ public void setPosition( QueuePosition position ) {
+ this.position = position;
+ }
+
+
+ public void setPositionIfUnset( QueuePosition position ) {
+ if ( this.position == null ) {
+ this.position = position;
+ }
+ }
+
+
+ public QueueQuery withPosition( QueuePosition position ) {
+ this.position = position;
+ return this;
+ }
+
+
+ public QueueQuery withPositionInUnset( QueuePosition position ) {
+ if ( this.position == null ) {
+ this.position = position;
+ }
+ return this;
+ }
+
+
+ public static UUID getConsumerId( String consumer ) {
+ if ( consumer == null ) {
+ return null;
+ }
+ if ( isUUID( consumer ) ) {
+ return UUID.fromString( consumer );
+ }
+ else if ( isNotBlank( consumer ) ) {
+ return nameUUIDFromBytes( ( "consumer:" + consumer ).getBytes() );
+ }
+ return null;
+ }
+
+
+ public boolean isSynchronized() {
+ return _synchronized;
+ }
+
+
+ public void setSynchronized( boolean _synchronized ) {
+ this._synchronized = _synchronized;
+ }
+
+
+ public QueueQuery withSynchronized( boolean _synchronized ) {
+ this._synchronized = _synchronized;
+ return this;
+ }
+
+
+ public boolean isUpdate() {
+ return update;
+ }
+
+
+ public void setUpdate( boolean update ) {
+ this.update = update;
+ }
+
+
+ public QueueQuery withUpdate( boolean update ) {
+ this.update = update;
+ return this;
+ }
+
+
+ /** @return the timeout */
+ public long getTimeout() {
+ return timeout;
+ }
+
+
+ /** @param timeout the timeout to set */
+ public void setTimeout( long timeout ) {
+ this.timeout = timeout;
+ setSynchronized( true );
+ }
+
+
+ public QueueQuery withTimeout( long timeout ) {
+ setTimeout( timeout );
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
new file mode 100644
index 0000000..c9567c3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueResults.java
@@ -0,0 +1,128 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+/**
+ * Result holder for a queue read: the messages plus the path, queue id, last id and consumer id that go with
+ * them. The message list is never null.
+ */
+@XmlRootElement
+public class QueueResults {
+
+    private String path;
+    private UUID queue;
+    /** Never null; a null list handed in by a caller is replaced with an empty one. */
+    private List<Message> messages = new ArrayList<Message>();
+    private UUID last;
+    private UUID consumer;
+
+
+    /** Creates an empty result. */
+    public QueueResults() {
+
+    }
+
+
+    /** Creates a result holding {@code message}, or an empty result when it is null. */
+    public QueueResults( Message message ) {
+        if ( message != null ) {
+            messages.add( message );
+        }
+    }
+
+
+    /** Creates a result backed by {@code messages}, or an empty result when it is null. */
+    public QueueResults( List<Message> messages ) {
+        if ( messages != null ) {
+            this.messages = messages;
+        }
+    }
+
+
+    /**
+     * Creates a fully populated result.
+     *
+     * @param path value returned by {@link #getPath()}
+     * @param queue value returned by {@link #getQueue()}
+     * @param messages the messages; null is treated as an empty list
+     * @param last value returned by {@link #getLast()}
+     * @param consumer value returned by {@link #getConsumer()}
+     */
+    public QueueResults( String path, UUID queue, List<Message> messages, UUID last, UUID consumer ) {
+        this.path = path;
+        this.queue = queue;
+        // Same null handling as the other constructor and setMessages(); a stored null made size() throw.
+        if ( messages != null ) {
+            this.messages = messages;
+        }
+        this.last = last;
+        this.consumer = consumer;
+    }
+
+
+    /** @return the path; omitted from JSON when null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public String getPath() {
+        return path;
+    }
+
+
+    /** @param path the path to set */
+    public void setPath( String path ) {
+        this.path = path;
+    }
+
+
+    /** @return the queue id; omitted from JSON when null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getQueue() {
+        return queue;
+    }
+
+
+    /** @param queue the queue id to set */
+    public void setQueue( UUID queue ) {
+        this.queue = queue;
+    }
+
+
+    /** @return the messages; never null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public List<Message> getMessages() {
+        return messages;
+    }
+
+
+    /** @param messages the messages to set; null is replaced with a new empty list */
+    public void setMessages( List<Message> messages ) {
+        if ( messages == null ) {
+            messages = new ArrayList<Message>();
+        }
+        this.messages = messages;
+    }
+
+
+    /** @return the last id; omitted from JSON when null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getLast() {
+        return last;
+    }
+
+
+    /** @param last the last id to set */
+    public void setLast( UUID last ) {
+        this.last = last;
+    }
+
+
+    /** @return the consumer id; omitted from JSON when null */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public UUID getConsumer() {
+        return consumer;
+    }
+
+
+    /** @param consumer the consumer id to set */
+    public void setConsumer( UUID consumer ) {
+        this.consumer = consumer;
+    }
+
+
+    /** @return the number of messages held */
+    public int size() {
+        return messages.size();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java b/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
new file mode 100644
index 0000000..974b7ed
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/QueueSet.java
@@ -0,0 +1,192 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+
+/** A list of queues (path plus id) together with a flag telling whether more queues are available. */
+@XmlRootElement
+public class QueueSet {
+
+    /** Never null; {@link #setQueues(List)} swaps a null argument for an empty list. */
+    List<QueueInfo> queues = new ArrayList<QueueInfo>();
+    boolean more;
+
+
+    /** Creates an empty set. */
+    public QueueSet() {
+
+    }
+
+
+    /** @return the queues held by this set */
+    public List<QueueInfo> getQueues() {
+        return queues;
+    }
+
+
+    /** @param queues the queues to hold; null is replaced with a new empty list */
+    public void setQueues( List<QueueInfo> queues ) {
+        this.queues = ( queues != null ) ? queues : new ArrayList<QueueInfo>();
+    }
+
+
+    /** Appends a queue built from the given path and id, returning this set for chaining. */
+    public QueueSet addQueue( String queuePath, UUID queueId ) {
+        queues.add( new QueueInfo( queuePath, queueId ) );
+        return this;
+    }
+
+
+    /** @return the "more" flag */
+    public boolean isMore() {
+        return more;
+    }
+
+
+    /** @param more the "more" flag to set */
+    public void setMore( boolean more ) {
+        this.more = more;
+    }
+
+
+    /** @return the "more" flag; same value as {@link #isMore()} */
+    public boolean hasMore() {
+        return more;
+    }
+
+
+    /** @return the number of queues held */
+    public int size() {
+        return queues.size();
+    }
+
+
+    /** Path and id of a single queue; equality and hashing are based on both fields. */
+    @XmlRootElement
+    public static class QueueInfo {
+
+        String path;
+        UUID uuid;
+
+
+        /** Creates an entry with null path and id. */
+        public QueueInfo() {
+        }
+
+
+        /** Creates an entry for the given path and id. */
+        public QueueInfo( String path, UUID uuid ) {
+            this.path = path;
+            this.uuid = uuid;
+        }
+
+
+        /** @return the queue path */
+        public String getPath() {
+            return path;
+        }
+
+
+        /** @param path the queue path to set */
+        public void setPath( String path ) {
+            this.path = path;
+        }
+
+
+        /** @return the queue id */
+        public UUID getUuid() {
+            return uuid;
+        }
+
+
+        /** @param uuid the queue id to set */
+        public void setUuid( UUID uuid ) {
+            this.uuid = uuid;
+        }
+
+
+        /** Combines the hashes of path and uuid (0 for a null field) with the usual 31 multiplier. */
+        @Override
+        public int hashCode() {
+            int pathHash = ( path == null ) ? 0 : path.hashCode();
+            int uuidHash = ( uuid == null ) ? 0 : uuid.hashCode();
+            return ( 31 * ( 31 + pathHash ) ) + uuidHash;
+        }
+
+
+        /** Two entries are equal when they have the same runtime class and equal (or both null) path and uuid. */
+        @Override
+        public boolean equals( Object obj ) {
+            if ( this == obj ) {
+                return true;
+            }
+            if ( ( obj == null ) || ( getClass() != obj.getClass() ) ) {
+                return false;
+            }
+            QueueInfo that = ( QueueInfo ) obj;
+            boolean samePath = ( path == null ) ? ( that.path == null ) : path.equals( that.path );
+            if ( !samePath ) {
+                return false;
+            }
+            return ( uuid == null ) ? ( that.uuid == null ) : uuid.equals( that.uuid );
+        }
+
+
+        @Override
+        public String toString() {
+            return "QueueInfo [path=" + path + ", uuid=" + uuid + "]";
+        }
+    }
+
+
+    /** Unimplemented stub: currently does nothing. */
+    public void setCursorToLastResult() {
+        // TODO Auto-generated method stub
+
+    }
+
+
+    /** Unimplemented stub: currently always returns null. */
+    @JsonSerialize(include = Inclusion.NON_NULL)
+    public Object getCursor() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+
+    /**
+     * Intersects this set with {@code r}: afterwards this set holds exactly those entries of {@code r}, in
+     * {@code r}'s order, that were also present in this set.
+     */
+    public void and( QueueSet r ) {
+        Set<QueueInfo> current = new HashSet<QueueInfo>( queues );
+        List<QueueInfo> shared = new ArrayList<QueueInfo>();
+        for ( QueueInfo candidate : r.getQueues() ) {
+            if ( !current.contains( candidate ) ) {
+                continue;
+            }
+            shared.add( candidate );
+        }
+        queues = shared;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
new file mode 100644
index 0000000..2439597
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/CassandraMQUtils.java
@@ -0,0 +1,277 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.Queue;
+import org.apache.usergrid.mq.QueueQuery;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+import org.apache.usergrid.utils.UUIDUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.usergrid.mq.Message.MESSAGE_ID;
+import static org.apache.usergrid.mq.Message.MESSAGE_PROPERTIES;
+import static org.apache.usergrid.mq.Message.MESSAGE_TYPE;
+import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
+import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
+import static org.apache.usergrid.mq.Queue.QUEUE_PROPERTIES;
+import static org.apache.usergrid.mq.QueuePosition.CONSUMER;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.ConversionUtils.getLong;
+import static org.apache.usergrid.utils.ConversionUtils.object;
+
+
+public class CassandraMQUtils {
+
+ public static final Logger logger = LoggerFactory.getLogger( CassandraMQUtils.class );
+
+ public static final StringSerializer se = new StringSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+ public static final BytesArraySerializer bae = new BytesArraySerializer();
+ public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+ public static final LongSerializer le = new LongSerializer();
+
+ /** Logger for batch operations */
+ private static final Logger batch_logger =
+ LoggerFactory.getLogger( CassandraMQUtils.class.getPackage().getName() + ".BATCH" );
+
+
+ public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
+ Object columnValue, long timestamp ) {
+
+ if ( batch_logger.isInfoEnabled() ) {
+ batch_logger.info( "{} cf={} key={} name={} value={}", new Object[] {
+ operation, columnFamily, key, columnName, columnValue
+ } );
+ }
+ }
+
+
+ /**
+ * Encode a message into a set of columns. JMS properties are encoded as strings and longs everything else is binary
+ * JSON.
+ */
+ public static Map<ByteBuffer, ByteBuffer> serializeMessage( Message message ) {
+ if ( message == null ) {
+ return null;
+ }
+ Map<ByteBuffer, ByteBuffer> columns = new HashMap<ByteBuffer, ByteBuffer>();
+ for ( Entry<String, Object> property : message.getProperties().entrySet() ) {
+ if ( property.getValue() == null ) {
+ columns.put( bytebuffer( property.getKey() ), null );
+ }
+ else if ( MESSAGE_TYPE.equals( property.getKey() ) || MESSAGE_ID.equals( property.getKey() ) ) {
+ columns.put( bytebuffer( property.getKey() ), bytebuffer( property.getValue() ) );
+ }
+ else {
+ columns.put( bytebuffer( property.getKey() ), JsonUtils.toByteBuffer( property.getValue() ) );
+ }
+ }
+ return columns;
+ }
+
+
+ public static Mutator<ByteBuffer> addMessageToMutator( Mutator<ByteBuffer> m, Message message, long timestamp ) {
+
+ Map<ByteBuffer, ByteBuffer> columns = serializeMessage( message );
+
+ if ( columns == null ) {
+ return m;
+ }
+
+ for ( Map.Entry<ByteBuffer, ByteBuffer> column_entry : columns.entrySet() ) {
+ if ( ( column_entry.getValue() != null ) && column_entry.getValue().hasRemaining() ) {
+ HColumn<ByteBuffer, ByteBuffer> column =
+ createColumn( column_entry.getKey(), column_entry.getValue(), timestamp, be, be );
+ m.addInsertion( bytebuffer( message.getUuid() ), QueuesCF.MESSAGE_PROPERTIES.toString(), column );
+ }
+ else {
+ m.addDeletion( bytebuffer( message.getUuid() ), QueuesCF.MESSAGE_PROPERTIES.toString(),
+ column_entry.getKey(), be, timestamp );
+ }
+ }
+
+ return m;
+ }
+
+
+ public static Message deserializeMessage( List<HColumn<String, ByteBuffer>> columns ) {
+ Message message = null;
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+ for ( HColumn<String, ByteBuffer> column : columns ) {
+ if ( MESSAGE_TYPE.equals( column.getName() ) || MESSAGE_ID.equals( column.getName() ) ) {
+ properties.put( column.getName(),
+ object( MESSAGE_PROPERTIES.get( column.getName() ), column.getValue() ) );
+ }
+ else {
+ properties.put( column.getName(), JsonUtils.fromByteBuffer( column.getValue() ) );
+ }
+ }
+ if ( !properties.isEmpty() ) {
+ message = new Message( properties );
+ }
+
+ return message;
+ }
+
+
+ public static Map<ByteBuffer, ByteBuffer> serializeQueue( Queue queue ) {
+ if ( queue == null ) {
+ return null;
+ }
+ Map<ByteBuffer, ByteBuffer> columns = new HashMap<ByteBuffer, ByteBuffer>();
+ for ( Entry<String, Object> property : queue.getProperties().entrySet() ) {
+ if ( property.getValue() == null ) {
+ continue;
+ }
+ if ( Queue.QUEUE_ID.equals( property.getKey() ) || QUEUE_NEWEST.equals( property.getKey() ) || QUEUE_OLDEST
+ .equals( property.getKey() ) ) {
+ continue;
+ }
+ if ( QUEUE_PROPERTIES.containsKey( property.getKey() ) ) {
+ columns.put( bytebuffer( property.getKey() ), bytebuffer( property.getValue() ) );
+ }
+ else {
+ columns.put( bytebuffer( property.getKey() ), JsonUtils.toByteBuffer( property.getValue() ) );
+ }
+ }
+ return columns;
+ }
+
+
+ public static Queue deserializeQueue( List<HColumn<String, ByteBuffer>> columns ) {
+ Queue queue = null;
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+ for ( HColumn<String, ByteBuffer> column : columns ) {
+ if ( QUEUE_PROPERTIES.containsKey( column.getName() ) ) {
+ properties
+ .put( column.getName(), object( QUEUE_PROPERTIES.get( column.getName() ), column.getValue() ) );
+ }
+ else {
+ properties.put( column.getName(), JsonUtils.fromByteBuffer( column.getValue() ) );
+ }
+ }
+ if ( !properties.isEmpty() ) {
+ queue = new Queue( properties );
+ }
+
+ return queue;
+ }
+
+
+ public static Mutator<ByteBuffer> addQueueToMutator( Mutator<ByteBuffer> m, Queue queue, long timestamp ) {
+
+ Map<ByteBuffer, ByteBuffer> columns = serializeQueue( queue );
+
+ if ( columns == null ) {
+ return m;
+ }
+
+ for ( Map.Entry<ByteBuffer, ByteBuffer> column_entry : columns.entrySet() ) {
+ if ( ( column_entry.getValue() != null ) && column_entry.getValue().hasRemaining() ) {
+ HColumn<ByteBuffer, ByteBuffer> column =
+ createColumn( column_entry.getKey(), column_entry.getValue(), timestamp, be, be );
+ m.addInsertion( bytebuffer( queue.getUuid() ), QueuesCF.QUEUE_PROPERTIES.toString(), column );
+ }
+ else {
+ m.addDeletion( bytebuffer( queue.getUuid() ), QueuesCF.QUEUE_PROPERTIES.toString(),
+ column_entry.getKey(), be, timestamp );
+ }
+ }
+
+ return m;
+ }
+
+
+ public static ByteBuffer getQueueShardRowKey( UUID uuid, long ts ) {
+ ByteBuffer bytes = ByteBuffer.allocate( 24 );
+ bytes.putLong( uuid.getMostSignificantBits() );
+ bytes.putLong( uuid.getLeastSignificantBits() );
+ bytes.putLong( ts );
+ return ( ByteBuffer ) bytes.rewind();
+ }
+
+
+ /** Get a row key in format of queueId+clientId */
+ public static ByteBuffer getQueueClientTransactionKey( UUID queueId, UUID clientId ) {
+ ByteBuffer bytes = ByteBuffer.allocate( 32 );
+ bytes.putLong( queueId.getMostSignificantBits() );
+ bytes.putLong( queueId.getLeastSignificantBits() );
+ bytes.putLong( clientId.getMostSignificantBits() );
+ bytes.putLong( clientId.getLeastSignificantBits() );
+ return ( ByteBuffer ) bytes.rewind();
+ }
+
+
+ public static UUID getUUIDFromRowKey( ByteBuffer bytes ) {
+ return ConversionUtils.uuid( bytes );
+ }
+
+
+ public static long getLongFromRowKey( ByteBuffer bytes ) {
+ bytes = bytes.slice();
+ return getLong( 16 );
+ }
+
+
+ /** Get the queueId from the path */
+ public static UUID getQueueId( String path ) {
+ String queuePath = Queue.normalizeQueuePath( path );
+ if ( queuePath == null ) {
+ queuePath = "/";
+ }
+
+ logger.info( "QueueManagerFactoryImpl.getFromQueue: {}", queuePath );
+
+ return Queue.getQueueId( queuePath );
+ }
+
+
+ /** Get the consumer Id from the queue id */
+ public static UUID getConsumerId( UUID queueId, QueueQuery query ) {
+ UUID consumerId = queueId;
+
+ if ( query.getPosition() == CONSUMER ) {
+ consumerId = query.getConsumerId();
+ if ( ( consumerId == null ) && ( query.getPosition() == CONSUMER ) ) {
+ consumerId = UUIDUtils.newTimeUUID();
+ }
+ }
+ return consumerId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
new file mode 100644
index 0000000..7184516
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/MessageIndexUpdate.java
@@ -0,0 +1,119 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.usergrid.mq.Message;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.usergrid.mq.Message.MESSAGE_PROPERTIES;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.indexValueCode;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValue;
+import static org.apache.usergrid.mq.cassandra.QueueIndexUpdate.validIndexableValueOrJson;
+import static org.apache.usergrid.mq.cassandra.QueueManagerImpl.DICTIONARY_MESSAGE_INDEXES;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.PROPERTY_INDEX;
+import static org.apache.usergrid.mq.cassandra.QueuesCF.QUEUE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.IndexUtils.getKeyValueList;
+
+
+/**
+ * Collects the indexable properties of a {@link Message} at construction time and later writes the matching
+ * index columns into a mutation batch.
+ */
+public class MessageIndexUpdate {
+
+    public static final boolean FULLTEXT = false;
+
+    final Message message;
+    /** Index entries per property name; null when the message is not indexed. */
+    final Map<String, List<Map.Entry<String, Object>>> propertyEntryList;
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+
+    /**
+     * Gathers the key/value index entries of every property that is not listed in MESSAGE_PROPERTIES and whose
+     * value passes {@code validIndexableValueOrJson}. Nothing is gathered when the message is not indexed.
+     */
+    public MessageIndexUpdate( Message message ) {
+        this.message = message;
+
+        Map<String, List<Map.Entry<String, Object>>> collected = null;
+
+        if ( message.isIndexed() ) {
+            collected = new HashMap<String, List<Map.Entry<String, Object>>>();
+
+            for ( Map.Entry<String, Object> property : message.getProperties().entrySet() ) {
+                String name = property.getKey();
+                Object value = property.getValue();
+
+                if ( MESSAGE_PROPERTIES.containsKey( name ) || !validIndexableValueOrJson( value ) ) {
+                    continue;
+                }
+
+                collected.put( name, getKeyValueList( name, value, FULLTEXT ) );
+            }
+        }
+
+        propertyEntryList = collected;
+    }
+
+
+    /**
+     * Adds the index columns for the collected properties to {@code batch}. Each valid index entry gets a column
+     * in the PROPERTY_INDEX column family and its key recorded in the queue's message-index dictionary; the
+     * property name itself is recorded in that dictionary as well. Does nothing when the message is not indexed.
+     */
+    public void addToMutation( Mutator<ByteBuffer> batch, UUID queueId, long shard_ts, long timestamp ) {
+
+        if ( propertyEntryList == null ) {
+            return;
+        }
+
+        for ( Entry<String, List<Entry<String, Object>>> property : propertyEntryList.entrySet() ) {
+
+            for ( Map.Entry<String, Object> indexEntry : property.getValue() ) {
+                Object value = indexEntry.getValue();
+
+                if ( !validIndexableValue( value ) ) {
+                    continue;
+                }
+
+                ByteBuffer rowKey = bytebuffer( key( queueId, shard_ts, indexEntry.getKey() ) );
+                String indexColumnFamily = PROPERTY_INDEX.getColumnFamily();
+                DynamicComposite columnName =
+                        new DynamicComposite( indexValueCode( value ), value, message.getUuid() );
+
+                batch.addInsertion( rowKey, indexColumnFamily,
+                        createColumn( columnName, ByteBuffer.allocate( 0 ), timestamp, dce, be ) );
+
+                addIndexName( batch, queueId, indexEntry.getKey(), timestamp );
+            }
+
+            addIndexName( batch, queueId, property.getKey(), timestamp );
+        }
+    }
+
+
+    /** Records {@code name} as an empty-valued column in the queue's message-index dictionary row. */
+    private static void addIndexName( Mutator<ByteBuffer> batch, UUID queueId, String name, long timestamp ) {
+        batch.addInsertion( bytebuffer( key( queueId, DICTIONARY_MESSAGE_INDEXES ) ),
+                QUEUE_DICTIONARIES.getColumnFamily(),
+                createColumn( name, ByteBuffer.allocate( 0 ), timestamp, se, be ) );
+    }
+
+
+    /** @return the message this update was built for */
+    public Message getMessage() {
+        return message;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
new file mode 100644
index 0000000..b71e190
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueIndexUpdate.java
@@ -0,0 +1,347 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonNode;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.nio.ByteBuffer.wrap;
+import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+
+
+/**
+ * Holds the state of one index update for a queue entry: the mutation batch being built, the queue and entry
+ * concerned, the previous and new index entries, and the names of the indexes touched.
+ * <p>
+ * The static helpers normalise arbitrary values into the forms that can be stored in an index
+ * ({@link String}, {@link UUID}, {@link BigInteger} or {@link ByteBuffer}) and classify and compare those
+ * normalised values.
+ */
+public class QueueIndexUpdate {
+
+    /** Type code returned by {@link #indexValueCode(Object)} for anything that is not text, a UUID or a number. */
+    public static final byte VALUE_CODE_BYTES = 0;
+    /** Type code returned by {@link #indexValueCode(Object)} for string values. */
+    public static final byte VALUE_CODE_UTF8 = 1;
+    /** Type code returned by {@link #indexValueCode(Object)} for UUID values. */
+    public static final byte VALUE_CODE_UUID = 2;
+    /** Type code returned by {@link #indexValueCode(Object)} for numeric values. */
+    public static final byte VALUE_CODE_INT = 3;
+    /** Largest type code; not used inside this class (presumably an upper bound for range scans - confirm). */
+    public static final byte VALUE_CODE_MAX = 127;
+
+    /**
+     * Maximum number of characters of a string value that is kept for indexing. Left non-final because it is
+     * public and may be assigned by other code.
+     */
+    public static int INDEX_STRING_VALUE_LENGTH = 1024;
+
+    private Mutator<ByteBuffer> batch;
+    private String queuePath;
+    private UUID queueId;
+    private String entryName;
+    private Object entryValue;
+    private final List<QueueIndexEntry> prevEntries = new ArrayList<QueueIndexEntry>();
+    private final List<QueueIndexEntry> newEntries = new ArrayList<QueueIndexEntry>();
+    private final Set<String> indexesSet = new LinkedHashSet<String>();
+    private long timestamp;
+    private final UUID timestampUuid;
+
+
+    /**
+     * Creates an update for one entry of a queue.
+     *
+     * @param batch the mutation batch the update belongs to
+     * @param queuePath path of the queue
+     * @param queueId id of the queue
+     * @param entryName name of the entry being updated
+     * @param entryValue value of the entry being updated
+     * @param timestampUuid UUID that stamps the update; {@link #getTimestamp()} is initialised to its timestamp
+     * in microseconds
+     */
+    public QueueIndexUpdate( Mutator<ByteBuffer> batch, String queuePath, UUID queueId, String entryName,
+                             Object entryValue, UUID timestampUuid ) {
+        this.batch = batch;
+        this.queuePath = queuePath;
+        this.queueId = queueId;
+        this.entryName = entryName;
+        this.entryValue = entryValue;
+        timestamp = getTimestampInMicros( timestampUuid );
+        this.timestampUuid = timestampUuid;
+    }
+
+
+    public Mutator<ByteBuffer> getBatch() {
+        return batch;
+    }
+
+
+    public void setBatch( Mutator<ByteBuffer> batch ) {
+        this.batch = batch;
+    }
+
+
+    public String getQueuePath() {
+        return queuePath;
+    }
+
+
+    public void setQueuePath( String queuePath ) {
+        this.queuePath = queuePath;
+    }
+
+
+    public UUID getQueueId() {
+        return queueId;
+    }
+
+
+    public void setQueueId( UUID queueId ) {
+        this.queueId = queueId;
+    }
+
+
+    public String getEntryName() {
+        return entryName;
+    }
+
+
+    public void setEntryName( String entryName ) {
+        this.entryName = entryName;
+    }
+
+
+    public Object getEntryValue() {
+        return entryValue;
+    }
+
+
+    public void setEntryValue( Object entryValue ) {
+        this.entryValue = entryValue;
+    }
+
+
+    /** @return the update timestamp; initially the microsecond timestamp of the UUID passed to the constructor */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+
+    /** Overrides the timestamp only; the value of {@link #getTimestampUuid()} is not changed. */
+    public void setTimestamp( long timestamp ) {
+        this.timestamp = timestamp;
+    }
+
+
+    public UUID getTimestampUuid() {
+        return timestampUuid;
+    }
+
+
+    /** @return the live, mutable list of previous entries */
+    public List<QueueIndexEntry> getPrevEntries() {
+        return prevEntries;
+    }
+
+
+    /** Appends a previous entry that carries its own timestamp UUID. */
+    public void addPrevEntry( String path, Object value, UUID timestamp ) {
+        QueueIndexEntry entry = new QueueIndexEntry( path, value, timestamp );
+        prevEntries.add( entry );
+    }
+
+
+    /** @return the live, mutable list of new entries */
+    public List<QueueIndexEntry> getNewEntries() {
+        return newEntries;
+    }
+
+
+    /** Appends a new entry stamped with this update's timestamp UUID. */
+    public void addNewEntry( String path, Object value ) {
+        QueueIndexEntry entry = new QueueIndexEntry( path, value, timestampUuid );
+        newEntries.add( entry );
+    }
+
+
+    /** @return the live, mutable set of index names, in insertion order */
+    public Set<String> getIndexesSet() {
+        return indexesSet;
+    }
+
+
+    public void addIndex( String index ) {
+        indexesSet.add( index );
+    }
+
+
+    /**
+     * One index entry: a path, its value, the value's type code and a timestamp UUID. This is an inner
+     * (non-static) class because {@link #getIndexComposite()} reads the queue id and queue path of the
+     * enclosing update.
+     */
+    public class QueueIndexEntry {
+        private final byte code;
+        private String path;
+        private final Object value;
+        private final UUID timestampUuid;
+
+
+        public QueueIndexEntry( String path, Object value, UUID timestampUuid ) {
+            this.path = path;
+            this.value = value;
+            code = indexValueCode( value );
+            this.timestampUuid = timestampUuid;
+        }
+
+
+        public String getPath() {
+            return path;
+        }
+
+
+        public void setPath( String path ) {
+            this.path = path;
+        }
+
+
+        public Object getValue() {
+            return value;
+        }
+
+
+        /** @return the type code computed from the value when the entry was created */
+        public byte getValueCode() {
+            return code;
+        }
+
+
+        public UUID getTimestampUuid() {
+            return timestampUuid;
+        }
+
+
+        /** @return a composite of (type code, value, queue id, queue path, timestamp UUID) */
+        public DynamicComposite getIndexComposite() {
+            return new DynamicComposite( code, value, getQueueId(), getQueuePath(), timestampUuid );
+        }
+    }
+
+
+    /** Trims and lower-cases the string, then cuts it to at most {@link #INDEX_STRING_VALUE_LENGTH} characters. */
+    private static String prepStringForIndex( String str ) {
+        str = str.trim().toLowerCase();
+        str = str.substring( 0, Math.min( INDEX_STRING_VALUE_LENGTH, str.length() ) );
+        return str;
+    }
+
+
+    /**
+     * Converts a value into the form in which it is indexed.
+     * <ul>
+     * <li>{@code null} stays {@code null}</li>
+     * <li>a {@link String} is trimmed, lower-cased and truncated to {@link #INDEX_STRING_VALUE_LENGTH}</li>
+     * <li>a {@link UUID}, {@link BigInteger} or {@link ByteBuffer} is returned unchanged</li>
+     * <li>any other {@link Number} becomes a {@link BigInteger} of its {@code longValue()}</li>
+     * <li>a {@link Boolean} becomes 1 or 0, a {@link Date} its {@code getTime()} value</li>
+     * <li>a {@code byte[]} is wrapped in a {@link ByteBuffer}</li>
+     * <li>anything else is converted to a JSON node; a value node is mapped by the same rules</li>
+     * </ul>
+     *
+     * @param obj the value to convert, may be null
+     * @return the indexable form of the value, or null if the value cannot be indexed
+     */
+    public static Object toIndexableValue( Object obj ) {
+        if ( obj == null ) {
+            return null;
+        }
+
+        if ( obj instanceof String ) {
+            return prepStringForIndex( ( String ) obj );
+        }
+
+        // UUIDs and BigIntegers are already in indexable form. This check must precede the
+        // generic Number branch because BigInteger is itself a Number.
+        if ( ( obj instanceof UUID ) || ( obj instanceof BigInteger ) ) {
+            return obj;
+        }
+
+        // For any numeric values, turn them into a long
+        // and make them BigIntegers for IntegerType
+        if ( obj instanceof Number ) {
+            return BigInteger.valueOf( ( ( Number ) obj ).longValue() );
+        }
+
+        if ( obj instanceof Boolean ) {
+            return BigInteger.valueOf( ( ( Boolean ) obj ) ? 1L : 0L );
+        }
+
+        if ( obj instanceof Date ) {
+            return BigInteger.valueOf( ( ( Date ) obj ).getTime() );
+        }
+
+        if ( obj instanceof byte[] ) {
+            return wrap( ( byte[] ) obj );
+        }
+
+        if ( obj instanceof ByteBuffer ) {
+            return obj;
+        }
+
+        JsonNode json = toJsonNode( obj );
+        if ( ( json != null ) && json.isValueNode() ) {
+            if ( json.isBigInteger() ) {
+                return json.getBigIntegerValue();
+            }
+            else if ( json.isNumber() || json.isBoolean() ) {
+                return BigInteger.valueOf( json.getValueAsLong() );
+            }
+            else if ( json.isTextual() ) {
+                return prepStringForIndex( json.getTextValue() );
+            }
+            else if ( json.isBinary() ) {
+                try {
+                    return wrap( json.getBinaryValue() );
+                }
+                catch ( IOException ignored ) {
+                    // Best effort: a binary node whose content cannot be read is treated as
+                    // not indexable and falls through to the null return below.
+                }
+            }
+        }
+
+        return null;
+    }
+
+
+    /** @return true if {@link #toIndexableValue(Object)} yields a non-null result for the value */
+    public static boolean validIndexableValue( Object obj ) {
+        return toIndexableValue( obj ) != null;
+    }
+
+
+    /** @return true for any {@link Map}, {@link List} or {@link JsonNode}, otherwise as {@link #validIndexableValue} */
+    public static boolean validIndexableValueOrJson( Object obj ) {
+        if ( ( obj instanceof Map ) || ( obj instanceof List ) || ( obj instanceof JsonNode ) ) {
+            return true;
+        }
+        return toIndexableValue( obj ) != null;
+    }
+
+
+    /**
+     * Classifies a value by the type of its indexable form.
+     *
+     * @return {@link #VALUE_CODE_UTF8}, {@link #VALUE_CODE_UUID} or {@link #VALUE_CODE_INT}; everything else,
+     * including values that cannot be indexed, yields {@link #VALUE_CODE_BYTES}
+     */
+    public static byte indexValueCode( Object obj ) {
+        obj = toIndexableValue( obj );
+        if ( obj instanceof String ) {
+            return VALUE_CODE_UTF8;
+        }
+        else if ( obj instanceof UUID ) {
+            return VALUE_CODE_UUID;
+        }
+        else if ( obj instanceof BigInteger ) {
+            return VALUE_CODE_INT;
+        }
+        else if ( obj instanceof Number ) {
+            return VALUE_CODE_INT;
+        }
+        else {
+            return VALUE_CODE_BYTES;
+        }
+    }
+
+
+    /**
+     * Compares two values by their indexable forms. Values that cannot be indexed sort before all others and
+     * are equal to each other. Values with different type codes are ordered by type code. Values with the same
+     * type code are compared with {@link UUIDComparator} when they are UUIDs, otherwise by their natural order.
+     *
+     * @return a negative number, zero or a positive number as {@code o1} sorts before, with or after {@code o2}
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static int compareIndexedValues( Object o1, Object o2 ) {
+        o1 = toIndexableValue( o1 );
+        o2 = toIndexableValue( o2 );
+        if ( ( o1 == null ) && ( o2 == null ) ) {
+            return 0;
+        }
+        else if ( o1 == null ) {
+            return -1;
+        }
+        else if ( o2 == null ) {
+            return 1;
+        }
+        int c1 = indexValueCode( o1 );
+        int c2 = indexValueCode( o2 );
+        if ( c1 == c2 ) {
+            if ( o1 instanceof UUID ) {
+                // The result must be returned; previously it was discarded, so any two UUIDs
+                // fell through to the type-code difference below and compared as equal.
+                return UUIDComparator.staticCompare( ( UUID ) o1, ( UUID ) o2 );
+            }
+            else if ( o1 instanceof Comparable ) {
+                return ( ( Comparable ) o1 ).compareTo( o2 );
+            }
+        }
+        // Type codes are byte-sized, so the subtraction cannot overflow
+        return c1 - c2;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
new file mode 100644
index 0000000..e312e68
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerFactoryImpl.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.mq.cassandra;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.mq.QueueManager;
+import org.apache.usergrid.mq.QueueManagerFactory;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.persistence.cassandra.CounterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+
+
+/**
+ * {@link QueueManagerFactory} that hands out {@link QueueManagerImpl} instances initialised with the Cassandra
+ * service, counter utilities, lock manager and lock timeout this factory was constructed with.
+ */
+public class QueueManagerFactoryImpl implements QueueManagerFactory {
+
+    public static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class );
+
+    /** Text returned by {@link #getImpementationDescription()}. Left non-final because it is public. */
+    public static String IMPLEMENTATION_DESCRIPTION = "Cassandra Queue Manager Factory 1.0";
+
+    // Collaborators are set once in the constructor and passed on to every queue manager created
+    private final CassandraService cass;
+    private final CounterUtils counterUtils;
+    private final LockManager lockManager;
+    private final int lockTimeout;
+
+    // Shared serializer instances
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+
+    /**
+     * Creates a factory whose queue managers all share the given collaborators.
+     *
+     * @param cass the Cassandra service
+     * @param counterUtils the CounterUtils
+     * @param lockManager the lock manager handed to each queue manager
+     * @param lockTimeout the lock timeout handed to each queue manager (unit not visible here)
+     */
+    public QueueManagerFactoryImpl( CassandraService cass, CounterUtils counterUtils, LockManager lockManager,
+                                    int lockTimeout ) {
+        this.cass = cass;
+        this.counterUtils = counterUtils;
+        this.lockManager = lockManager;
+        this.lockTimeout = lockTimeout;
+    }
+
+
+    /** @return {@link #IMPLEMENTATION_DESCRIPTION}; the method name's spelling is fixed by the interface */
+    @Override
+    public String getImpementationDescription() throws Exception {
+        return IMPLEMENTATION_DESCRIPTION;
+    }
+
+
+    /**
+     * Creates and initialises a new queue manager for the application; a new instance is built on every call.
+     *
+     * @param applicationId id of the application the queue manager is for
+     * @return the initialised queue manager
+     */
+    @Override
+    public QueueManager getQueueManager( UUID applicationId ) {
+        QueueManagerImpl qm = new QueueManagerImpl();
+        qm.init( cass, counterUtils, lockManager, applicationId, lockTimeout );
+        return qm;
+    }
+}