You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:48 UTC

[38/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
new file mode 100644
index 0000000..ed257e6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectionRefImpl.java
@@ -0,0 +1,627 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+import static org.apache.usergrid.utils.ConversionUtils.ascii;
+import static org.apache.usergrid.utils.ConversionUtils.uuidToBytesNullOk;
+
+
+/** @author edanuff */
+public class ConnectionRefImpl implements ConnectionRef {
+
+    public static final int MAX_LINKS = 1;
+
+    /**
+     *
+     */
+    public static final int ALL = 0;
+    /**
+     *
+     */
+    public static final int BY_CONNECTION_TYPE = 1;
+    /**
+     *
+     */
+    public static final int BY_ENTITY_TYPE = 2;
+    /**
+     *
+     */
+    public static final int BY_CONNECTION_AND_ENTITY_TYPE = 3;
+
+    /**
+     *
+     */
+    public static final String NULL_ENTITY_TYPE = "Null";
+    /**
+     *
+     */
+    public static final UUID NULL_ID = new UUID( 0, 0 );
+
+    private static final Logger logger = LoggerFactory.getLogger( ConnectionRefImpl.class );
+
+
+    public static final String CONNECTION_ENTITY_TYPE = "Connection";
+    public static final String CONNECTION_ENTITY_CONNECTION_TYPE = "connection";
+
+
+    private final EntityRef connectingEntity;
+
+    private final List<ConnectedEntityRef> pairedConnections;
+
+    private final ConnectedEntityRef connectedEntity;
+
+
+    /**
+     * Creates an empty connection: the connecting entity comes from {@link SimpleEntityRef#ref()}, the paired
+     * connection list is empty, and the connected entity is a default-constructed {@link ConnectedEntityRefImpl}.
+     */
+    public ConnectionRefImpl() {
+        this.connectingEntity = SimpleEntityRef.ref();
+        this.pairedConnections = Collections.emptyList();
+        this.connectedEntity = new ConnectedEntityRefImpl();
+    }
+
+
+    /**
+     * Creates a connection with no paired connections from the given types and ids.
+     *
+     * @param connectingEntityType type of the connecting entity
+     * @param connectingEntityId id of the connecting entity
+     * @param connectionType connection type stored on the connected entity ref
+     * @param connectedEntityType type of the connected entity
+     * @param connectedEntityId id of the connected entity
+     */
+    public ConnectionRefImpl( String connectingEntityType, UUID connectingEntityId, String connectionType,
+                              String connectedEntityType, UUID connectedEntityId ) {
+        this.connectingEntity = ref( connectingEntityType, connectingEntityId );
+        this.pairedConnections = Collections.emptyList();
+        this.connectedEntity = new ConnectedEntityRefImpl( connectionType, connectedEntityType, connectedEntityId );
+    }
+
+
+    /**
+     * Creates a connection from the source entity to the target entity with no paired connections.
+     *
+     * @param source the connecting entity; re-wrapped through {@code ref( source )}
+     * @param connectionType connection type stored on the connected entity ref
+     * @param target the entity the connection points to
+     */
+    public ConnectionRefImpl( EntityRef source, String connectionType, EntityRef target ) {
+        this.connectingEntity = ref( source );
+        this.pairedConnections = Collections.emptyList();
+        this.connectedEntity = new ConnectedEntityRefImpl( connectionType, target );
+    }
+
+
+    /**
+     * Creates a connection that shares the connecting entity, paired connections and connected entity of
+     * {@code connection}. A null paired-connection list is replaced by an empty list.
+     */
+    public ConnectionRefImpl( ConnectionRef connection ) {
+        this.connectingEntity = connection.getConnectingEntity();
+
+        List<ConnectedEntityRef> paired = connection.getPairedConnections();
+        this.pairedConnections = ( paired != null ) ? paired : Collections.<ConnectedEntityRef>emptyList();
+
+        this.connectedEntity = connection.getConnectedEntity();
+    }
+
+
+    /**
+     * Creates a connection from a connecting entity and a chain of connected entity refs.
+     * <p>
+     * The last element of {@code connections} becomes the connected entity and every element before it becomes a
+     * paired connection, in order. With no elements the connected entity is a default-constructed
+     * {@link ConnectedEntityRefImpl} and the paired connection list is empty.
+     *
+     * @param connectingEntity the connecting entity; re-wrapped through {@code ref( connectingEntity )}
+     * @param connections the chain of connected entity refs, possibly empty
+     */
+    public ConnectionRefImpl( EntityRef connectingEntity, ConnectedEntityRef... connections ) {
+
+        this.connectingEntity = ref( connectingEntity );
+
+        ConnectedEntityRef ce = new ConnectedEntityRefImpl();
+        List<ConnectedEntityRef> pc = Collections.emptyList();
+        if ( connections.length > 0 ) {
+
+            int last = connections.length - 1;
+            ce = connections[last];
+
+            if ( last > 0 ) {
+                // copyOfRange's upper bound is exclusive: "last" keeps every element before the final one.
+                // The previous bound of length - 2 silently dropped the element just before the last.
+                pc = Arrays.asList( Arrays.copyOfRange( connections, 0, last ) );
+            }
+        }
+        pairedConnections = pc;
+        connectedEntity = ce;
+    }
+
+
+    /**
+     * Creates a connection that extends an existing connection with further connected entity refs.
+     * <p>
+     * When {@code connections} is non-empty, the paired connections are, in order: the paired connections of
+     * {@code connection}, the connected entity of {@code connection}, and every element of {@code connections}
+     * except the last. The last element becomes the connected entity. When {@code connections} is empty, the
+     * paired connection list is empty and the connected entity is a default-constructed
+     * {@link ConnectedEntityRefImpl}.
+     *
+     * @param connection the connection to extend
+     * @param connections the refs to append, possibly empty
+     * @throws NullPointerException if {@code connection} is null
+     */
+    public ConnectionRefImpl( ConnectionRef connection, ConnectedEntityRef... connections ) {
+
+        if ( connection == null ) {
+            throw new NullPointerException( "ConnectionImpl constructor \'connection\' cannot be null" );
+        }
+
+        connectingEntity = connection.getConnectingEntity();
+        pairedConnections = new ArrayList<ConnectedEntityRef>();
+
+        if ( connections.length > 0 ) {
+
+            // the copy constructor tolerates a null paired-connection list, so do the same here
+            List<ConnectedEntityRef> existing = connection.getPairedConnections();
+            if ( existing != null ) {
+                pairedConnections.addAll( existing );
+            }
+            pairedConnections.add( connection.getConnectedEntity() );
+
+            int last = connections.length - 1;
+            connectedEntity = connections[last];
+
+            if ( last > 0 ) {
+                // copyOfRange's upper bound is exclusive: "last" keeps every element before the final one.
+                // The previous bound of length - 2 silently dropped the element just before the last.
+                pairedConnections.addAll( Arrays.asList( Arrays.copyOfRange( connections, 0, last ) ) );
+            }
+        }
+        else {
+            connectedEntity = new ConnectedEntityRefImpl();
+        }
+    }
+
+
+    /**
+     * Creates a connection from already-built parts. The arguments are stored as given: no null check is
+     * performed and the list is not copied.
+     *
+     * @param connectingEntity the connecting entity
+     * @param pairedConnections the paired connections
+     * @param connectedEntity the connected entity
+     */
+    public ConnectionRefImpl( EntityRef connectingEntity, List<ConnectedEntityRef> pairedConnections,
+                              ConnectedEntityRef connectedEntity ) {
+        this.connectingEntity = connectingEntity;
+        this.pairedConnections = pairedConnections;
+        this.connectedEntity = connectedEntity;
+    }
+
+
+    /** @return always {@code null} in this implementation */
+    public UUID getSearchIndexId() {
+        return null;
+    }
+
+
+    /** @return always {@code null} in this implementation */
+    public String getSearchConnectionType() {
+        return null;
+    }
+
+
+    /** @return always {@code null} in this implementation */
+    public String getSearchResultType() {
+        return null;
+    }
+
+
+    /** @return always {@code null} in this implementation */
+    public String getSearchIndexName() {
+        return null;
+    }
+
+
+    /** @return the connecting (source) entity reference held by this connection */
+    @Override
+    public EntityRef getConnectingEntity() {
+        return connectingEntity;
+    }
+
+
+    /** @return the type of the connecting entity, or {@code null} when no connecting entity is set */
+    public String getConnectingEntityType() {
+        return ( connectingEntity != null ) ? connectingEntity.getType() : null;
+    }
+
+
+    /** @return the id of the connecting entity, or {@code null} when no connecting entity is set */
+    public UUID getConnectingEntityId() {
+        return ( connectingEntity != null ) ? connectingEntity.getUuid() : null;
+    }
+
+
+    /** @return the paired connection list held by this connection; the list itself is returned, not a copy */
+    @Override
+    public List<ConnectedEntityRef> getPairedConnections() {
+        return pairedConnections;
+    }
+
+
+    /** @return the first paired connection, or {@code null} when the list is null or empty */
+    public ConnectedEntityRef getFirstPairedConnection() {
+        if ( ( pairedConnections == null ) || pairedConnections.isEmpty() ) {
+            return null;
+        }
+        return pairedConnections.get( 0 );
+    }
+
+
+    /** @return the id of the first paired connection, or {@code null} when there is none */
+    public UUID getFirstPairedConnectedEntityId() {
+        ConnectedEntityRef first = getFirstPairedConnection();
+        return ( first == null ) ? null : first.getUuid();
+    }
+
+
+    /** @return the entity type of the first paired connection, or {@code null} when there is none */
+    public String getFirstPairedConnectedEntityType() {
+        ConnectedEntityRef first = getFirstPairedConnection();
+        return ( first == null ) ? null : first.getType();
+    }
+
+
+    /** @return the connection type of the first paired connection, or {@code null} when there is none */
+    public String getFirstPairedConnectionType() {
+        ConnectedEntityRef first = getFirstPairedConnection();
+        return ( first == null ) ? null : first.getConnectionType();
+    }
+
+
+    /** @return the connected (target) entity reference held by this connection */
+    @Override
+    public ConnectedEntityRef getConnectedEntity() {
+        return connectedEntity;
+    }
+
+
+    /** @return the connection type of the connected entity, or {@code null} when no connected entity is set */
+    @Override
+    public String getConnectionType() {
+        return ( connectedEntity != null ) ? connectedEntity.getConnectionType() : null;
+    }
+
+
+    /** @return the type of the connected entity, or {@code null} when no connected entity is set */
+    public String getConnectedEntityType() {
+        return ( connectedEntity != null ) ? connectedEntity.getType() : null;
+    }
+
+
+    /**
+     * Null-safe like the other connected-entity accessors: the three-argument constructor stores whatever
+     * connected entity it is given, including {@code null}.
+     *
+     * @return the id of the connected entity, or {@code null} when no connected entity is set
+     */
+    public UUID getConnectedEntityId() {
+        if ( connectedEntity == null ) {
+            return null;
+        }
+        return connectedEntity.getUuid();
+    }
+
+
+    private UUID id;
+
+
+    /** @return connection id */
+    @Override
+    public UUID getUuid() {
+        if ( id == null ) {
+            id = getId( getConnectingEntity(), getConnectedEntity(),
+                    getPairedConnections().toArray( new ConnectedEntityRef[0] ) );
+        }
+        return id;
+    }
+
+
+    /** @return the fixed entity type of a connection, {@link #CONNECTION_ENTITY_TYPE} */
+    @Override
+    public String getType() {
+        return CONNECTION_ENTITY_TYPE;
+    }
+
+
+    public UUID getIndexId() {
+        return getIndexId( getConnectingEntity(), getConnectionType(), getConnectedEntityType(),
+                pairedConnections.toArray( new ConnectedEntityRef[0] ) );
+    }
+
+
+    public UUID getConnectingIndexId() {
+        return getIndexId( getConnectingEntity(), getConnectionType(), null,
+                pairedConnections.toArray( new ConnectedEntityRef[0] ) );
+    }
+
+
+    public ConnectionRefImpl getConnectionToConnectionEntity() {
+        return new ConnectionRefImpl( getConnectingEntity(),
+                new ConnectedEntityRefImpl( CONNECTION_ENTITY_CONNECTION_TYPE, CONNECTION_ENTITY_TYPE, getUuid() ) );
+    }
+
+
+    /** @return index ids */
+    public UUID[] getIndexIds() {
+
+        return getIndexIds( getConnectingEntity(), getConnectedEntity().getConnectionType(),
+                getConnectedEntity().getType(), getPairedConnections().toArray( new ConnectedEntityRef[0] ) );
+    }
+
+
+    /** @return {@code type}, or {@link #NULL_ENTITY_TYPE} when it is null or empty */
+    static String typeOrDefault( String type ) {
+        boolean missing = ( type == null ) || type.isEmpty();
+        return missing ? NULL_ENTITY_TYPE : type;
+    }
+
+
+    /** @return {@code uuid}, or {@link #NULL_ID} when it is null */
+    static UUID idOrDefault( UUID uuid ) {
+        return ( uuid != null ) ? uuid : NULL_ID;
+    }
+
+
+    public static boolean connectionsNull( ConnectedEntityRef... pairedConnections ) {
+        if ( ( pairedConnections == null ) || ( pairedConnections.length == 0 ) ) {
+            return true;
+        }
+
+        for ( ConnectedEntityRef pairedConnection : pairedConnections ) {
+            if ( pairedConnection == null || pairedConnection.getUuid() == null || pairedConnection.getUuid().equals(
+                    NULL_ID ) ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
+    /** Varargs convenience: returns the argument array unchanged. */
+    public static ConnectedEntityRef[] getConnections( ConnectedEntityRef... connections ) {
+        return connections;
+    }
+
+
+    /** Varargs convenience: returns the arguments wrapped by {@link Arrays#asList} (a fixed-size view of the array). */
+    public static List<ConnectedEntityRef> getConnectionsList( ConnectedEntityRef... connections ) {
+        return Arrays.asList( connections );
+    }
+
+
+    /**
+     * Delegates to the five-argument overload with a null paired connection type and a null paired entity id.
+     *
+     * @return connection id
+     */
+    public static UUID getId( UUID connectingEntityId, String connectionType, UUID connectedEntityId ) {
+        return getId( connectingEntityId, null, null, connectionType, connectedEntityId );
+    }
+
+
+    /**
+     * Builds entity refs from the raw ids and types (entity types are left null) and delegates to
+     * {@code getId( EntityRef, ConnectedEntityRef, ConnectedEntityRef... )} with a single paired connection.
+     *
+     * @return connection id
+     */
+    public static UUID getId( UUID connectingEntityId, String pairedConnectionType, UUID pairedConnectingEntityId,
+                              String connectionType, UUID connectedEntityId ) {
+
+        EntityRef source = ref( connectingEntityId );
+        ConnectedEntityRef[] paired =
+                getConnections( new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId ) );
+        ConnectedEntityRef target = new ConnectedEntityRefImpl( connectionType, null, connectedEntityId );
+
+        return getId( source, target, paired );
+    }
+
+
+    /**
+     * Computes the connection id.
+     * <p>
+     * When both the paired connections and the connected entity are absent according to
+     * {@link #connectionsNull}, the connecting entity's uuid is returned as-is. Otherwise a byte sequence is
+     * assembled from the connecting entity uuid, then the lower-cased connection type and uuid of each paired
+     * connection, then the lower-cased connection type ({@link #NULL_ENTITY_TYPE} when null) and uuid of the
+     * connected entity. That sequence is turned into a UUID with {@link UUID#nameUUIDFromBytes(byte[])}.
+     *
+     * @param connectingEntity the connecting entity
+     * @param connectedEntity the connected entity
+     * @param pairedConnections the paired connections; a null array is treated as empty
+     * @return the connection id, or {@code null} if an {@link IOException} was caught while assembling the bytes
+     */
+    public static UUID getId( EntityRef connectingEntity, ConnectedEntityRef connectedEntity,
+                              ConnectedEntityRef... pairedConnections ) {
+        UUID uuid = null;
+        try {
+
+            if ( connectionsNull( pairedConnections ) && connectionsNull( connectedEntity ) ) {
+                return connectingEntity.getUuid();
+            }
+
+            // connectionsNull accepts a null array, so the code below must not dereference one
+            if ( pairedConnections == null ) {
+                pairedConnections = new ConnectedEntityRef[0];
+            }
+
+            ByteArrayOutputStream byteStream = new ByteArrayOutputStream( 16 + ( 32 * pairedConnections.length ) );
+
+            byteStream.write( uuidToBytesNullOk( connectingEntity.getUuid() ) );
+
+            for ( ConnectedEntityRef connection : pairedConnections ) {
+                String pairedType = connection.getConnectionType();
+                UUID pairedId = connection.getUuid();
+
+                byteStream.write( ascii( StringUtils.lowerCase( pairedType ) ) );
+                byteStream.write( uuidToBytesNullOk( pairedId ) );
+            }
+
+            String connectionType = connectedEntity.getConnectionType();
+            if ( connectionType == null ) {
+                connectionType = NULL_ENTITY_TYPE;
+            }
+
+            UUID connectedEntityID = connectedEntity.getUuid();
+
+            byteStream.write( ascii( StringUtils.lowerCase( connectionType ) ) );
+            byteStream.write( uuidToBytesNullOk( connectedEntityID ) );
+
+            uuid = UUID.nameUUIDFromBytes( byteStream.toByteArray() );
+        }
+        catch ( IOException e ) {
+            logger.error( "Unable to create connection UUID", e );
+        }
+        return uuid;
+    }
+
+
+    /**
+     * Delegates to the five-argument overload with a null paired connection type and a null paired entity id.
+     *
+     * @return connection index id
+     */
+    public static UUID getIndexId( UUID connectingEntityId, String connectionType, String connectedEntityType ) {
+        return getIndexId( connectingEntityId, null, null, connectionType, connectedEntityType );
+    }
+
+
+    /**
+     * Builds entity refs from the raw ids and types and delegates to
+     * {@code getIndexId( EntityRef, String, String, ConnectedEntityRef... )} with a single paired connection.
+     *
+     * @return connection index id
+     */
+    public static UUID getIndexId( UUID connectingEntityId, String pairedConnectionType, UUID pairedConnectingEntityId,
+                                   String connectionType, String connectedEntityType ) {
+
+        EntityRef source = ref( connectingEntityId );
+        ConnectedEntityRef[] paired =
+                getConnections( new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId ) );
+
+        return getIndexId( source, connectionType, connectedEntityType, paired );
+    }
+
+
+    /**
+     * Computes the connection index id.
+     * <p>
+     * When the paired connections are absent according to {@link #connectionsNull} and both
+     * {@code connectionType} and {@code connectedEntityType} are null, the connecting entity's uuid is returned
+     * as-is. Otherwise a byte sequence is assembled from the connecting entity uuid, then the lower-cased
+     * connection type and uuid of each paired connection, then the lower-cased connection type and connected
+     * entity type (each replaced by {@link #NULL_ENTITY_TYPE} when null). That sequence is turned into a UUID
+     * with {@link UUID#nameUUIDFromBytes(byte[])}.
+     *
+     * @param connectingEntity the connecting entity
+     * @param connectionType the connection type, may be null
+     * @param connectedEntityType the connected entity type, may be null
+     * @param pairedConnections the paired connections; a null array is treated as empty
+     * @return the index id, or {@code null} if an {@link IOException} was caught while assembling the bytes
+     */
+    public static UUID getIndexId( EntityRef connectingEntity, String connectionType, String connectedEntityType,
+                                   ConnectedEntityRef... pairedConnections ) {
+
+        UUID uuid = null;
+        try {
+
+            if ( connectionsNull( pairedConnections ) && ( ( connectionType == null ) && ( connectedEntityType
+                    == null ) ) ) {
+                return connectingEntity.getUuid();
+            }
+
+            // connectionsNull accepts a null array, so the code below must not dereference one
+            if ( pairedConnections == null ) {
+                pairedConnections = new ConnectedEntityRef[0];
+            }
+
+            ByteArrayOutputStream byteStream = new ByteArrayOutputStream( 16 + ( 32 * pairedConnections.length ) );
+
+            byteStream.write( uuidToBytesNullOk( connectingEntity.getUuid() ) );
+
+            for ( ConnectedEntityRef connection : pairedConnections ) {
+                String type = connection.getConnectionType();
+                UUID id = connection.getUuid();
+
+                byteStream.write( ascii( StringUtils.lowerCase( type ) ) );
+                byteStream.write( uuidToBytesNullOk( id ) );
+            }
+
+            if ( connectionType == null ) {
+                connectionType = NULL_ENTITY_TYPE;
+            }
+            if ( connectedEntityType == null ) {
+                connectedEntityType = NULL_ENTITY_TYPE;
+            }
+
+            byteStream.write( ascii( StringUtils.lowerCase( connectionType ) ) );
+            byteStream.write( ascii( StringUtils.lowerCase( connectedEntityType ) ) );
+
+            byte[] raw_id = byteStream.toByteArray();
+
+            // hex-encode only when the message will actually be emitted
+            if ( logger.isInfoEnabled() ) {
+                logger.info( "raw connection index id: {}", Hex.encodeHexString( raw_id ) );
+            }
+
+            uuid = UUID.nameUUIDFromBytes( raw_id );
+
+            logger.info( "connection index uuid: {}", uuid );
+        }
+        catch ( IOException e ) {
+            logger.error( "Unable to create connection index UUID", e );
+        }
+        return uuid;
+    }
+
+
+    /**
+     * Builds entity refs from the raw ids and types and delegates to the variant-aware
+     * {@code getIndexId( int, EntityRef, String, String, ConnectedEntityRef... )} with a single paired connection.
+     *
+     * @return connection index id
+     */
+    public static UUID getIndexId( int variant, UUID connectingEntityId, String pairedConnectionType,
+                                   UUID pairedConnectingEntityId, String connectionType, String connectedEntityType ) {
+
+        EntityRef source = ref( connectingEntityId );
+        ConnectedEntityRef[] paired =
+                getConnections( new ConnectedEntityRefImpl( pairedConnectionType, null, pairedConnectingEntityId ) );
+
+        return getIndexId( variant, source, connectionType, connectedEntityType, paired );
+    }
+
+
+    public static UUID getIndexId( int variant, EntityRef connectingEntity, String connectionType,
+                                   String connectedEntityType, ConnectedEntityRef... pairedConnections ) {
+
+        switch ( variant ) {
+
+            case ALL:
+                if ( connectionsNull( pairedConnections ) ) {
+                    return connectingEntity.getUuid();
+                }
+                else {
+                    return getIndexId( connectingEntity, null, null, pairedConnections );
+                }
+
+            case BY_ENTITY_TYPE:
+                return getIndexId( connectingEntity, null, connectedEntityType, pairedConnections );
+
+            case BY_CONNECTION_TYPE:
+                return getIndexId( connectingEntity, connectionType, null, pairedConnections );
+
+            case BY_CONNECTION_AND_ENTITY_TYPE:
+                return getIndexId( connectingEntity, connectionType, connectedEntityType, pairedConnections );
+        }
+
+        return connectingEntity.getUuid();
+    }
+
+
+    /**
+     * Delegates to the five-argument overload with a null paired connection type and a null paired entity id.
+     *
+     * @return index ids
+     */
+    public static UUID[] getIndexIds( UUID connectingEntityId, String connectionType, String connectedEntityType ) {
+        return getIndexIds( connectingEntityId, null, null, connectionType, connectedEntityType );
+    }
+
+
+    /**
+     * @return a four-element array holding the index id for each variant, indexed by the variant constant
+     *         ({@link #ALL} through {@link #BY_CONNECTION_AND_ENTITY_TYPE})
+     */
+    public static UUID[] getIndexIds( UUID connectingEntityId, String pairedConnectionType,
+                                      UUID pairedConnectingEntityId, String connectionType,
+                                      String connectedEntityType ) {
+
+        UUID[] ids = new UUID[4];
+
+        for ( int variant = ALL; variant <= BY_CONNECTION_AND_ENTITY_TYPE; variant++ ) {
+            ids[variant] = getIndexId( variant, connectingEntityId, pairedConnectionType, pairedConnectingEntityId,
+                    connectionType, connectedEntityType );
+        }
+
+        return ids;
+    }
+
+
+    /**
+     * @return a four-element array holding the index id for each variant, indexed by the variant constant
+     *         ({@link #ALL} through {@link #BY_CONNECTION_AND_ENTITY_TYPE})
+     */
+    public static UUID[] getIndexIds( EntityRef connectingEntity, String connectionType, String connectedEntityType,
+                                      ConnectedEntityRef... pairedConnections ) {
+
+        UUID[] ids = new UUID[4];
+
+        for ( int variant = ALL; variant <= BY_CONNECTION_AND_ENTITY_TYPE; variant++ ) {
+            ids[variant] =
+                    getIndexId( variant, connectingEntity, connectionType, connectedEntityType, pairedConnections );
+        }
+
+        return ids;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
new file mode 100644
index 0000000..2de4f0d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CounterUtils.java
@@ -0,0 +1,397 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.count.Batcher;
+import org.apache.usergrid.count.common.Count;
+import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.mq.cassandra.QueuesCF;
+import org.apache.usergrid.persistence.CounterResolution;
+import org.apache.usergrid.persistence.entities.Event;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.PrefixedSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createCounterColumn;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+
+
+public class CounterUtils {
+
+    public static final Logger logger = LoggerFactory.getLogger( CounterUtils.class );
+
+    public static final LongSerializer le = new LongSerializer();
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+
+    private String counterType = "o";
+
+    private Batcher batcher;
+
+
+    /** Stores the batcher in this instance. */
+    public void setBatcher( Batcher batcher ) {
+        this.batcher = batcher;
+    }
+
+
+    /** Set the type to 'new' ("n"), 'parallel' ("p"), 'old' ("o" - the default) If not one of the above, do nothing */
+    public void setCounterType( String counterType ) {
+        if ( counterType == null ) {
+            return;
+        }
+        if ( "n".equals( counterType ) || "p".equals( counterType ) || "o".equals( counterType ) ) {
+            this.counterType = counterType;
+        }
+    }
+
+
+    /** @return {@code true} only when the counter type is "n" */
+    public boolean getIsCounterBatched() {
+        return "n".equals( counterType );
+    }
+
+
+    /**
+     * Mutable selection of an aggregate counter: a counter name plus optional user, group, queue and category.
+     * The constructor and {@link #apply} lower-case the name; {@link #setName} stores it as given.
+     */
+    public static class AggregateCounterSelection {
+        public static final String COLON = ":";
+        public static final String STAR = "*";
+        String name;
+        UUID userId;
+        UUID groupId;
+        UUID queueId;
+        String category;
+
+
+        /** Creates a selection; {@code name} is lower-cased and must not be null. */
+        public AggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId, String category ) {
+            this.name = name.toLowerCase();
+            this.userId = userId;
+            this.groupId = groupId;
+            this.queueId = queueId;
+            this.category = category;
+        }
+
+
+        /** Overwrites every field of this selection; {@code name} is lower-cased and must not be null. */
+        public void apply( String name, UUID userId, UUID groupId, UUID queueId, String category ) {
+            this.name = name.toLowerCase();
+            this.userId = userId;
+            this.groupId = groupId;
+            this.queueId = queueId;
+            this.category = category;
+        }
+
+
+        /** @return the counter name */
+        public String getName() {
+            return this.name;
+        }
+
+
+        /** Stores the name as given, without lower-casing. */
+        public void setName( String name ) {
+            this.name = name;
+        }
+
+
+        /** @return the user id, possibly null */
+        public UUID getUserId() {
+            return this.userId;
+        }
+
+
+        public void setUserId( UUID userId ) {
+            this.userId = userId;
+        }
+
+
+        /** @return the group id, possibly null */
+        public UUID getGroupId() {
+            return this.groupId;
+        }
+
+
+        public void setGroupId( UUID groupId ) {
+            this.groupId = groupId;
+        }
+
+
+        /** @return the queue id, possibly null */
+        public UUID getQueueId() {
+            return this.queueId;
+        }
+
+
+        public void setQueueId( UUID queueId ) {
+            this.queueId = queueId;
+        }
+
+
+        /** @return the category, possibly null */
+        public String getCategory() {
+            return this.category;
+        }
+
+
+        public void setCategory( String category ) {
+            this.category = category;
+        }
+
+
+        /** @return the row key of this selection for the given resolution, as built by {@link #rowBuilder} */
+        public String getRow( CounterResolution resolution ) {
+            return rowBuilder( name, userId, groupId, queueId, category, resolution );
+        }
+
+
+        /**
+         * Builds a row key of the form {@code name:user:group:queue:category:RESOLUTION}, where each null
+         * selector is written as {@link #STAR} and the last part is {@code resolution.name()}.
+         */
+        public static String rowBuilder( String name, UUID userId, UUID groupId, UUID queueId, String category,
+                                         CounterResolution resolution ) {
+            StringBuilder row = new StringBuilder( name );
+            row.append( COLON ).append( userId != null ? userId.toString() : STAR );
+            row.append( COLON ).append( groupId != null ? groupId.toString() : STAR );
+            row.append( COLON ).append( queueId != null ? queueId.toString() : STAR );
+            row.append( COLON ).append( category != null ? category : STAR );
+            row.append( COLON ).append( resolution.name() );
+            return row.toString();
+        }
+    }
+
+
+    /**
+     * Adds aggregate counter increments to the mutator for every counter carried by the event. Counter names
+     * are lower-cased. The event's own timestamp is passed as the counter timestamp and {@code timestamp} as the
+     * Cassandra timestamp. Does nothing when the event has no counters.
+     */
+    public void addEventCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, Event event, long timestamp ) {
+        if ( event.getCounters() == null ) {
+            return;
+        }
+        for ( Entry<String, Integer> counter : event.getCounters().entrySet() ) {
+            batchIncrementAggregateCounters( m, applicationId, event.getUser(), event.getGroup(), null,
+                    event.getCategory(), counter.getKey().toLowerCase(), counter.getValue(), event.getTimestamp(),
+                    timestamp );
+        }
+    }
+
+
+    /**
+     * Adds aggregate counter increments to the mutator for every counter carried by the message, attributed to
+     * the given queue. Counter names are lower-cased. The message's own timestamp is passed as the counter
+     * timestamp and {@code timestamp} as the Cassandra timestamp. Does nothing when the message has no counters.
+     */
+    public void addMessageCounterMutations( Mutator<ByteBuffer> m, UUID applicationId, UUID queueId, Message msg,
+                                            long timestamp ) {
+        if ( msg.getCounters() == null ) {
+            return;
+        }
+        for ( Entry<String, Integer> counter : msg.getCounters().entrySet() ) {
+            batchIncrementAggregateCounters( m, applicationId, null, null, queueId, msg.getCategory(),
+                    counter.getKey().toLowerCase(), counter.getValue(), msg.getTimestamp(), timestamp );
+        }
+    }
+
+
+    /**
+     * Adds aggregate counter increments for every entry of {@code counters}, lower-casing each counter name.
+     * Does nothing when {@code counters} is null.
+     */
+    public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+                                                 UUID queueId, String category, Map<String, Long> counters,
+                                                 long timestamp ) {
+        if ( counters == null ) {
+            return;
+        }
+        for ( Entry<String, Long> counter : counters.entrySet() ) {
+            batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category,
+                    counter.getKey().toLowerCase(), counter.getValue(), timestamp );
+        }
+    }
+
+
+    /**
+     * Delegates to the overload that takes separate counter and Cassandra timestamps, using
+     * {@code cassandraTimestamp / 1000} as the counter timestamp.
+     * NOTE(review): the division suggests a microsecond-to-millisecond conversion; confirm against callers.
+     */
+    public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+                                                 UUID queueId, String category, String name, long value,
+                                                 long cassandraTimestamp ) {
+        batchIncrementAggregateCounters( m, applicationId, userId, groupId, queueId, category, name, value,
+                cassandraTimestamp / 1000, cassandraTimestamp );
+    }
+
+
+    /**
+     * Adds the increments for one named counter: the aggregate counter rows for every {@link CounterResolution},
+     * followed by entity counter increments for the application and, when non-null, for the user and the group.
+     * No entity counter is incremented for the queue here.
+     *
+     * @param counterTimestamp timestamp passed to the aggregate counter rows
+     * @param cassandraTimestamp timestamp passed to the entity counter increments
+     */
+    public void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID applicationId, UUID userId, UUID groupId,
+                                                 UUID queueId, String category, String name, long value,
+                                                 long counterTimestamp, long cassandraTimestamp ) {
+        for ( CounterResolution resolution : CounterResolution.values() ) {
+            logger.debug( "BIAC for resolution {}", resolution );
+            batchIncrementAggregateCounters( m, userId, groupId, queueId, category, resolution, name, value,
+                    counterTimestamp, applicationId );
+            logger.debug( "DONE BIAC for resolution {}", resolution );
+        }
+        // the application always gets an entity counter; user and group only when present
+        batchIncrementEntityCounter( m, applicationId, name, value, cassandraTimestamp, applicationId );
+        if ( userId != null ) {
+            batchIncrementEntityCounter( m, userId, name, value, cassandraTimestamp, applicationId );
+        }
+        if ( groupId != null ) {
+            batchIncrementEntityCounter( m, groupId, name, value, cassandraTimestamp, applicationId );
+        }
+    }
+
+
+    /**
+     * Adds the aggregate counter rows for one resolution.
+     * <p>
+     * The dotted counter name is expanded into its prefixes ("a.b.c" gives "a", "a.b", "a.b.c"); a prefix equal
+     * to "system" is skipped. For each prefix the unrestricted row is written first, then one row for every
+     * distinct combination of the user, group, queue and category selectors that has at least one non-null
+     * selector.
+     */
+    private void batchIncrementAggregateCounters( Mutator<ByteBuffer> m, UUID userId, UUID groupId, UUID queueId,
+                                                  String category, CounterResolution resolution, String name,
+                                                  long value, long counterTimestamp, UUID applicationId ) {
+
+        String[] segments = StringUtils.split( name, '.' );
+        for ( int depth = 1; depth <= segments.length; depth++ ) {
+            String prefix = StringUtils.join( segments, '.', 0, depth );
+            // skip system counter
+            if ( "system".equals( prefix ) ) {
+                continue;
+            }
+
+            // *:*:*:*
+            handleAggregateCounterRow( m,
+                    AggregateCounterSelection.rowBuilder( prefix, null, null, null, null, resolution ),
+                    resolution.round( counterTimestamp ), value, applicationId );
+
+            // each bit of the mask switches one selector on; duplicate rows arise when a selector is null
+            HashSet<String> seenRows = new HashSet<String>( 16 );
+            for ( int mask = 0; mask < 16; mask++ ) {
+
+                UUID selectedUser = ( ( mask & 0x01 ) != 0 ) ? userId : null;
+                UUID selectedGroup = ( ( mask & 0x02 ) != 0 ) ? groupId : null;
+                UUID selectedQueue = ( ( mask & 0x04 ) != 0 ) ? queueId : null;
+                String selectedCategory = ( ( mask & 0x08 ) != 0 ) ? category : null;
+
+                boolean anySelected = ( selectedUser != null ) || ( selectedGroup != null )
+                        || ( selectedQueue != null ) || ( selectedCategory != null );
+
+                String row = AggregateCounterSelection
+                        .rowBuilder( prefix, selectedUser, selectedGroup, selectedQueue, selectedCategory,
+                                resolution );
+
+                if ( anySelected && seenRows.add( row ) ) {
+                    handleAggregateCounterRow( m, row, resolution.round( counterTimestamp ), value,
+                            applicationId );
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Adds a single aggregate counter increment, routed by {@code counterType}:
+     * "o" or "p" adds a counter column to the mutator (only when {@code m} is non-null);
+     * "n" or "p" adds a {@link Count} to the batcher, with the row key prefixed by the application id.
+     *
+     * @param key aggregate counter row key
+     * @param column counter column name (the rounded timestamp supplied by the caller)
+     * @param value amount to increment by
+     */
+    private void handleAggregateCounterRow( Mutator<ByteBuffer> m, String key, long column, long value,
+                                            UUID applicationId ) {
+        // logged at debug so the level matches the isDebugEnabled() guard and the other counter messages
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "HACR: aggregateRow for app {} with key {} column {} and value {}",
+                    new Object[] { applicationId, key, column, value } );
+        }
+        if ( "o".equals( counterType ) || "p".equals( counterType ) ) {
+            if ( m != null ) {
+                HCounterColumn<Long> c = createCounterColumn( column, value, le );
+                m.addCounter( bytebuffer( key ), APPLICATION_AGGREGATE_COUNTERS.toString(), c );
+            }
+        }
+        if ( "n".equals( counterType ) || "p".equals( counterType ) ) {
+            // create and add Count
+            PrefixedSerializer ps =
+                    new PrefixedSerializer( applicationId, UUIDSerializer.get(), StringSerializer.get() );
+            batcher.add(
+                    new Count( APPLICATION_AGGREGATE_COUNTERS.toString(), ps.toByteBuffer( key ), column, value ) );
+        }
+    }
+
+
+    /** Builds a new {@link AggregateCounterSelection} from the given name, ids and category. */
+    public AggregateCounterSelection getAggregateCounterSelection( String name, UUID userId, UUID groupId, UUID queueId,
+                                                                   String category ) {
+        AggregateCounterSelection selection =
+                new AggregateCounterSelection( name, userId, groupId, queueId, category );
+        return selection;
+    }
+
+
+    /** Returns the aggregate counter row key that {@link AggregateCounterSelection#rowBuilder} produces. */
+    public String getAggregateCounterRow( String name, UUID userId, UUID groupId, UUID queueId, String category,
+                                          CounterResolution resolution ) {
+        String rowKey = AggregateCounterSelection.rowBuilder( name, userId, groupId, queueId, category, resolution );
+        return rowKey;
+    }
+
+
+    /**
+     * Returns the row key of each selection at the given resolution, in the iteration order of
+     * {@code selections}.
+     */
+    public List<String> getAggregateCounterRows( List<AggregateCounterSelection> selections,
+                                                 CounterResolution resolution ) {
+        final List<String> rowKeys = new ArrayList<String>();
+        for ( AggregateCounterSelection sel : selections ) {
+            String rowKey = sel.getRow( resolution );
+            rowKeys.add( rowKey );
+        }
+        return rowKeys;
+    }
+
+
+    /**
+     * Registers an increment of the named counter on one entity and returns the mutator passed in.
+     *
+     * The counter name is first inserted into the entity's counters dictionary. The increment itself is
+     * routed by {@code counterType}: "o" or "p" adds a counter column to the mutator, "n" or "p" adds a
+     * {@link Count} to the batcher.
+     */
+    private Mutator<ByteBuffer> batchIncrementEntityCounter( Mutator<ByteBuffer> m, UUID entityId, String name,
+                                                             Long value, long timestamp, UUID applicationId ) {
+        if ( logger.isDebugEnabled() ) {
+            Object[] logArgs = { name, entityId, value };
+            logger.debug( "BIEC: Incrementing property {} of entity {} by value {}", logArgs );
+        }
+
+        addInsertToMutator( m, ENTITY_DICTIONARIES, key( entityId, DICTIONARY_COUNTERS ), name, null, timestamp );
+
+        // "p" takes both paths
+        final boolean both = "p".equals( counterType );
+
+        if ( both || "o".equals( counterType ) ) {
+            HCounterColumn<String> counterColumn = createCounterColumn( name, value );
+            m.addCounter( bytebuffer( entityId ), ENTITY_COUNTERS.toString(), counterColumn );
+        }
+
+        if ( both || "n".equals( counterType ) ) {
+            PrefixedSerializer prefixed =
+                    new PrefixedSerializer( applicationId, UUIDSerializer.get(), UUIDSerializer.get() );
+            batcher.add( new Count( ENTITY_COUNTERS.toString(), prefixed.toByteBuffer( entityId ), name, value ) );
+        }
+
+        return m;
+    }
+
+
+    /**
+     * Registers an increment of the named counter on one queue and returns the mutator passed in.
+     *
+     * The counter name is first inserted (with an empty value) into the queue's counters dictionary.
+     * The increment itself is routed by {@code counterType}: "o" or "p" adds a counter column to the
+     * mutator, "n" or "p" adds a {@link Count} to the batcher.
+     */
+    public Mutator<ByteBuffer> batchIncrementQueueCounter( Mutator<ByteBuffer> m, UUID queueId, String name, long value,
+                                                           long timestamp, UUID applicationId ) {
+        if ( logger.isDebugEnabled() ) {
+            Object[] logArgs = { name, queueId, value };
+            logger.debug( "BIQC: Incrementing property {} of queue {} by value {}", logArgs );
+        }
+
+        ByteBuffer dictionaryKey = bytebuffer( key( queueId, DICTIONARY_COUNTERS ).toString() );
+        m.addInsertion( dictionaryKey, QueuesCF.QUEUE_DICTIONARIES.toString(),
+                createColumn( name, ByteBuffer.allocate( 0 ), timestamp, se, be ) );
+
+        // "p" takes both paths
+        final boolean both = "p".equals( counterType );
+
+        if ( both || "o".equals( counterType ) ) {
+            HCounterColumn<String> counterColumn = createCounterColumn( name, value );
+            m.addCounter( bytebuffer( queueId ), QueuesCF.COUNTERS.toString(), counterColumn );
+        }
+
+        if ( both || "n".equals( counterType ) ) {
+            PrefixedSerializer prefixed =
+                    new PrefixedSerializer( applicationId, UUIDSerializer.get(), UUIDSerializer.get() );
+            batcher.add( new Count( QueuesCF.COUNTERS.toString(), prefixed.toByteBuffer( queueId ), name, value ) );
+        }
+
+        return m;
+    }
+
+
+    /**
+     * Registers one increment per entry of {@code values} (counter name to amount) on a single queue
+     * and returns the mutator passed in.
+     */
+    public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, UUID queueId,
+                                                            Map<String, Long> values, long timestamp,
+                                                            UUID applicationId ) {
+        for ( Entry<String, Long> counter : values.entrySet() ) {
+            String counterName = counter.getKey();
+            Long increment = counter.getValue();
+            batchIncrementQueueCounter( m, queueId, counterName, increment, timestamp, applicationId );
+        }
+        return m;
+    }
+
+
+    /**
+     * Registers the increments for several queues at once; {@code values} maps each queue id to its
+     * counter-name-to-amount map. Returns the mutator passed in.
+     */
+    public Mutator<ByteBuffer> batchIncrementQueueCounters( Mutator<ByteBuffer> m, Map<UUID, Map<String, Long>> values,
+                                                            long timestamp, UUID applicationId ) {
+        for ( Entry<UUID, Map<String, Long>> perQueue : values.entrySet() ) {
+            UUID queueId = perQueue.getKey();
+            Map<String, Long> queueCounters = perQueue.getValue();
+            batchIncrementQueueCounters( m, queueId, queueCounters, timestamp, applicationId );
+        }
+        return m;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
new file mode 100644
index 0000000..b49f0fe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CursorCache.java
@@ -0,0 +1,120 @@
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static java.lang.Integer.parseInt;
+import static org.apache.commons.codec.binary.Base64.decodeBase64;
+import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
+import static org.apache.commons.lang.StringUtils.split;
+import static org.apache.usergrid.utils.ConversionUtils.bytes;
+
+
+/**
+ * Internal cursor parsing. Holds one cursor (a {@link ByteBuffer}) per slice hash code and converts the
+ * whole set to and from a single string.
+ *
+ * The string form is the URL-safe base64 encoding of {@code hash:cursor|hash:cursor|...}, where each
+ * {@code cursor} is itself the URL-safe base64 encoding of the cursor bytes.
+ *
+ * @author tnine
+ */
+public class CursorCache {
+
+    /** Charset of the textual cursor form; named explicitly so it does not follow the platform default. */
+    private static final java.nio.charset.Charset CURSOR_CHARSET = java.nio.charset.Charset.forName( "UTF-8" );
+
+    /** Cursor bytes keyed by the hash code of the slice they belong to. */
+    private final Map<Integer, ByteBuffer> cursors = new HashMap<Integer, ByteBuffer>();
+
+
+    /** Create an empty cursor cache */
+    public CursorCache() {
+
+    }
+
+
+    /**
+     * Create a new cursor cache from the string if passed. A null string, or one whose decoded form
+     * contains no ':' separator, yields an empty cache.
+     *
+     * @param cursorString string previously produced by {@link #asString()}, or null
+     *
+     * @throws NumberFormatException if a slice hash in the decoded string is not an integer
+     */
+    public CursorCache( String cursorString ) {
+
+        if ( cursorString == null ) {
+            return;
+        }
+
+        String decoded = new String( decodeBase64( cursorString ), CURSOR_CHARSET );
+
+        // nothing to do
+        if ( decoded.indexOf( ':' ) < 0 ) {
+            return;
+        }
+
+        String[] cursorTokens = split( decoded, '|' );
+
+        for ( String c : cursorTokens ) {
+
+            String[] parts = split( c, ':' );
+
+            if ( parts.length >= 1 ) {
+
+                int hashCode = parseInt( parts[0] );
+
+                ByteBuffer cursorBytes = null;
+
+                if ( parts.length == 2 ) {
+                    cursorBytes = ByteBuffer.wrap( decodeBase64( parts[1] ) );
+                }
+                else {
+                    // a hash without a value stands for an empty cursor
+                    cursorBytes = ByteBuffer.allocate( 0 );
+                }
+
+                cursors.put( hashCode, cursorBytes );
+            }
+        }
+    }
+
+
+    /** Set the cursor with the given hash and the new byte buffer */
+    public void setNextCursor( int sliceHash, ByteBuffer newCursor ) {
+        cursors.put( sliceHash, newCursor );
+    }
+
+
+    /** Get the cursor by the hashcode of the slice, or null if none is stored for it */
+    public ByteBuffer getCursorBytes( int sliceHash ) {
+        return cursors.get( sliceHash );
+    }
+
+
+    /**
+     * Turn the cursor cache into a string.
+     *
+     * @return the encoded cursors, or null when there are no cursors or every cursor is null or empty
+     */
+    public String asString() {
+        // No cursors to return
+        if ( cursors.size() == 0 ) {
+            return null;
+        }
+
+        // local to this call, so the unsynchronized builder is sufficient
+        StringBuilder buff = new StringBuilder();
+
+        int nullCount = 0;
+        ByteBuffer value = null;
+
+        for ( Entry<Integer, ByteBuffer> entry : cursors.entrySet() ) {
+            value = entry.getValue();
+
+            buff.append( entry.getKey() );
+            buff.append( ":" );
+            buff.append( encodeBase64URLSafeString( bytes( value ) ) );
+            buff.append( "|" );
+
+            // this range was empty, mark it as a null
+            if ( value == null || value.remaining() == 0 ) {
+                nullCount++;
+            }
+        }
+
+        // all cursors are complete, return null
+        if ( nullCount == cursors.size() ) {
+            return null;
+        }
+
+        // trim off the last pipe
+        buff.setLength( buff.length() - 1 );
+
+        return encodeBase64URLSafeString( buff.toString().getBytes( CURSOR_CHARSET ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
new file mode 100644
index 0000000..51dbc9e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -0,0 +1,408 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.DynamicEntity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.asMap;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.APPLICATIONS_CF;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.PROPERTIES_CF;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.RETRY_COUNT;
+import static org.apache.usergrid.utils.ConversionUtils.uuid;
+
+
+/**
+ * Cassandra-specific implementation of {@link EntityManagerFactory}.
+ *
+ * Application names are stored lower-cased in the form {@code organization/application} as row keys
+ * of the applications column family in the system keyspace; entity managers are obtained from the
+ * Spring application context and cached per application id.
+ *
+ * @author edanuff
+ */
+public class EntityManagerFactoryImpl implements EntityManagerFactory, ApplicationContextAware {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityManagerFactoryImpl.class );
+
+    public static String IMPLEMENTATION_DESCRIPTION = "Cassandra Entity Manager Factory 1.0";
+
+    public static final Class<DynamicEntity> APPLICATION_ENTITY_CLASS = DynamicEntity.class;
+
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+    ApplicationContext applicationContext;
+
+    CassandraService cass;
+    CounterUtils counterUtils;
+
+    private boolean skipAggregateCounters;
+
+    /** Entity managers keyed by application id; holds at most 100 entries. */
+    private LoadingCache<UUID, EntityManager> entityManagers =
+            CacheBuilder.newBuilder().maximumSize( 100 ).build( new CacheLoader<UUID, EntityManager>() {
+                @Override
+                public EntityManager load( UUID appId ) { // no checked exception
+                    return _getEntityManager( appId );
+                }
+            } );
+
+
+    /**
+     * Creates the factory.
+     *
+     * @param cass the cassandraService instance
+     * @param counterUtils the counter utilities instance
+     * @param skipAggregateCounters when true, a warning that counters are disabled is logged
+     */
+    public EntityManagerFactoryImpl( CassandraService cass, CounterUtils counterUtils, boolean skipAggregateCounters ) {
+        this.cass = cass;
+        this.counterUtils = counterUtils;
+        this.skipAggregateCounters = skipAggregateCounters;
+        if ( skipAggregateCounters ) {
+            logger.warn( "NOTE: Counters have been disabled by configuration..." );
+        }
+    }
+
+
+    /** Returns {@link #IMPLEMENTATION_DESCRIPTION}. */
+    @Override
+    public String getImpementationDescription() {
+        return IMPLEMENTATION_DESCRIPTION;
+    }
+
+
+    /**
+     * Returns the entity manager for the application, preferring the cached instance. If the cache
+     * lookup fails the error is logged and a fresh, uncached entity manager is returned instead.
+     */
+    @Override
+    public EntityManager getEntityManager( UUID applicationId ) {
+        try {
+            return entityManagers.get( applicationId );
+        }
+        catch ( Exception ex ) {
+            // best effort: report through the logger (not stderr) and fall back to an uncached instance
+            logger.error( "Unable to load entity manager for application " + applicationId + " from cache", ex );
+        }
+        return _getEntityManager( applicationId );
+    }
+
+
+    /** Fetches the "entityManager" bean from the application context and binds it to the application id. */
+    private EntityManager _getEntityManager( UUID applicationId ) {
+        //EntityManagerImpl em = new EntityManagerImpl();
+        EntityManager em = applicationContext.getBean( "entityManager", EntityManager.class );
+        //em.init(this,cass,counterUtils,applicationId, skipAggregateCounters);
+        em.setApplicationId( applicationId );
+        return em;
+    }
+
+
+    public ApplicationContext getApplicationContext() {
+        return applicationContext;
+    }
+
+
+    /**
+     * Gets the setup.
+     *
+     * @return a new Setup helper for this factory and its Cassandra service
+     */
+    public Setup getSetup() {
+        return new Setup( this, cass );
+    }
+
+
+    /** Runs the setup helper and then stores the service's properties map, if it has one. */
+    @Override
+    public void setup() throws Exception {
+        Setup setup = getSetup();
+
+        setup.setup();
+
+
+        if ( cass.getPropertiesMap() != null ) {
+            updateServiceProperties( cass.getPropertiesMap() );
+        }
+    }
+
+
+    /** Creates an application without initial properties; see the three-argument overload. */
+    @Override
+    public UUID createApplication( String organization, String name ) throws Exception {
+        return createApplication( organization, name, null );
+    }
+
+
+    /**
+     * Creates a new application under a freshly generated time UUID.
+     *
+     * @param organizationName organization the application belongs to
+     * @param name application name; used as-is when it already contains a "/"
+     * @param properties initial application properties, may be null
+     *
+     * @return the id of the new application
+     *
+     * @throws ApplicationAlreadyExistsException if an application with that name is already registered
+     */
+    @Override
+    public UUID createApplication( String organizationName, String name, Map<String, Object> properties )
+            throws Exception {
+
+        String appName = buildAppName( organizationName, name );
+
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, appName, PROPERTY_UUID );
+        if ( column != null ) {
+            throw new ApplicationAlreadyExistsException( name );
+            // UUID uuid = uuid(column.getValue());
+            // return uuid;
+        }
+
+        UUID applicationId = UUIDUtils.newTimeUUID();
+        logger.info( "New application id {}", applicationId );
+
+        initializeApplication( organizationName, applicationId, appName, properties );
+
+        return applicationId;
+    }
+
+
+    /** Lower-cases the name, prefixing it with {@code organizationName + "/"} unless it already contains a "/". */
+    private String buildAppName( String organizationName, String name ) {
+        return StringUtils.lowerCase( name.contains( "/" ) ? name : organizationName + "/" + name );
+    }
+
+
+    /**
+     * Registers the application under the given id: sets up its keyspace, writes its uuid and name to
+     * the applications column family, creates the application entity and resets its roles.
+     *
+     * Note that the name is put into {@code properties}, so a map passed by the caller is modified.
+     *
+     * @param properties application properties, may be null
+     *
+     * @return {@code applicationId}
+     *
+     * @throws ApplicationAlreadyExistsException if an application with that name is already registered
+     */
+    public UUID initializeApplication( String organizationName, UUID applicationId, String name,
+                                       Map<String, Object> properties ) throws Exception {
+
+        String appName = buildAppName( organizationName, name );
+        // check for pre-existing
+        if ( lookupApplication( appName ) != null ) {
+            throw new ApplicationAlreadyExistsException( appName );
+        }
+        if ( properties == null ) {
+            properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
+        }
+
+        properties.put( PROPERTY_NAME, appName );
+
+        getSetup().setupApplicationKeyspace( applicationId, appName );
+
+
+        Keyspace ko = cass.getSystemKeyspace();
+        Mutator<ByteBuffer> m = createMutator( ko, be );
+
+        long timestamp = cass.createTimestamp();
+
+        addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_UUID, applicationId, timestamp );
+        addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_NAME, appName, timestamp );
+
+        batchExecute( m, RETRY_COUNT );
+
+        EntityManager em = getEntityManager( applicationId );
+        em.create( TYPE_APPLICATION, APPLICATION_ENTITY_CLASS, properties );
+
+        em.resetRoles();
+
+        return applicationId;
+    }
+
+
+    /**
+     * Registers an application under a caller-supplied id.
+     *
+     * @throws ApplicationAlreadyExistsException if an application with that name is already registered
+     */
+    @Override
+    public UUID importApplication( String organizationName, UUID applicationId, String name,
+                                   Map<String, Object> properties ) throws Exception {
+
+        name = buildAppName( organizationName, name );
+
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column != null ) {
+            throw new ApplicationAlreadyExistsException( name );
+            // UUID uuid = uuid(column.getValue());
+            // return uuid;
+        }
+
+        return initializeApplication( organizationName, applicationId, name, properties );
+    }
+
+
+    /**
+     * Looks up an application id by name.
+     *
+     * @param name the application name; it is lower-cased before the lookup
+     *
+     * @return the application id, or null if no application is registered under that name
+     */
+    @Override
+    @Metered(group = "core", name = "EntityManagerFactory_lookupApplication_byName")
+    public UUID lookupApplication( String name ) throws Exception {
+        name = name.toLowerCase();
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column == null ) {
+            return null;
+        }
+        return uuid( column.getValue() );
+    }
+
+
+    /**
+     * Gets the application.
+     *
+     * @param name the name; it is lower-cased before the lookup
+     *
+     * @return application for name, or null if no application is registered under that name
+     *
+     * @throws Exception the exception
+     */
+    @Metered(group = "core", name = "EntityManagerFactory_getApplication")
+    public Application getApplication( String name ) throws Exception {
+        name = name.toLowerCase();
+        HColumn<String, ByteBuffer> column =
+                cass.getColumn( cass.getSystemKeyspace(), APPLICATIONS_CF, name, PROPERTY_UUID );
+        if ( column == null ) {
+            return null;
+        }
+
+        UUID applicationId = uuid( column.getValue() );
+
+        EntityManager em = getEntityManager( applicationId );
+        return ( ( EntityManagerImpl ) em ).getEntity( applicationId, Application.class );
+    }
+
+
+    /**
+     * Returns the registered applications as a case-insensitively ordered map of name to id. At most
+     * 10000 rows are read.
+     */
+    @Override
+    public Map<String, UUID> getApplications() throws Exception {
+        Map<String, UUID> applications = new TreeMap<String, UUID>( CASE_INSENSITIVE_ORDER );
+        Keyspace ko = cass.getSystemKeyspace();
+        RangeSlicesQuery<String, String, UUID> q = createRangeSlicesQuery( ko, se, se, ue );
+        q.setKeys( "", "\uFFFF" );
+        q.setColumnFamily( APPLICATIONS_CF );
+        q.setColumnNames( PROPERTY_UUID );
+        q.setRowCount( 10000 );
+        QueryResult<OrderedRows<String, String, UUID>> r = q.execute();
+        Rows<String, String, UUID> rows = r.get();
+        for ( Row<String, String, UUID> row : rows ) {
+            ColumnSlice<String, UUID> slice = row.getColumnSlice();
+            HColumn<String, UUID> column = slice.getColumnByName( PROPERTY_UUID );
+            // a row can come back without the uuid column (e.g. a deleted row still returned by the
+            // range scan); it is not an application, so skip it instead of failing on null
+            if ( column == null ) {
+                continue;
+            }
+            applications.put( row.getKey(), column.getValue() );
+        }
+        return applications;
+    }
+
+
+    /**
+     * Stores a single service property.
+     *
+     * @return true on success, false if the write failed (the failure is logged)
+     */
+    @Override
+    public boolean setServiceProperty( String name, String value ) {
+        try {
+            cass.setColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name, value );
+            return true;
+        }
+        catch ( Exception e ) {
+            // pass the exception itself so the stack trace is kept
+            logger.error( "Unable to set property " + name + ": " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Deletes a single service property.
+     *
+     * @return true on success, false if the delete failed (the failure is logged)
+     */
+    @Override
+    public boolean deleteServiceProperty( String name ) {
+        try {
+            cass.deleteColumn( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, name );
+            return true;
+        }
+        catch ( Exception e ) {
+            // pass the exception itself so the stack trace is kept
+            logger.error( "Unable to delete property " + name + ": " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Stores all given service properties.
+     *
+     * @return true on success, false if the write failed (the failure is logged)
+     */
+    @Override
+    public boolean updateServiceProperties( Map<String, String> properties ) {
+        try {
+            cass.setColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF.getBytes(), properties );
+            return true;
+        }
+        catch ( Exception e ) {
+            // pass the exception itself so the stack trace is kept
+            logger.error( "Unable to update properties: " + e.getMessage(), e );
+        }
+        return false;
+    }
+
+
+    /**
+     * Loads all service properties.
+     *
+     * @return the properties, or null if they could not be read (the failure is logged)
+     */
+    @Override
+    public Map<String, String> getServiceProperties() {
+        Map<String, String> properties = null;
+        try {
+            properties = asMap( cass.getAllColumns( cass.getSystemKeyspace(), PROPERTIES_CF, PROPERTIES_CF, se, se ) );
+            return properties;
+        }
+        catch ( Exception e ) {
+            // pass the exception itself so the stack trace is kept
+            logger.error( "Unable to load properties: " + e.getMessage(), e );
+        }
+        return null;
+    }
+
+
+    @Override
+    public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+
+    public void setCounterUtils( CounterUtils counterUtils ) {
+        this.counterUtils = counterUtils;
+    }
+}