You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:49 UTC

[39/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
new file mode 100644
index 0000000..817f47f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
@@ -0,0 +1,159 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.List;
+
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+import static me.prettyprint.hector.api.ddl.ComparatorType.COUNTERTYPE;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.getIndexMetadata;
+
+
+public enum ApplicationCF implements CFEnum {
+
+    /** Column family in which the entity objects are stored. */
+    ENTITY_PROPERTIES( "Entity_Properties", "BytesType" ),
+
+    /**
+     * Each row models name:value pairs. See {@link org.apache.usergrid.persistence.Schema} for the list of
+     * dictionary types.
+     */
+    ENTITY_DICTIONARIES( "Entity_Dictionaries", "BytesType" ),
+
+    /**
+     * Rows that are full of UUIDs, used when a row of references to other entities is needed. Collections are
+     * represented by this column family.
+     */
+    ENTITY_ID_SETS( "Entity_Id_Sets", "UUIDType" ),
+
+    /**
+     * Typed vs. untyped dictionary; dynamic entity dictionaries end up here. {@link
+     * EntityManagerImpl#getDictionaryAsMap(org.apache.usergrid.persistence.EntityRef, String)}
+     */
+    ENTITY_COMPOSITE_DICTIONARIES( "Entity_Composite_Dictionaries",
+            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+    /** Possibly no longer used. */
+    ENTITY_METADATA( "Entity_Metadata", "BytesType" ),
+
+    /** Holds all secondary indexes for entities. */
+    ENTITY_INDEX( "Entity_Index",
+            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+    /** Unique index for properties that must remain the same. */
+    ENTITY_UNIQUE( "Entity_Unique", "UUIDType" ),
+
+    /** Holds all properties that have ever been indexed for an entity. */
+    ENTITY_INDEX_ENTRIES( "Entity_Index_Entries",
+            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+    /** All roles that exist within an application. */
+    APPLICATION_ROLES( "Application_Roles", "BytesType" ),
+
+    /** Application counters; uses the counter type as validator. */
+    APPLICATION_AGGREGATE_COUNTERS( "Application_Aggregate_Counters", "LongType", COUNTERTYPE.getClassName() ),
+
+    /** Entity counters; uses the counter type as validator. */
+    ENTITY_COUNTERS( "Entity_Counters", "BytesType", COUNTERTYPE.getClassName() );
+
+    /** Alias list matching the parenthesised part of the DynamicCompositeType comparators declared above. */
+    public static final String DEFAULT_DYNAMIC_COMPOSITE_ALIASES =
+            "(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType,t=>TimeUUIDType,s=>UTF8Type," +
+                    "u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))";
+
+    /** Column family name. */
+    private final String cf;
+    /** Comparator type name. */
+    private final String comparator;
+    /** Validator class name; {@code null} when none was given. */
+    private final String validator;
+    /** Index specification handed to {@code getIndexMetadata}; {@code null} when none was given. */
+    private final String indexes;
+    /** Value reported by {@link #create()}; every constructor sets it to {@code true}. */
+    private final boolean create;
+
+
+    /** Declares a column family with a comparator only (no validator, no indexes). */
+    ApplicationCF( String cf, String comparator ) {
+        this( cf, comparator, null );
+    }
+
+
+    /** Declares a column family with a comparator and a validator (no indexes). */
+    ApplicationCF( String cf, String comparator, String validator ) {
+        this( cf, comparator, validator, null );
+    }
+
+
+    /** Declares a column family with comparator, validator and index specification. */
+    ApplicationCF( String cf, String comparator, String validator, String indexes ) {
+        this.cf = cf;
+        this.comparator = comparator;
+        this.validator = validator;
+        this.indexes = indexes;
+        this.create = true;
+    }
+
+
+    /** @return the column family name */
+    @Override
+    public String toString() {
+        return cf;
+    }
+
+
+    /** @return the column family name */
+    @Override
+    public String getColumnFamily() {
+        return cf;
+    }
+
+
+    /** @return the comparator type name */
+    @Override
+    public String getComparator() {
+        return comparator;
+    }
+
+
+    /** @return the validator class name, or {@code null} if none was declared */
+    @Override
+    public String getValidator() {
+        return validator;
+    }
+
+
+    /** @return {@code true} when the comparator name starts with {@code DynamicCompositeType} */
+    @Override
+    public boolean isComposite() {
+        return comparator.startsWith( "DynamicCompositeType" );
+    }
+
+
+    /** @return the result of {@code getIndexMetadata} applied to this column family's index specification */
+    @Override
+    public List<ColumnDefinition> getMetadata() {
+        return getIndexMetadata( indexes );
+    }
+
+
+    /** @return the create flag (always {@code true} for the constants declared here) */
+    @Override
+    public boolean create() {
+        return create;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
new file mode 100644
index 0000000..6457406
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.List;
+
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+
+
+/** Describes one column family declaration: its name, comparator, validator and index metadata. */
+public interface CFEnum {
+
+    /** @return the column family name */
+    String getColumnFamily();
+
+    /** @return the comparator type name for the column family */
+    String getComparator();
+
+    /** @return the default validation class name; may be {@code null} when none is declared */
+    String getValidator();
+
+    /** @return whether the column family uses a composite comparator */
+    boolean isComposite();
+
+    /** @return the column definitions (index metadata) for the column family; may be {@code null} */
+    List<ColumnDefinition> getMetadata();
+
+    /** @return whether a definition should be produced for this column family */
+    boolean create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
new file mode 100644
index 0000000..9637845
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
@@ -0,0 +1,483 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.thrift.IndexType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.ThriftColumnDef;
+import me.prettyprint.hector.api.ClockResolution;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.MutationResult;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.nio.ByteBuffer.wrap;
+import static me.prettyprint.hector.api.factory.HFactory.createClockResolution;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.commons.beanutils.MethodUtils.invokeStaticMethod;
+import static org.apache.commons.lang.StringUtils.removeEnd;
+import static org.apache.commons.lang.StringUtils.removeStart;
+import static org.apache.commons.lang.StringUtils.split;
+import static org.apache.commons.lang.StringUtils.substringAfterLast;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.serializeEntityProperty;
+import static org.apache.usergrid.utils.ClassUtils.isBasicType;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
+import static org.apache.usergrid.utils.StringUtils.replaceAll;
+import static org.apache.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
+
+
+/** @author edanuff */
+public class CassandraPersistenceUtils {
+
+    /** General-purpose logger for this class. */
+    private static final Logger logger = LoggerFactory.getLogger( CassandraPersistenceUtils.class );
+
+    /** Logger for batch operations; its name is this class's package name followed by ".BATCH". */
+    private static final Logger batch_logger =
+            LoggerFactory.getLogger( CassandraPersistenceUtils.class.getPackage().getName() + ".BATCH" );
+
+    /**
+     * {@code Schema.PROPERTY_TYPE} converted to a {@link ByteBuffer} via {@code bytebuffer}.
+     */
+    public static final ByteBuffer PROPERTY_TYPE_AS_BYTES = bytebuffer( PROPERTY_TYPE );
+
+    /**
+     * {@code Schema.PROPERTY_UUID} converted to a {@link ByteBuffer} via {@code bytebuffer}.
+     */
+    public static final ByteBuffer PROPERTY_ID_AS_BYTES = bytebuffer( PROPERTY_UUID );
+
+    /**
+     * Delimiter placed between the components of a composite key built by {@code key(Object...)}.
+     */
+    public static final char KEY_DELIM = ':';
+
+    /**
+     * The all-zero UUID; {@code keyID(Object...)} returns it when the computed key string is empty.
+     */
+    public static final UUID NULL_ID = new UUID( 0, 0 );
+
+    /** Shared String serializer instance. */
+    public static final StringSerializer se = new StringSerializer();
+    /** Shared UUID serializer instance. */
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    /** Shared ByteBuffer serializer instance; used by the mutator helpers in this class. */
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+
+
+    /**
+     * Emits a debug-level trace of one batch operation on the batch logger. Nothing is logged unless debug
+     * logging is enabled for that logger.
+     *
+     * @param operation label of the operation (e.g. "Insert", "Delete")
+     * @param columnFamily column family the operation targets
+     * @param key row key
+     * @param columnName column name, may be {@code null}
+     * @param columnValue column value, may be {@code null}
+     * @param timestamp operation timestamp; accepted but not part of the logged line
+     */
+    public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
+                                          Object columnValue, long timestamp ) {
+
+        if ( !batch_logger.isDebugEnabled() ) {
+            return;
+        }
+
+        Object[] logArgs = { operation, columnFamily, key, columnName, columnValue };
+        batch_logger.debug( "{} cf={} key={} name={} value={}", logArgs );
+    }
+
+
+    /**
+     * Logs and queues the insertion of a single column on the given mutator. A column name or value supplied as a
+     * {@link List} is first converted to bytes through {@code DynamicComposite.toByteBuffer}.
+     *
+     * @param m mutator collecting the batch
+     * @param columnFamily column family; its {@code toString()} is used as the name
+     * @param key row key
+     * @param columnName column name
+     * @param columnValue column value
+     * @param timestamp timestamp for the column
+     */
+    public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
+                                           Object columnValue, long timestamp ) {
+
+        logBatchOperation( "Insert", columnFamily, key, columnName, columnValue, timestamp );
+
+        Object name = ( columnName instanceof List<?> )
+                ? DynamicComposite.toByteBuffer( ( List<?> ) columnName ) : columnName;
+        Object value = ( columnValue instanceof List<?> )
+                ? DynamicComposite.toByteBuffer( ( List<?> ) columnValue ) : columnValue;
+
+        ByteBuffer nameBytes = bytebuffer( name );
+        ByteBuffer valueBytes = bytebuffer( value );
+        HColumn<ByteBuffer, ByteBuffer> column = createColumn( nameBytes, valueBytes, timestamp, be, be );
+
+        ByteBuffer rowKey = bytebuffer( key );
+        m.addInsertion( rowKey, columnFamily.toString(), column );
+    }
+
+
+    /**
+     * Queues one insertion per map entry, using the entry key as column name and the entry value as column value.
+     *
+     * @param m mutator collecting the batch
+     * @param columnFamily column family
+     * @param key row key
+     * @param columns column name to column value mapping
+     * @param timestamp timestamp applied to every column
+     */
+    public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Map<?, ?> columns,
+                                           long timestamp ) throws Exception {
+
+        for ( Entry<?, ?> column : columns.entrySet() ) {
+            Object name = column.getKey();
+            Object value = column.getValue();
+            addInsertToMutator( m, columnFamily, key, name, value, timestamp );
+        }
+    }
+
+
+    /**
+     * Logs and queues the insertion of one entity property into {@code ApplicationCF.ENTITY_PROPERTIES}. The value
+     * is serialized with {@code serializeEntityProperty}.
+     *
+     * @param m mutator collecting the batch
+     * @param key row key
+     * @param entityType entity type passed to the property serializer
+     * @param propertyName property (column) name
+     * @param propertyValue property value
+     * @param timestamp timestamp for the column
+     */
+    public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType, String propertyName,
+                                             Object propertyValue, long timestamp ) {
+
+        logBatchOperation( "Insert", ApplicationCF.ENTITY_PROPERTIES, key, propertyName, propertyValue, timestamp );
+
+        ByteBuffer nameBytes = bytebuffer( propertyName );
+        ByteBuffer valueBytes = serializeEntityProperty( entityType, propertyName, propertyValue );
+        HColumn<ByteBuffer, ByteBuffer> column = createColumn( nameBytes, valueBytes, timestamp, be, be );
+
+        ByteBuffer rowKey = bytebuffer( key );
+        m.addInsertion( rowKey, ApplicationCF.ENTITY_PROPERTIES.toString(), column );
+    }
+
+
+    /**
+     * Queues one property insertion per map entry.
+     *
+     * @param m mutator collecting the batch
+     * @param key row key
+     * @param entityType entity type passed to the property serializer
+     * @param columns property name to property value mapping
+     * @param timestamp timestamp applied to every column
+     */
+    public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType,
+                                             Map<String, ?> columns, long timestamp ) throws Exception {
+
+        for ( Entry<String, ?> property : columns.entrySet() ) {
+            String name = property.getKey();
+            Object value = property.getValue();
+            addPropertyToMutator( m, key, entityType, name, value, timestamp );
+        }
+    }
+
+
+    /**
+     * Logs and queues the deletion of an entire row.
+     *
+     * @param m mutator collecting the batch
+     * @param columnFamily column family; its {@code toString()} is used as the name
+     * @param key row key
+     * @param timestamp timestamp of the deletion
+     */
+    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp )
+            throws Exception {
+
+        logBatchOperation( "Delete", columnFamily, key, null, null, timestamp );
+
+        ByteBuffer rowKey = bytebuffer( key );
+        String cfName = columnFamily.toString();
+        m.addDeletion( rowKey, cfName, timestamp );
+    }
+
+
+    /**
+     * Logs and queues the deletion of a single column. A column name supplied as a {@link List} is first converted
+     * to bytes through {@code DynamicComposite.toByteBuffer}.
+     *
+     * @param m mutator collecting the batch
+     * @param columnFamily column family; its {@code toString()} is used as the name
+     * @param key row key
+     * @param columnName name of the column to delete
+     * @param timestamp timestamp of the deletion
+     */
+    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
+                                           long timestamp ) throws Exception {
+
+        logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
+
+        Object name = ( columnName instanceof List<?> )
+                ? DynamicComposite.toByteBuffer( ( List<?> ) columnName ) : columnName;
+
+        ByteBuffer rowKey = bytebuffer( key );
+        String cfName = columnFamily.toString();
+        m.addDeletion( rowKey, cfName, bytebuffer( name ), be, timestamp );
+    }
+
+
+    /**
+     * Logs and queues the deletion of several columns of one row, one deletion per column name.
+     *
+     * @param m mutator collecting the batch
+     * @param columnFamily column family
+     * @param key row key
+     * @param timestamp timestamp applied to every deletion
+     * @param columnNames names of the columns to delete
+     */
+    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp,
+                                           Object... columnNames ) throws Exception {
+
+        for ( Object name : columnNames ) {
+            // The single-column overload performs the same log / list-conversion / addDeletion sequence.
+            addDeleteToMutator( m, columnFamily, key, name, timestamp );
+        }
+    }
+
+
+    /**
+     * Copies the given columns into a map keyed by column name, ordered and looked up case-insensitively.
+     *
+     * @param columns columns to copy; {@code null} is treated as empty
+     * @return a new map from column name to column value, never {@code null}
+     */
+    public static Map<String, ByteBuffer> getColumnMap( List<HColumn<String, ByteBuffer>> columns ) {
+        Map<String, ByteBuffer> byName = new TreeMap<String, ByteBuffer>( String.CASE_INSENSITIVE_ORDER );
+        if ( columns == null ) {
+            return byName;
+        }
+        for ( HColumn<String, ByteBuffer> col : columns ) {
+            byName.put( col.getName(), col.getValue() );
+        }
+        return byName;
+    }
+
+
+    /**
+     * Copies the given columns into an insertion-ordered map of column name to column value.
+     *
+     * @param columns columns to copy
+     * @return the new map, or {@code null} when {@code columns} is {@code null}
+     */
+    public static <K, V> Map<K, V> asMap( List<HColumn<K, V>> columns ) {
+        Map<K, V> byName = null;
+        if ( columns != null ) {
+            byName = new LinkedHashMap<K, V>();
+            for ( HColumn<K, V> col : columns ) {
+                byName.put( col.getName(), col.getValue() );
+            }
+        }
+        return byName;
+    }
+
+
+    public static List<ByteBuffer> getAsByteKeys( List<UUID> ids ) {
+        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
+        for ( UUID id : ids ) {
+            keys.add( bytebuffer( key( id ) ) );
+        }
+        return keys;
+    }
+
+
+    /** @return timestamp value for current time */
+    public static long createTimestamp() {
+        return createClockResolution( ClockResolution.MICROSECONDS ).createClock();
+    }
+
+
+    /** @return normalized group path */
+    public static String normalizeGroupPath( String path ) {
+        path = replaceAll( path.toLowerCase().trim(), "//", "/" );
+        path = removeStart( path, "/" );
+        path = removeEnd( path, "/" );
+        return path;
+    }
+
+
+    /**
+     * Builds a composite key from the given components.
+     * <p>
+     * A single {@link UUID} or {@link ByteBuffer} argument is returned unchanged. Otherwise each component is
+     * rendered to text and the pieces are joined with {@link #KEY_DELIM}: strings are lower-cased, lists and arrays
+     * are expanded recursively, {@code null} becomes {@code "*"} and any other object is appended via its string
+     * conversion. An empty argument list (or an empty nested list/array) yields the empty string.
+     *
+     * @param objects the key components
+     * @return the lone UUID/ByteBuffer component, or the joined key string
+     */
+    public static Object key( Object... objects ) {
+        // No components: there is no trailing delimiter to strip, and deleteCharAt on an empty builder would throw
+        // StringIndexOutOfBoundsException. Return the empty key, which keyID maps to NULL_ID.
+        if ( objects.length == 0 ) {
+            return "";
+        }
+        if ( objects.length == 1 ) {
+            Object obj = objects[0];
+            if ( ( obj instanceof UUID ) || ( obj instanceof ByteBuffer ) ) {
+                return obj;
+            }
+        }
+        StringBuilder s = new StringBuilder();
+        for ( Object obj : objects ) {
+            if ( obj instanceof String ) {
+                s.append( ( ( String ) obj ).toLowerCase() );
+            }
+            else if ( obj instanceof List<?> ) {
+                s.append( key( ( ( List<?> ) obj ).toArray() ) );
+            }
+            else if ( obj instanceof Object[] ) {
+                s.append( key( ( Object[] ) obj ) );
+            }
+            else if ( obj != null ) {
+                s.append( obj );
+            }
+            else {
+                // Placeholder for a missing component.
+                s.append( "*" );
+            }
+
+            s.append( KEY_DELIM );
+        }
+
+        // Drop the delimiter appended after the last component; the builder is non-empty here.
+        s.deleteCharAt( s.length() - 1 );
+
+        return s.toString();
+    }
+
+
+    /** @return UUID for composite key */
+    public static UUID keyID( Object... objects ) {
+        if ( objects.length == 1 ) {
+            Object obj = objects[0];
+            if ( obj instanceof UUID ) {
+                return ( UUID ) obj;
+            }
+        }
+        String keyStr = key( objects ).toString();
+        if ( keyStr.length() == 0 ) {
+            return NULL_ID;
+        }
+        UUID uuid = UUID.nameUUIDFromBytes( keyStr.getBytes() );
+        logger.debug( "Key {} equals UUID {}", keyStr, uuid );
+        return uuid;
+    }
+
+
+    /** @return UUID for entity alias */
+    public static UUID aliasID( UUID ownerId, String aliasType, String alias ) {
+        return keyID( ownerId, aliasType, alias );
+    }
+
+
+    /**
+     * Queues, for every id in {@code keyIds}, the insertion of a column named {@code targetId} with an empty value.
+     * <p>
+     * When a prefix and/or suffix is given, the row key is the text {@code prefix + id + suffix}; otherwise the row
+     * key is the id itself. A {@code null} prefix or suffix is treated as empty, so supplying only one of them no
+     * longer embeds the literal text "null" in the row key.
+     *
+     * @param batch mutator collecting the batch
+     * @param targetId id used as the column name
+     * @param columnFamily column family name
+     * @param keyPrefix optional text placed before each id, may be {@code null} or empty
+     * @param keySuffix optional text placed after each id, may be {@code null} or empty
+     * @param keyIds ids from which the row keys are built
+     * @param timestamp timestamp applied to every column
+     * @return the same mutator that was passed in
+     */
+    public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> batch, UUID targetId,
+                                                             String columnFamily, String keyPrefix, String keySuffix,
+                                                             List<UUID> keyIds, long timestamp ) throws Exception {
+        boolean decorated = StringUtils.isNotEmpty( keyPrefix ) || StringUtils.isNotEmpty( keySuffix );
+        // Guard against string concatenation rendering a missing prefix/suffix as "null".
+        String prefix = ( keyPrefix == null ) ? "" : keyPrefix;
+        String suffix = ( keySuffix == null ) ? "" : keySuffix;
+
+        for ( UUID keyId : keyIds ) {
+            ByteBuffer key;
+            if ( decorated ) {
+                key = bytebuffer( prefix + keyId.toString() + suffix );
+            }
+            else {
+                key = bytebuffer( keyId );
+            }
+            addInsertToMutator( batch, columnFamily, key, targetId, ByteBuffer.allocate( 0 ), timestamp );
+        }
+        return batch;
+    }
+
+
+    /**
+     * Executes the mutator, logging and retrying on failure. Up to {@code retries} attempts have their exception
+     * logged and swallowed; one final attempt is then made whose exception, if any, propagates to the caller.
+     *
+     * @param m the mutator to execute
+     * @param retries number of attempts whose failure is tolerated
+     * @return the result of the first successful execution
+     */
+    public static MutationResult batchExecute( Mutator<?> m, int retries ) {
+        int attempt = 0;
+        while ( attempt < retries ) {
+            try {
+                return m.execute();
+            }
+            catch ( Exception e ) {
+                logger.error( "Unable to execute mutation, retrying...", e );
+            }
+            attempt++;
+        }
+        // Last attempt: failures are no longer caught.
+        return m.execute();
+    }
+
+
+    public static Object toStorableValue( Object obj ) {
+        if ( obj == null ) {
+            return null;
+        }
+
+        if ( isBasicType( obj.getClass() ) ) {
+            return obj;
+        }
+
+        if ( obj instanceof ByteBuffer ) {
+            return obj;
+        }
+
+        JsonNode json = toJsonNode( obj );
+        if ( ( json != null ) && json.isValueNode() ) {
+            if ( json.isBigInteger() ) {
+                return json.getBigIntegerValue();
+            }
+            else if ( json.isNumber() || json.isBoolean() ) {
+                return BigInteger.valueOf( json.getValueAsLong() );
+            }
+            else if ( json.isTextual() ) {
+                return json.getTextValue();
+            }
+            else if ( json.isBinary() ) {
+                try {
+                    return wrap( json.getBinaryValue() );
+                }
+                catch ( IOException e ) {
+                }
+            }
+        }
+
+        return json;
+    }
+
+
+    public static ByteBuffer toStorableBinaryValue( Object obj ) {
+        obj = toStorableValue( obj );
+        if ( obj instanceof JsonNode ) {
+            return JsonUtils.toByteBuffer( obj );
+        }
+        else {
+            return bytebuffer( obj );
+        }
+    }
+
+
+    public static ByteBuffer toStorableBinaryValue( Object obj, boolean forceJson ) {
+        obj = toStorableValue( obj );
+        if ( ( obj instanceof JsonNode ) || ( forceJson && ( obj != null ) && !( obj instanceof ByteBuffer ) ) ) {
+            return JsonUtils.toByteBuffer( obj );
+        }
+        else {
+            return bytebuffer( obj );
+        }
+    }
+
+
+    /**
+     * Parses a comma-separated index specification into column definitions. Each entry has the form
+     * {@code name} or {@code name:comparer}; a missing or blank comparer defaults to {@code UUIDType}. Entries with
+     * a blank name are skipped. Every resulting column is given an index named after the column, of type
+     * {@code IndexType.KEYS}.
+     *
+     * @param indexes the index specification, may be {@code null}
+     * @return the column definitions, or {@code null} when {@code indexes} is {@code null}
+     */
+    public static List<ColumnDefinition> getIndexMetadata( String indexes ) {
+        if ( indexes == null ) {
+            return null;
+        }
+
+        List<ColumnDef> thriftColumns = new ArrayList<ColumnDef>();
+        for ( String entry : split( indexes, ',' ) ) {
+            String name = stringOrSubstringBeforeFirst( entry, ':' ).trim();
+            String comparer = substringAfterLast( entry, ":" ).trim();
+            if ( StringUtils.isBlank( comparer ) ) {
+                comparer = "UUIDType";
+            }
+            if ( StringUtils.isBlank( name ) ) {
+                continue;
+            }
+
+            ColumnDef def = new ColumnDef( bytebuffer( name ), comparer );
+            def.setIndex_name( name );
+            def.setIndex_type( IndexType.KEYS );
+            thriftColumns.add( def );
+        }
+        return ThriftColumnDef.fromThriftList( thriftColumns );
+    }
+
+
+    /**
+     * Builds column family definitions for all entries of the given CFEnum class, starting from a fresh list.
+     *
+     * @param cfEnum class whose static {@code values()} supplies the column families
+     * @param keyspace keyspace name for the definitions
+     * @return the definitions, or {@code null} if the values could not be obtained
+     */
+    public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum, String keyspace ) {
+        List<ColumnFamilyDefinition> noExistingDefs = null;
+        return getCfDefs( cfEnum, noExistingDefs, keyspace );
+    }
+
+
+    /**
+     * Builds column family definitions for all entries of the given CFEnum class and appends them to
+     * {@code cf_defs}. The entries are obtained by reflectively invoking the class's static {@code values()}
+     * method; entries whose {@code create()} returns {@code false} are skipped.
+     *
+     * @param cfEnum class whose static {@code values()} supplies the column families
+     * @param cf_defs list to append to; a new list is used when {@code null}
+     * @param keyspace keyspace name for the definitions
+     * @return the list holding the definitions, or {@code null} if the values could not be obtained
+     */
+    public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum,
+                                                          List<ColumnFamilyDefinition> cf_defs, String keyspace ) {
+
+        CFEnum[] families;
+        try {
+            families = ( CFEnum[] ) invokeStaticMethod( cfEnum, "values", ( Object[] ) null );
+        }
+        catch ( Exception e ) {
+            logger.error( "Couldn't get CFEnum values", e );
+            return null;
+        }
+        if ( families == null ) {
+            return null;
+        }
+
+        List<ColumnFamilyDefinition> result = cf_defs;
+        if ( result == null ) {
+            result = new ArrayList<ColumnFamilyDefinition>();
+        }
+
+        for ( CFEnum family : families ) {
+            if ( family.create() ) {
+                String validationClass = family.getValidator();
+                List<ColumnDefinition> metadata = family.getMetadata();
+
+                ColumnFamilyDefinition definition =
+                        HFactory.createColumnFamilyDefinition( keyspace, family.getColumnFamily(),
+                                ComparatorType.getByClassName( family.getComparator() ), metadata );
+
+                // Only override the default validation class when the column family declares one.
+                if ( validationClass != null ) {
+                    definition.setDefaultValidationClass( validationClass );
+                }
+
+                result.add( definition );
+            }
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Checks that every given column family is present in the keyspace definition. A missing column family is
+     * reported with a warning on this class's logger; nothing is thrown and nothing is modified.
+     *
+     * @param cf_enums the column families expected to exist
+     * @param ksDef the keyspace definition to check against
+     */
+    public static void validateKeyspace( CFEnum[] cf_enums, KeyspaceDefinition ksDef ) {
+        // Index the keyspace's column family definitions by name for lookup.
+        Map<String, ColumnFamilyDefinition> cfs = new HashMap<String, ColumnFamilyDefinition>();
+        for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
+            cfs.put( cf.getName(), cf );
+        }
+        for ( CFEnum c : cf_enums ) {
+            if ( !cfs.containsKey( c.getColumnFamily() ) ) {
+                // The original branch was empty, which made the whole validation a silent no-op.
+                logger.warn( "Column family {} is missing from the keyspace definition", c.getColumnFamily() );
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
new file mode 100644
index 0000000..1b2b9e3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -0,0 +1,1134 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.connection.HConnectionManager;
+import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.ThriftKsDef;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.ConsistencyLevelPolicy;
+import me.prettyprint.hector.api.HConsistencyLevel;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.ColumnQuery;
+import me.prettyprint.hector.api.query.CountQuery;
+import me.prettyprint.hector.api.query.MultigetSliceQuery;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import me.prettyprint.hector.api.query.SliceQuery;
+import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
+import static org.apache.commons.collections.MapUtils.getIntValue;
+import static org.apache.commons.collections.MapUtils.getString;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
+import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
+import static org.apache.usergrid.utils.MapUtils.asMap;
+import static org.apache.usergrid.utils.MapUtils.filter;
+
+
+public class CassandraService {
+
+    /** Name of the keyspace opened by {@link #init()} and returned by {@link #getSystemKeyspace()}. */
+    public static String SYSTEM_KEYSPACE = "Usergrid";
+
+    /** Name of the shared keyspace returned for every application while virtual keyspaces are enabled. */
+    public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
+
+    /** When true, applications share one keyspace and are separated by a per-application UUID prefix. */
+    public static final boolean USE_VIRTUAL_KEYSPACES = true;
+
+    // Column family names. NOTE(review): not referenced in this part of the file; presumably system CFs.
+    public static final String APPLICATIONS_CF = "Applications";
+    public static final String PROPERTIES_CF = "Properties";
+    public static final String TOKENS_CF = "Tokens";
+    public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
+
+    // Column/row count limits. ALL_COUNT is the slice size used by the "get all columns" style reads below.
+    public static final int DEFAULT_COUNT = 1000;
+    public static final int ALL_COUNT = 100000;
+    public static final int INDEX_ENTRY_LIST_COUNT = 1000;
+    public static final int DEFAULT_SEARCH_COUNT = 10000;
+
+    /** Retry count handed to {@code batchExecute} when flushing batched mutations. */
+    public static final int RETRY_COUNT = 5;
+
+    // Well-known application / organization names.
+    public static final String DEFAULT_APPLICATION = "default-app";
+    public static final String DEFAULT_ORGANIZATION = "usergrid";
+    public static final String MANAGEMENT_APPLICATION = "management";
+
+    // Fixed UUIDs for the well-known applications.
+    public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
+    public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
+
+    /** General-purpose logger for this class. */
+    private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
+
+    /** Separate logger (package name + ".DB") used to trace individual read/write operations. */
+    private static final Logger db_logger =
+            LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
+
+    // Collaborators supplied through the constructor (and replaceable through the setters).
+    Cluster cluster;
+    CassandraHostConfigurator chc;
+    Properties properties;
+    LockManager lockManager;
+
+    /** Consistency policy for created keyspaces; init() installs a default when this is still null. */
+    ConsistencyLevelPolicy consistencyLevelPolicy;
+
+    /** Keyspace handle for SYSTEM_KEYSPACE; assigned in init(). */
+    private Keyspace systemKeyspace;
+
+    /** Credentials map ("username"/"password") built in init() and passed when creating keyspaces. */
+    private Map<String, String> accessMap;
+
+    // Shared serializer instances.
+    public static final StringSerializer se = new StringSerializer();
+    public static final ByteBufferSerializer be = new ByteBufferSerializer();
+    public static final UUIDSerializer ue = new UUIDSerializer();
+    public static final BytesArraySerializer bae = new BytesArraySerializer();
+    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+    public static final LongSerializer le = new LongSerializer();
+
+    /** The all-zero UUID. */
+    public static final UUID NULL_ID = new UUID( 0, 0 );
+
+
+    /**
+     * Creates the service around an existing Hector cluster and logs the cluster's known pool hosts.
+     *
+     * @param properties configuration; read later by {@link #init()} and when creating keyspaces
+     * @param cluster the Hector cluster used for all schema and data operations
+     * @param cassandraHostConfigurator host configurator; also the clock source for {@link #createTimestamp()}
+     * @param lockManager lock manager exposed through {@link #getLockManager()}
+     */
+    public CassandraService( Properties properties, Cluster cluster,
+                             CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
+        this.cluster = cluster;
+        this.properties = properties;
+        this.lockManager = lockManager;
+        this.chc = cassandraHostConfigurator;
+        db_logger.info( String.valueOf( cluster.getKnownPoolHosts( false ) ) );
+    }
+
+
+    public void init() throws Exception {
+        if ( consistencyLevelPolicy == null ) {
+            consistencyLevelPolicy = new ConfigurableConsistencyLevel();
+            ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
+                    .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
+        }
+        accessMap = new HashMap<String, String>( 2 );
+        accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
+        accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
+        systemKeyspace =
+                HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+                        accessMap );
+    }
+
+
+    /** @return the Hector cluster this service operates on */
+    public Cluster getCluster() {
+        return cluster;
+    }
+
+
+    /**
+     * Replaces the cluster used for subsequent operations. The system keyspace handle created by
+     * {@link #init()} is not rebuilt here.
+     */
+    public void setCluster( Cluster cluster ) {
+        this.cluster = cluster;
+    }
+
+
+    /** @return the host configurator supplied at construction or through the setter */
+    public CassandraHostConfigurator getCassandraHostConfigurator() {
+        return chc;
+    }
+
+
+    /** Replaces the host configurator; {@link #createTimestamp()} reads its clock resolution. */
+    public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
+        this.chc = chc;
+    }
+
+
+    /** @return the configuration properties held by this service (the live object, not a copy) */
+    public Properties getProperties() {
+        return properties;
+    }
+
+
+    /** Replaces the configuration properties; the credentials map built by {@link #init()} is not refreshed. */
+    public void setProperties( Properties properties ) {
+        this.properties = properties;
+    }
+
+
+    /**
+     * Exposes the configuration properties as a map.
+     *
+     * @return the result of {@code asMap(properties)}, or {@code null} when no properties are set
+     */
+    public Map<String, String> getPropertiesMap() {
+        if ( properties == null ) {
+            return null;
+        }
+        return asMap( properties );
+    }
+
+
+    /** @return the lock manager supplied at construction or through the setter */
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+
+
+    /** Replaces the lock manager returned by {@link #getLockManager()}. */
+    public void setLockManager( LockManager lockManager ) {
+        this.lockManager = lockManager;
+    }
+
+
+    /** @return the consistency policy used when creating keyspaces; may be null before {@link #init()} */
+    public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
+        return consistencyLevelPolicy;
+    }
+
+
+    /**
+     * Sets the consistency policy. When set before {@link #init()}, init() keeps it instead of
+     * installing its default.
+     */
+    public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
+        this.consistencyLevelPolicy = consistencyLevelPolicy;
+    }
+
+
+    /** @return keyspace for application UUID */
+    public static String keyspaceForApplication( UUID applicationId ) {
+        if ( USE_VIRTUAL_KEYSPACES ) {
+            return STATIC_APPLICATION_KEYSPACE;
+        }
+        else {
+            return "Application_" + applicationId.toString().replace( '-', '_' );
+        }
+    }
+
+
+    public static UUID prefixForApplication( UUID applicationId ) {
+        if ( USE_VIRTUAL_KEYSPACES ) {
+            return applicationId;
+        }
+        else {
+            return null;
+        }
+    }
+
+
+    public Keyspace getKeyspace( String keyspace, UUID prefix ) {
+        Keyspace ko = null;
+        if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
+            ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
+                    ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
+        }
+        else {
+            ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+                    accessMap );
+        }
+        return ko;
+    }
+
+
+    /**
+     * Returns the keyspace handle for an application, using the keyspace name and prefix derived from
+     * its id.
+     *
+     * @param applicationId the application; asserted non-null (only checked when assertions are enabled)
+     *
+     * @return the application's keyspace handle
+     */
+    public Keyspace getApplicationKeyspace( UUID applicationId ) {
+        assert applicationId != null;
+        String keyspaceName = keyspaceForApplication( applicationId );
+        UUID keyPrefix = prefixForApplication( applicationId );
+        return getKeyspace( keyspaceName, keyPrefix );
+    }
+
+
+    /**
+     * Returns a handle on the shared application keyspace itself, without any per-application prefix
+     * (a null prefix is passed, so a plain rather than virtual keyspace is created).
+     */
+    public Keyspace getUsergridApplicationKeyspace() {
+        return getKeyspace( STATIC_APPLICATION_KEYSPACE, null );
+    }
+
+
+    /** @return the system keyspace handle created by {@link #init()}; null if init() has not run */
+    public Keyspace getSystemKeyspace() {
+        return systemKeyspace;
+    }
+
+
+    /**
+     * Checks that both the system keyspace and the shared application keyspace can be described.
+     *
+     * @return true only when both keyspaces exist; false when either is missing or when describing
+     *         them fails (the failure is logged, not propagated)
+     */
+    public boolean checkKeyspacesExist() {
+        try {
+            if ( cluster.describeKeyspace( SYSTEM_KEYSPACE ) == null ) {
+                return false;
+            }
+            return cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
+        }
+        catch ( Exception ex ) {
+            logger.error( "could not describe keyspaces", ex );
+            return false;
+        }
+    }
+
+
+    /**
+     * Lazily creates a column family. The keyspace is created first when it does not exist; the column
+     * family is then added only when the keyspace does not already contain one with the same name.
+     *
+     * @param keyspace the keyspace expected to hold the column family
+     * @param cfDef the column family definition; its read repair chance is set to 0.1 before it is added
+     */
+    public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
+
+        if ( !keySpaceExists( keyspace ) ) {
+            createKeySpace( keyspace );
+        }
+
+        // nothing to do when the column family is already there
+        if ( cfExists( keyspace, cfDef.getName() ) ) {
+            return;
+        }
+
+        // default read repair chance to 0.1
+        cfDef.setReadRepairChance( 0.1d );
+
+        // second argument: presumably "wait for schema agreement" - confirm against the Hector Cluster API
+        cluster.addColumnFamily( cfDef, true );
+        logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
+    }
+
+
+    /**
+     * Creates each column family in the list, in list order, via
+     * {@link #createColumnFamily(String, ColumnFamilyDefinition)}.
+     *
+     * @param keyspace the keyspace to create the column families in
+     * @param cfDefs the definitions to create; must not be null
+     */
+    public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
+        for ( ColumnFamilyDefinition cfDef : cfDefs ) {
+            createColumnFamily( keyspace, cfDef );
+        }
+    }
+
+
+    /**
+     * Checks whether the keyspace exists.
+     *
+     * @param keyspace the keyspace name
+     *
+     * @return true when the cluster returns a definition for the keyspace
+     */
+    public boolean keySpaceExists( String keyspace ) {
+        return cluster.describeKeyspace( keyspace ) != null;
+    }
+
+
+    /**
+     * Creates the keyspace with no column families and waits for the cluster to agree on the schema.
+     * <p>
+     * Settings are read from the properties: {@code cassandra.keyspace.strategy} (default SimpleStrategy),
+     * {@code cassandra.keyspace.replication} (default 1), and any entries selected by the
+     * {@code cassandra.keyspace.strategy.options.} prefix, which are passed as strategy options.
+     *
+     * @param keyspace the name of the keyspace to create
+     */
+    private void createKeySpace( String keyspace ) {
+        logger.info( "Creating keyspace: {}", keyspace );
+
+        String strategyClass =
+                getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
+        logger.info( "Using strategy: {}", strategyClass );
+
+        int replicationFactor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
+        logger.info( "Using replication (may be overriden by strategy options): {}", replicationFactor );
+
+        ThriftKsDef keyspaceDef = ( ThriftKsDef ) HFactory
+                .createKeyspaceDefinition( keyspace, strategyClass, replicationFactor,
+                        new ArrayList<ColumnFamilyDefinition>() );
+
+        @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategyOptions =
+                filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
+        if ( !strategyOptions.isEmpty() ) {
+            logger.info( "Strategy options: {}", mapToFormattedJsonString( strategyOptions ) );
+            keyspaceDef.setStrategyOptions( strategyOptions );
+        }
+
+        cluster.addKeyspace( keyspaceDef );
+
+        waitForCreation( keyspace );
+
+        logger.info( "Created keyspace {}", keyspace );
+    }
+
+
+    /**
+     * Waits until all nodes agree on the same schema version, polling the cluster every 100 ms.
+     * <p>
+     * If the waiting thread is interrupted, its interrupt status is restored and the wait is abandoned.
+     * Swallowing the interrupt would make every later {@code Thread.sleep} fail immediately, turning
+     * this loop into a busy spin that can never be cancelled.
+     *
+     * @param keyspace the keyspace being created; used only for logging
+     */
+    private void waitForCreation( String keyspace ) {
+
+        while ( true ) {
+            Map<String, List<String>> versions = cluster.describeSchemaVersions();
+            // only 1 version, return
+            if ( versions != null && versions.size() == 1 ) {
+                return;
+            }
+            // sleep and try again
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                // preserve the interrupt for callers and stop waiting
+                Thread.currentThread().interrupt();
+                logger.warn( "Interrupted while waiting for schema agreement on keyspace {}", keyspace );
+                return;
+            }
+        }
+    }
+
+
+    /**
+     * Checks whether a column family exists in a keyspace.
+     *
+     * @param keyspace the keyspace to look in
+     * @param cfName the column family name to look for
+     *
+     * @return true when the keyspace exists and defines a column family with exactly that name
+     */
+    public boolean cfExists( String keyspace, String cfName ) {
+        KeyspaceDefinition definition = cluster.describeKeyspace( keyspace );
+
+        if ( definition != null ) {
+            for ( ColumnFamilyDefinition candidate : definition.getCfDefs() ) {
+                if ( cfName.equals( candidate.getName() ) ) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Reads the columns of a single row, up to {@link #ALL_COUNT} of them, with an unbounded name range.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key, converted with {@code bytebuffer(key)}
+     * @param nameSerializer serializer for column names
+     * @param valueSerializer serializer for column values
+     *
+     * @return the columns of the resulting slice
+     *
+     * @throws Exception if the query fails
+     */
+    public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
+                                                     Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+            throws Exception {
+
+        if ( db_logger.isInfoEnabled() ) {
+            db_logger.info( "getColumns cf={} key={}", columnFamily, key );
+        }
+
+        SliceQuery<ByteBuffer, N, V> query = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKey( bytebuffer( key ) );
+        query.setRange( null, null, false, ALL_COUNT );
+
+        List<HColumn<N, V>> columns = query.execute().get().getColumns();
+
+        if ( db_logger.isInfoEnabled() ) {
+            if ( columns != null ) {
+                db_logger.info( "getColumns returned {} columns", columns.size() );
+            }
+            else {
+                db_logger.info( "getColumns returned null" );
+            }
+        }
+
+        return columns;
+    }
+
+
+    /**
+     * Convenience overload of the typed {@code getAllColumns} that reads column names as strings and
+     * values as raw byte buffers.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family
+     * @param key the row key
+     *
+     * @return the columns of the row
+     *
+     * @throws Exception if the query fails
+     */
+    public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
+            throws Exception {
+        return getAllColumns( ko, columnFamily, key, se, be );
+    }
+
+
+    /**
+     * Collects the column names of a row, preserving the order in which the columns were returned.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family
+     * @param key the row key
+     *
+     * @return the column names, in iteration order of the underlying read
+     *
+     * @throws Exception if the query fails
+     */
+    public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+        Set<String> names = new LinkedHashSet<String>();
+        for ( HColumn<String, ByteBuffer> column : getAllColumns( ko, columnFamily, key ) ) {
+            names.add( column.getName() );
+        }
+        return names;
+    }
+
+
+    /**
+     * Reads a range of columns from a single row as raw byte buffers.
+     * <p>
+     * Each range bound is converted to bytes as follows: a {@code DynamicComposite} is serialized, a
+     * {@code List} is turned into a dynamic composite, and anything else goes through
+     * {@code bytebuffer(...)}.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param start the start of the column range
+     * @param finish the end of the column range
+     * @param count the maximum number of columns, passed to the slice range
+     * @param reversed the reversed flag, passed to the slice range
+     *
+     * @return the columns of the resulting slice
+     *
+     * @throws Exception if the query fails
+     */
+    public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
+                                                             Object finish, int count, boolean reversed )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
+                    + " count=" + count + " reversed=" + reversed );
+        }
+
+        SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> query = createSliceQuery( ko, be, be, be );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKey( bytebuffer( key ) );
+
+        ByteBuffer startBytes = ( start instanceof DynamicComposite ) ? ( ( DynamicComposite ) start ).serialize() :
+                                ( start instanceof List ) ? DynamicComposite.toByteBuffer( ( List<?> ) start ) :
+                                bytebuffer( start );
+
+        ByteBuffer finishBytes = ( finish instanceof DynamicComposite ) ? ( ( DynamicComposite ) finish ).serialize() :
+                                 ( finish instanceof List ) ? DynamicComposite.toByteBuffer( ( List<?> ) finish ) :
+                                 bytebuffer( finish );
+
+        // bounds are passed as given, even for reversed reads
+        query.setRange( startBytes, finishBytes, reversed, count );
+        List<HColumn<ByteBuffer, ByteBuffer>> columns = query.execute().get().getColumns();
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( columns == null ? "getColumns returned null" :
+                             "getColumns returned " + columns.size() + " columns" );
+        }
+
+        return columns;
+    }
+
+
+    /**
+     * Reads the same column range from several rows in one multiget query.
+     * <p>
+     * Range bounds are converted to bytes the same way for both ends: a {@code DynamicComposite} is
+     * serialized, a {@code List} is turned into a dynamic composite, anything else goes through
+     * {@code bytebuffer(...)}.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param keys the row keys, converted with {@code bytebuffers(keys)}
+     * @param start the start of the column range
+     * @param finish the end of the column range
+     * @param count the maximum number of columns, passed to the slice range
+     * @param reversed the reversed flag, passed to the slice range
+     *
+     * @return row key to columns, in the iteration order of the returned rows
+     *
+     * @throws Exception if the query fails
+     */
+    public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
+                                                                                   List<?> keys, Object start,
+                                                                                   Object finish, int count,
+                                                                                   boolean reversed ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
+                    + finish + " count=" + count + " reversed=" + reversed );
+        }
+
+        MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> query = createMultigetSliceQuery( ko, be, be, be );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKeys( bytebuffers( keys ) );
+
+        ByteBuffer startBytes = ( start instanceof DynamicComposite ) ? ( ( DynamicComposite ) start ).serialize() :
+                                ( start instanceof List ) ? DynamicComposite.toByteBuffer( ( List<?> ) start ) :
+                                bytebuffer( start );
+
+        ByteBuffer finishBytes = ( finish instanceof DynamicComposite ) ? ( ( DynamicComposite ) finish ).serialize() :
+                                 ( finish instanceof List ) ? DynamicComposite.toByteBuffer( ( List<?> ) finish ) :
+                                 bytebuffer( finish );
+
+        query.setRange( startBytes, finishBytes, reversed, count );
+        Rows<ByteBuffer, ByteBuffer, ByteBuffer> rows = query.execute().get();
+
+        Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> columnsByKey =
+                new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
+        for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> row : rows ) {
+            List<HColumn<ByteBuffer, ByteBuffer>> rowColumns = row.getColumnSlice().getColumns();
+            columnsByKey.put( row.getKey(), rowColumns );
+        }
+
+        return columnsByKey;
+    }
+
+
+    /**
+     * Reads up to {@link #ALL_COUNT} columns, with an unbounded name range, from each of the given rows.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param keys the row keys
+     * @param keySerializer serializer for row keys
+     * @param nameSerializer serializer for column names
+     * @param valueSerializer serializer for column values
+     *
+     * @return the rows returned by the multiget query
+     *
+     * @throws Exception if the query fails
+     */
+    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+                                            Serializer<K> keySerializer, Serializer<N> nameSerializer,
+                                            Serializer<V> valueSerializer ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
+        }
+
+        MultigetSliceQuery<K, N, V> query =
+                createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKeys( keys );
+        query.setRange( null, null, false, ALL_COUNT );
+
+        Rows<K, N, V> rows = query.execute().get();
+
+        if ( db_logger.isInfoEnabled() ) {
+            // the logged number is the row count, although the message says "columns"
+            db_logger.info( rows == null ? "getColumns returned null" :
+                            "getColumns returned " + rows.getCount() + " columns" );
+        }
+
+        return rows;
+    }
+
+
+    /**
+     * Reads specific named columns from a single row.
+     * <p>
+     * The requested names are given as strings; they are encoded to bytes with the string serializer
+     * and decoded with {@code nameSerializer} to obtain names of type {@code N}.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param columnNames the names of the columns to read
+     * @param nameSerializer serializer for column names
+     * @param valueSerializer serializer for column values
+     *
+     * @return the columns of the resulting slice
+     *
+     * @throws Exception if the query fails
+     */
+    @SuppressWarnings("unchecked")
+    public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
+                                                  Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
+        }
+
+        SliceQuery<ByteBuffer, N, V> query = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKey( bytebuffer( key ) );
+
+        // String names -> bytes -> N, so the query receives names in the serializer's own type
+        N[] typedNames =
+                ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
+        query.setColumnNames( typedNames );
+
+        List<HColumn<N, V>> columns = query.execute().get().getColumns();
+
+        if ( db_logger.isInfoEnabled() ) {
+            db_logger.info( columns == null ? "getColumns returned null" :
+                            "getColumns returned " + columns.size() + " columns" );
+        }
+
+        return columns;
+    }
+
+
+    /**
+     * Reads specific named columns from each of the given rows.
+     * <p>
+     * The requested names are given as strings; they are encoded to bytes with the string serializer
+     * and decoded with {@code nameSerializer} to obtain names of type {@code N}.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param keys the row keys
+     * @param columnNames the names of the columns to read
+     * @param keySerializer serializer for row keys
+     * @param nameSerializer serializer for column names
+     * @param valueSerializer serializer for column values
+     *
+     * @return the rows returned by the multiget query
+     *
+     * @throws Exception if the query fails
+     */
+    @SuppressWarnings("unchecked")
+    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+                                            Collection<String> columnNames, Serializer<K> keySerializer,
+                                            Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
+        }
+
+        MultigetSliceQuery<K, N, V> query =
+                createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKeys( keys );
+
+        // String names -> bytes -> N, so the query receives names in the serializer's own type
+        N[] typedNames =
+                ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
+        query.setColumnNames( typedNames );
+
+        Rows<K, N, V> rows = query.execute().get();
+
+        if ( db_logger.isInfoEnabled() ) {
+            // the logged number is the row count, although the message says "columns"
+            db_logger.info( rows == null ? "getColumns returned null" :
+                            "getColumns returned " + rows.getCount() + " columns" );
+        }
+
+        return rows;
+    }
+
+
+    /**
+     * Reads a single column from a single row.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param column the column name
+     * @param nameSerializer serializer for the column name
+     * @param valueSerializer serializer for the column value
+     *
+     * @return the column as returned by the query; a null result is logged at info level
+     *
+     * @throws Exception if the query fails
+     */
+    public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
+                                           Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
+        }
+
+        ColumnQuery<ByteBuffer, N, V> query = HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
+        HColumn<N, V> found =
+                query.setKey( bytebuffer( key ) ).setName( column ).setColumnFamily( columnFamily.toString() ).execute()
+                     .get();
+
+        if ( found == null && db_logger.isInfoEnabled() ) {
+            db_logger.info( "getColumn returned null" );
+        }
+
+        return found;
+    }
+
+
+    /**
+     * Reads specific columns, identified by already-typed names, from a single row.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param columns the names of the columns to read
+     * @param nameSerializer serializer for column names
+     * @param valueSerializer serializer for column values
+     *
+     * @return the column slice returned by the query
+     *
+     * @throws Exception if the query fails
+     */
+    public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
+                                                Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            // render the array contents; concatenating the array itself only prints its identity hash
+            db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column="
+                    + java.util.Arrays.toString( columns ) );
+        }
+
+        SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
+        QueryResult<ColumnSlice<N, V>> r =
+                q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
+                 .execute();
+        ColumnSlice<N, V> result = r.get();
+
+        if ( db_logger.isDebugEnabled() ) {
+            if ( result == null ) {
+                db_logger.debug( "getColumn returned null" );
+            }
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Convenience overload of the typed {@code getColumn} for a string column name and a raw
+     * byte-buffer value.
+     *
+     * @param ko the keyspace handle to query
+     * @param columnFamily the column family
+     * @param key the row key
+     * @param column the column name
+     *
+     * @return the column as returned by the query
+     *
+     * @throws Exception if the query fails
+     */
+    public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
+            throws Exception {
+        return getColumn( ko, columnFamily, key, column, se, be );
+    }
+
+
+    /**
+     * Writes a single column without a TTL; delegates to the TTL overload with a TTL of 0, which that
+     * overload treats as "do not set a TTL".
+     *
+     * @param ko the keyspace handle to write to
+     * @param columnFamily the column family
+     * @param key the row key
+     * @param columnName the column name
+     * @param columnValue the column value
+     *
+     * @throws Exception if the write fails
+     */
+    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
+            throws Exception {
+        this.setColumn( ko, columnFamily, key, columnName, columnValue, 0 );
+    }
+
+
+    /**
+     * Writes a single column, optionally with a TTL, as an immediate insert.
+     * <p>
+     * A name or value that is a {@code List} is encoded as a dynamic composite; anything else goes
+     * through {@code bytebuffer(...)}.
+     *
+     * @param ko the keyspace handle to write to
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param columnName the column name
+     * @param columnValue the column value
+     * @param ttl the TTL to set on the column; 0 means no TTL is set
+     *
+     * @throws Exception if the write fails
+     */
+    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
+                           int ttl ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
+                    + columnValue );
+        }
+
+        ByteBuffer nameBytes = ( columnName instanceof List ) ?
+                               DynamicComposite.toByteBuffer( ( List<?> ) columnName ) : bytebuffer( columnName );
+
+        ByteBuffer valueBytes = ( columnValue instanceof List ) ?
+                                DynamicComposite.toByteBuffer( ( List<?> ) columnValue ) : bytebuffer( columnValue );
+
+        HColumn<ByteBuffer, ByteBuffer> column = createColumn( nameBytes, valueBytes, be, be );
+        if ( ttl != 0 ) {
+            column.setTtl( ttl );
+        }
+
+        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        mutator.insert( bytebuffer( key ), columnFamily.toString(), column );
+    }
+
+
+    /**
+     * Writes several columns to one row without a TTL; delegates to the TTL overload with a TTL of 0.
+     *
+     * @param ko the keyspace handle to write to
+     * @param columnFamily the column family
+     * @param key the row key
+     * @param map column name to column value
+     *
+     * @throws Exception if the write fails
+     */
+    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
+        this.setColumns( ko, columnFamily, key, map, 0 );
+    }
+
+
+    /**
+     * Writes several columns to one row in a single batch, optionally with a TTL.
+     * <p>
+     * All columns share one timestamp taken from {@link #createTimestamp()}. Entries with a null value
+     * are skipped. A name or value that is a {@code List} is encoded as a dynamic composite; anything
+     * else goes through {@code bytebuffer(...)}. The batch is executed with {@link #RETRY_COUNT}.
+     *
+     * @param ko the keyspace handle to write to
+     * @param columnFamily the column family; its {@code toString()} is used as the name
+     * @param key the row key
+     * @param map column name to column value
+     * @param ttl the TTL to set on every written column; 0 means no TTL is set
+     *
+     * @throws Exception if the batch fails
+     */
+    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
+                                                                                                 " ttl=" + ttl : "" ) );
+        }
+
+        Mutator<ByteBuffer> m = createMutator( ko, be );
+        long timestamp = createTimestamp();
+
+        for ( Map.Entry<?, ?> entry : map.entrySet() ) {
+            Object name = entry.getKey();
+            Object value = entry.getValue();
+            if ( value == null ) {
+                continue;
+            }
+
+            ByteBuffer name_bytes = null;
+            if ( name instanceof List ) {
+                name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
+            }
+            else {
+                name_bytes = bytebuffer( name );
+            }
+
+            ByteBuffer value_bytes = null;
+            if ( value instanceof List ) {
+                value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
+            }
+            else {
+                value_bytes = bytebuffer( value );
+            }
+
+            HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
+            if ( ttl != 0 ) {
+                col.setTtl( ttl );
+            }
+            // add the column that carries the TTL; adding a freshly created column here would drop it
+            m.addInsertion( bytebuffer( key ), columnFamily.toString(), col );
+        }
+        batchExecute( m, CassandraService.RETRY_COUNT );
+    }
+
+
+    /**
+     * Produces a new timestamp from the clock resolution configured on {@code chc}.
+     *
+     * @return the value returned by the clock resolution's {@code createClock()}
+     */
+    public long createTimestamp() {
+        final long now = chc.getClockResolution().createClock();
+        return now;
+    }
+
+
+    /**
+     * Delete column.
+     *
+     * @param keyspace the keyspace
+     * @param columnFamily the column family
+     * @param key the key
+     * @param column the column
+     *
+     * @throws Exception the exception
+     */
+    public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
+        }
+
+        Mutator<ByteBuffer> m = createMutator( ko, be );
+        m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
+    }
+
+
+    /**
+     * Collects the row keys of a column family into a set that keeps the order in which rows were returned.
+     * <p>
+     * The range query is unbounded on both key ends and requests an empty array of column names.
+     * NOTE(review): no row count is set on the query, so the client's default limit presumably applies - confirm.
+     *
+     * @param ko the keyspace to query
+     * @param columnFamily the column family; its {@code toString()} is used as the column family name
+     * @param keySerializer serializer used for the row keys
+     *
+     * @return the row keys, in iteration order of the query result
+     *
+     * @throws Exception if the query fails
+     */
+    public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getRowKeys cf=" + columnFamily );
+        }
+
+        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> query = createRangeSlicesQuery( ko, keySerializer, be, be );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKeys( null, null );
+        query.setColumnNames( new ByteBuffer[0] );
+
+        Set<K> keys = new LinkedHashSet<K>();
+        for ( Row<K, ByteBuffer, ByteBuffer> current : query.execute().get() ) {
+            keys.add( current.getKey() );
+        }
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "getRowKeys returned " + keys.size() + " rows" );
+        }
+
+        return keys;
+    }
+
+
+    /**
+     * Collects the row keys of a column family into a list.
+     * <p>
+     * The range query is unbounded on both key ends and requests an empty array of column names. Every returned
+     * row key is added, in result order.
+     *
+     * @param ko the keyspace to query
+     * @param columnFamily the column family; its {@code toString()} is used as the column family name
+     * @param keySerializer serializer used for the row keys
+     *
+     * @return list of row keys
+     *
+     * @throws Exception if the query fails
+     */
+    public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> query = createRangeSlicesQuery( ko, keySerializer, be, be );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKeys( null, null );
+        query.setColumnNames( new ByteBuffer[0] );
+
+        List<K> keys = new ArrayList<K>();
+        for ( Row<K, ByteBuffer, ByteBuffer> current : query.execute().get() ) {
+            keys.add( current.getKey() );
+        }
+
+        return keys;
+    }
+
+
+    /**
+     * Delete row.
+     *
+     * @param keyspace the keyspace
+     * @param columnFamily the column family
+     * @param key the key
+     *
+     * @throws Exception the exception
+     */
+    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+        }
+
+        createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
+    }
+
+
+    public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+        }
+
+        createMutator( ko, se ).addDeletion( key, columnFamily.toString() ).execute();
+    }
+
+
+    /**
+     * Deletes an entire row, passing an explicit timestamp to the deletion.
+     *
+     * @param ko the keyspace to operate on
+     * @param columnFamily the column family; its {@code toString()} is used as the column family name
+     * @param key the row key, converted with {@code bytebuffer}
+     * @param timestamp the timestamp handed to the mutator's {@code addDeletion}
+     *
+     * @throws Exception if the deletion fails
+     */
+    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
+            throws Exception {
+
+        if ( db_logger.isDebugEnabled() ) {
+            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
+        }
+
+        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        ByteBuffer rowKey = bytebuffer( key );
+        String cfName = columnFamily.toString();
+        mutator.addDeletion( rowKey, cfName, timestamp ).execute();
+    }
+
+
+    /**
+     * Builds an {@link IndexScanner} over the {@code ENTITY_ID_SETS} column family for a collection.
+     * <p>
+     * A {@code count} of zero or less is replaced by {@code DEFAULT_COUNT}, and a {@code start} equal to
+     * {@code NULL_ID} is treated as no start at all.
+     *
+     * @param ko the keyspace (not referenced by this method's body)
+     * @param key the key handed to the scanner
+     * @param start the first id of the range, or {@code NULL_ID}/{@code null} for none
+     * @param finish the last id of the range
+     * @param count the requested count; non-positive values select {@code DEFAULT_COUNT}
+     * @param reversed true if the scan should be reversed
+     * @param locator the index bucket locator instance
+     * @param applicationId the application id
+     * @param collectionName the name of the collection to get the ids for
+     *
+     * @return a scanner configured with the arguments above
+     *
+     * @throws Exception declared for callers; see the scanner construction
+     */
+    public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
+                                   IndexBucketLocator locator, UUID applicationId, String collectionName )
+            throws Exception {
+
+        final int pageSize = ( count > 0 ) ? count : DEFAULT_COUNT;
+        final UUID first = NULL_ID.equals( start ) ? null : start;
+
+        return new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key,
+                first, finish, reversed, pageSize, collectionName );
+    }
+
+
+    /**
+     * Counts the columns of a row using a count query over an open-ended column range.
+     * <p>
+     * Both range bounds are empty buffers and the query's count limit is 100000000.
+     *
+     * @param ko the keyspace to query
+     * @param columnFamily the column family; its {@code toString()} is used as the column family name
+     * @param key the row key, converted with {@code bytebuffer}
+     *
+     * @return the count reported by the query, or 0 when the query returns no result object
+     *
+     * @throws Exception if the query fails
+     */
+    public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+
+        CountQuery<ByteBuffer, ByteBuffer> query = HFactory.createCountQuery( ko, be, be );
+        query.setColumnFamily( columnFamily.toString() );
+        query.setKey( bytebuffer( key ) );
+        query.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
+
+        QueryResult<Integer> result = query.execute();
+        return ( result == null ) ? 0 : result.get();
+    }
+
+
+    /**
+     * Builds an id-list mutation against the {@code ENTITY_ID_SETS} column family and executes it as a batch.
+     * <p>
+     * The mutation itself is assembled by {@code buildSetIdListMutator}; this method supplies a fresh mutator
+     * and a timestamp from {@link #createTimestamp()}, then runs the batch with
+     * {@code CassandraService.RETRY_COUNT}.
+     *
+     * @param ko the keyspace to write to
+     * @param targetId the target id
+     * @param keyPrefix the key prefix
+     * @param keySuffix the key suffix
+     * @param keyIds the key ids
+     *
+     * @throws Exception if the batch cannot be executed
+     */
+    public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
+            throws Exception {
+        long now = createTimestamp();
+        Mutator<ByteBuffer> mutator =
+                buildSetIdListMutator( createMutator( ko, be ), targetId, ENTITY_ID_SETS.toString(), keyPrefix,
+                        keySuffix, keyIds, now );
+        batchExecute( mutator, CassandraService.RETRY_COUNT );
+    }
+
+
+    /**
+     * Result of the most recent health check. It is written by the health-check thread, so it is declared
+     * volatile to make the latest value visible to other threads.
+     */
+    volatile boolean clusterUp = false;
+
+    /**
+     * Executor running the periodic health check; retained so that {@link #destroy()} can stop its thread.
+     * NOTE(review): start/destroy are assumed to be called from a single lifecycle thread - add synchronization
+     * if that does not hold.
+     */
+    private ScheduledExecutorService healthCheckExecutor;
+
+
+    /**
+     * Starts a background task that updates {@code clusterUp}.
+     * <p>
+     * The task first runs after 1 second and then again 5 seconds after each run completes. On each run it sets
+     * {@code clusterUp} to whether the cluster's connection manager reports a non-empty host collection; when
+     * the cluster or its connection manager is {@code null} the flag is left unchanged. Calling this method
+     * while a health check is already running has no effect.
+     */
+    public void startClusterHealthCheck() {
+
+        if ( healthCheckExecutor != null ) {
+            // already running; do not leak a second scheduler thread
+            return;
+        }
+
+        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleWithFixedDelay( new Runnable() {
+            @Override
+            public void run() {
+                if ( cluster != null ) {
+                    HConnectionManager connectionManager = cluster.getConnectionManager();
+                    if ( connectionManager != null ) {
+                        clusterUp = !connectionManager.getHosts().isEmpty();
+                    }
+                }
+            }
+        }, 1, 5, TimeUnit.SECONDS );
+        healthCheckExecutor = executorService;
+    }
+
+
+    /**
+     * Stops the health check if one was started, shuts down the cluster's connection manager when present, and
+     * clears the reference to the cluster.
+     *
+     * @throws Exception if shutting down fails
+     */
+    public void destroy() throws Exception {
+        // Stop the poller first: otherwise its non-daemon thread outlives this service and keeps polling a
+        // connection manager that is being shut down.
+        if ( healthCheckExecutor != null ) {
+            healthCheckExecutor.shutdownNow();
+            healthCheckExecutor = null;
+        }
+
+        if ( cluster != null ) {
+            HConnectionManager connectionManager = cluster.getConnectionManager();
+            if ( connectionManager != null ) {
+                connectionManager.shutdown();
+            }
+        }
+        cluster = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
new file mode 100644
index 0000000..2ebc199
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+
+/**
+ * An entity reference (type and UUID, inherited from {@link SimpleEntityRef}) combined with the type of the
+ * connection through which the entity is referenced.
+ */
+public class ConnectedEntityRefImpl extends SimpleEntityRef implements ConnectedEntityRef {
+
+    /** The connection type; {@code null} when the no-argument constructor was used. */
+    final String connectionType;
+
+
+    /** Creates a reference whose entity type, entity id and connection type are all {@code null}. */
+    public ConnectedEntityRefImpl() {
+        super( null, null );
+        this.connectionType = null;
+    }
+
+
+    /**
+     * Creates a reference that copies the type and UUID of an existing entity reference.
+     *
+     * @param connectionType the connection type
+     * @param connectedEntity the entity whose type and UUID are copied; must not be {@code null}
+     */
+    public ConnectedEntityRefImpl( String connectionType, EntityRef connectedEntity ) {
+        this( connectionType, connectedEntity.getType(), connectedEntity.getUuid() );
+    }
+
+
+    /**
+     * Creates a reference from its individual parts.
+     *
+     * @param connectionType the connection type
+     * @param entityType the type of the connected entity
+     * @param entityId the UUID of the connected entity
+     */
+    public ConnectedEntityRefImpl( String connectionType, String entityType, UUID entityId ) {
+        super( entityType, entityId );
+        this.connectionType = connectionType;
+    }
+
+
+    /** @return the connection type given at construction time */
+    @Override
+    public String getConnectionType() {
+        return connectionType;
+    }
+
+
+    /**
+     * Null-safe accessor for the connection type of a reference.
+     *
+     * @param connection the reference to inspect, may be {@code null}
+     *
+     * @return {@code null} if {@code connection} is {@code null}, otherwise its connection type
+     */
+    public static String getConnectionType( ConnectedEntityRef connection ) {
+        return ( connection == null ) ? null : connection.getConnectionType();
+    }
+}