You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:25 UTC

[15/96] [abbrv] [partial] Change package namespace to org.apache.usergrid

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
deleted file mode 100644
index 0cca4c5..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.List;
-
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-
-import static me.prettyprint.hector.api.ddl.ComparatorType.COUNTERTYPE;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.getIndexMetadata;
-
-
-public enum ApplicationCF implements CFEnum {
-
-    /** This is where the entity objects are stored */
-    ENTITY_PROPERTIES( "Entity_Properties", "BytesType" ),
-
-    /** each row models name:value pairs. {@see org.usergrid.persistence.Schema} for the list of dictionary types */
-    ENTITY_DICTIONARIES( "Entity_Dictionaries", "BytesType" ),
-
-    /**
-     * Rows that are full of UUIDs. Used when we want to have a row full of references to other entities. Mainly, this
-     * is for collections. Collections are represented by this CF.
-     */
-    ENTITY_ID_SETS( "Entity_Id_Sets", "UUIDType" ),
-
-    /**
-     * Typed vs. untyped dictionary. Dynamic entity dictionaries end up here. {@link
-     * EntityManagerImpl#getDictionaryAsMap(org.usergrid.persistence.EntityRef, String)}
-     */
-    ENTITY_COMPOSITE_DICTIONARIES( "Entity_Composite_Dictionaries",
-            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
-                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
-    /** No longer used? */
-    ENTITY_METADATA( "Entity_Metadata", "BytesType" ),
-
-    /** Contains all secondary indexes for entities */
-    ENTITY_INDEX( "Entity_Index",
-            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
-                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
-    /** Unique index for properties that must remain the same */
-    ENTITY_UNIQUE( "Entity_Unique", "UUIDType" ),
-
-    /** Contains all properties that have ever been indexed for an entity */
-    ENTITY_INDEX_ENTRIES( "Entity_Index_Entries",
-            "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
-                    "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
-    /** All roles that exist within an application */
-    APPLICATION_ROLES( "Application_Roles", "BytesType" ),
-
-    /** Application counters */
-    APPLICATION_AGGREGATE_COUNTERS( "Application_Aggregate_Counters", "LongType", COUNTERTYPE.getClassName() ),
-
-    /** Entity counters */
-    ENTITY_COUNTERS( "Entity_Counters", "BytesType", COUNTERTYPE.getClassName() ),;
-    public final static String DEFAULT_DYNAMIC_COMPOSITE_ALIASES =
-            "(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType,t=>TimeUUIDType,s=>UTF8Type," +
-                    "u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
-                    "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
-                    "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))";
-
-    private final String cf;
-    private final String comparator;
-    private final String validator;
-    private final String indexes;
-    private final boolean create;
-
-
-    ApplicationCF( String cf, String comparator ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        validator = null;
-        indexes = null;
-        create = true;
-    }
-
-
-    ApplicationCF( String cf, String comparator, String validator ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        this.validator = validator;
-        indexes = null;
-        create = true;
-    }
-
-
-    ApplicationCF( String cf, String comparator, String validator, String indexes ) {
-        this.cf = cf;
-        this.comparator = comparator;
-        this.validator = validator;
-        this.indexes = indexes;
-        create = true;
-    }
-
-
-    @Override
-    public String toString() {
-        return cf;
-    }
-
-
-    @Override
-    public String getColumnFamily() {
-        return cf;
-    }
-
-
-    @Override
-    public String getComparator() {
-        return comparator;
-    }
-
-
-    @Override
-    public String getValidator() {
-        return validator;
-    }
-
-
-    @Override
-    public boolean isComposite() {
-        return comparator.startsWith( "DynamicCompositeType" );
-    }
-
-
-    @Override
-    public List<ColumnDefinition> getMetadata() {
-        return getIndexMetadata( indexes );
-    }
-
-
-    @Override
-    public boolean create() {
-        return create;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
deleted file mode 100644
index 859dd1a..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.List;
-
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-
-
-public interface CFEnum {
-
-    public String getColumnFamily();
-
-    public String getComparator();
-
-    public String getValidator();
-
-    public boolean isComposite();
-
-    public List<ColumnDefinition> getMetadata();
-
-    public boolean create();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
deleted file mode 100644
index 00dc4bc..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.codehaus.jackson.JsonNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.utils.JsonUtils;
-
-import org.apache.cassandra.thrift.ColumnDef;
-import org.apache.cassandra.thrift.IndexType;
-import org.apache.commons.lang.StringUtils;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.cassandra.service.ThriftColumnDef;
-import me.prettyprint.hector.api.ClockResolution;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
-import me.prettyprint.hector.api.mutation.Mutator;
-
-import static java.nio.ByteBuffer.wrap;
-
-import static me.prettyprint.hector.api.factory.HFactory.createClockResolution;
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static org.apache.commons.beanutils.MethodUtils.invokeStaticMethod;
-import static org.apache.commons.lang.StringUtils.removeEnd;
-import static org.apache.commons.lang.StringUtils.removeStart;
-import static org.apache.commons.lang.StringUtils.split;
-import static org.apache.commons.lang.StringUtils.substringAfterLast;
-import static org.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.usergrid.persistence.Schema.serializeEntityProperty;
-import static org.usergrid.utils.ClassUtils.isBasicType;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.JsonUtils.toJsonNode;
-import static org.usergrid.utils.StringUtils.replaceAll;
-import static org.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
-
-
-/** @author edanuff */
-public class CassandraPersistenceUtils {
-
-    private static final Logger logger = LoggerFactory.getLogger( CassandraPersistenceUtils.class );
-
-    /** Logger for batch operations */
-    private static final Logger batch_logger =
-            LoggerFactory.getLogger( CassandraPersistenceUtils.class.getPackage().getName() + ".BATCH" );
-
-    /**
-     *
-     */
-    public static final ByteBuffer PROPERTY_TYPE_AS_BYTES = bytebuffer( PROPERTY_TYPE );
-
-    /**
-     *
-     */
-    public static final ByteBuffer PROPERTY_ID_AS_BYTES = bytebuffer( PROPERTY_UUID );
-
-    /**
-     *
-     */
-    public static final char KEY_DELIM = ':';
-
-    /**
-     *
-     */
-    public static final UUID NULL_ID = new UUID( 0, 0 );
-
-    public static final StringSerializer se = new StringSerializer();
-    public static final UUIDSerializer ue = new UUIDSerializer();
-    public static final ByteBufferSerializer be = new ByteBufferSerializer();
-
-
-    /**
-     * @param operation
-     * @param columnFamily
-     * @param key
-     * @param columnName
-     * @param columnValue
-     * @param timestamp
-     */
-    public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
-                                          Object columnValue, long timestamp ) {
-
-        if ( batch_logger.isDebugEnabled() ) {
-            batch_logger.debug( "{} cf={} key={} name={} value={}",
-                    new Object[] { operation, columnFamily, key, columnName, columnValue } );
-        }
-    }
-
-
-    public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
-                                           Object columnValue, long timestamp ) {
-
-        logBatchOperation( "Insert", columnFamily, key, columnName, columnValue, timestamp );
-
-        if ( columnName instanceof List<?> ) {
-            columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
-        }
-        if ( columnValue instanceof List<?> ) {
-            columnValue = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
-        }
-
-        HColumn<ByteBuffer, ByteBuffer> column =
-                createColumn( bytebuffer( columnName ), bytebuffer( columnValue ), timestamp, be, be );
-        m.addInsertion( bytebuffer( key ), columnFamily.toString(), column );
-    }
-
-
-    public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Map<?, ?> columns,
-                                           long timestamp ) throws Exception {
-
-        for ( Entry<?, ?> entry : columns.entrySet() ) {
-            addInsertToMutator( m, columnFamily, key, entry.getKey(), entry.getValue(), timestamp );
-        }
-    }
-
-
-    public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType, String propertyName,
-                                             Object propertyValue, long timestamp ) {
-
-        logBatchOperation( "Insert", ApplicationCF.ENTITY_PROPERTIES, key, propertyName, propertyValue, timestamp );
-
-        HColumn<ByteBuffer, ByteBuffer> column = createColumn( bytebuffer( propertyName ),
-                serializeEntityProperty( entityType, propertyName, propertyValue ), timestamp, be, be );
-        m.addInsertion( bytebuffer( key ), ApplicationCF.ENTITY_PROPERTIES.toString(), column );
-    }
-
-
-    public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType,
-                                             Map<String, ?> columns, long timestamp ) throws Exception {
-
-        for ( Entry<String, ?> entry : columns.entrySet() ) {
-            addPropertyToMutator( m, key, entityType, entry.getKey(), entry.getValue(), timestamp );
-        }
-    }
-
-
-    /** Delete the row */
-    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp )
-            throws Exception {
-
-        logBatchOperation( "Delete", columnFamily, key, null, null, timestamp );
-
-        m.addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp );
-    }
-
-
-    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
-                                           long timestamp ) throws Exception {
-
-        logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
-
-        if ( columnName instanceof List<?> ) {
-            columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
-        }
-
-        m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
-    }
-
-
-    public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp,
-                                           Object... columnNames ) throws Exception {
-
-        for ( Object columnName : columnNames ) {
-            logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
-
-            if ( columnName instanceof List<?> ) {
-                columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
-            }
-
-            m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
-        }
-    }
-
-
-    public static Map<String, ByteBuffer> getColumnMap( List<HColumn<String, ByteBuffer>> columns ) {
-        Map<String, ByteBuffer> column_map = new TreeMap<String, ByteBuffer>( String.CASE_INSENSITIVE_ORDER );
-        if ( columns != null ) {
-            for ( HColumn<String, ByteBuffer> column : columns ) {
-                String column_name = column.getName();
-                column_map.put( column_name, column.getValue() );
-            }
-        }
-        return column_map;
-    }
-
-
-    public static <K, V> Map<K, V> asMap( List<HColumn<K, V>> columns ) {
-        if ( columns == null ) {
-            return null;
-        }
-        Map<K, V> column_map = new LinkedHashMap<K, V>();
-        for ( HColumn<K, V> column : columns ) {
-            K column_name = column.getName();
-            column_map.put( column_name, column.getValue() );
-        }
-        return column_map;
-    }
-
-
-    public static List<ByteBuffer> getAsByteKeys( List<UUID> ids ) {
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
-        for ( UUID id : ids ) {
-            keys.add( bytebuffer( key( id ) ) );
-        }
-        return keys;
-    }
-
-
-    /** @return timestamp value for current time */
-    public static long createTimestamp() {
-        return createClockResolution( ClockResolution.MICROSECONDS ).createClock();
-    }
-
-
-    /** @return normalized group path */
-    public static String normalizeGroupPath( String path ) {
-        path = replaceAll( path.toLowerCase().trim(), "//", "/" );
-        path = removeStart( path, "/" );
-        path = removeEnd( path, "/" );
-        return path;
-    }
-
-
-    /** @return a composite key */
-    public static Object key( Object... objects ) {
-        if ( objects.length == 1 ) {
-            Object obj = objects[0];
-            if ( ( obj instanceof UUID ) || ( obj instanceof ByteBuffer ) ) {
-                return obj;
-            }
-        }
-        StringBuilder s = new StringBuilder();
-        for ( Object obj : objects ) {
-            if ( obj instanceof String ) {
-                s.append( ( ( String ) obj ).toLowerCase() );
-            }
-            else if ( obj instanceof List<?> ) {
-                s.append( key( ( ( List<?> ) obj ).toArray() ) );
-            }
-            else if ( obj instanceof Object[] ) {
-                s.append( key( ( Object[] ) obj ) );
-            }
-            else if ( obj != null ) {
-                s.append( obj );
-            }
-            else {
-                s.append( "*" );
-            }
-
-            s.append( KEY_DELIM );
-        }
-
-        s.deleteCharAt( s.length() - 1 );
-
-        return s.toString();
-    }
-
-
-    /** @return UUID for composite key */
-    public static UUID keyID( Object... objects ) {
-        if ( objects.length == 1 ) {
-            Object obj = objects[0];
-            if ( obj instanceof UUID ) {
-                return ( UUID ) obj;
-            }
-        }
-        String keyStr = key( objects ).toString();
-        if ( keyStr.length() == 0 ) {
-            return NULL_ID;
-        }
-        UUID uuid = UUID.nameUUIDFromBytes( keyStr.getBytes() );
-        logger.debug( "Key {} equals UUID {}", keyStr, uuid );
-        return uuid;
-    }
-
-
-    /** @return UUID for entity alias */
-    public static UUID aliasID( UUID ownerId, String aliasType, String alias ) {
-        return keyID( ownerId, aliasType, alias );
-    }
-
-
-    public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> batch, UUID targetId,
-                                                             String columnFamily, String keyPrefix, String keySuffix,
-                                                             List<UUID> keyIds, long timestamp ) throws Exception {
-        for ( UUID keyId : keyIds ) {
-            ByteBuffer key = null;
-            if ( ( StringUtils.isNotEmpty( keyPrefix ) ) || ( StringUtils.isNotEmpty( keySuffix ) ) ) {
-                key = bytebuffer( keyPrefix + keyId.toString() + keySuffix );
-            }
-            else {
-                key = bytebuffer( keyId );
-            }
-            addInsertToMutator( batch, columnFamily, key, targetId, ByteBuffer.allocate( 0 ), timestamp );
-        }
-        return batch;
-    }
-
-
-    public static MutationResult batchExecute( Mutator<?> m, int retries ) {
-        for ( int i = 0; i < retries; i++ ) {
-            try {
-                return m.execute();
-            }
-            catch ( Exception e ) {
-                logger.error( "Unable to execute mutation, retrying...", e );
-            }
-        }
-        return m.execute();
-    }
-
-
-    public static Object toStorableValue( Object obj ) {
-        if ( obj == null ) {
-            return null;
-        }
-
-        if ( isBasicType( obj.getClass() ) ) {
-            return obj;
-        }
-
-        if ( obj instanceof ByteBuffer ) {
-            return obj;
-        }
-
-        JsonNode json = toJsonNode( obj );
-        if ( ( json != null ) && json.isValueNode() ) {
-            if ( json.isBigInteger() ) {
-                return json.getBigIntegerValue();
-            }
-            else if ( json.isNumber() || json.isBoolean() ) {
-                return BigInteger.valueOf( json.getValueAsLong() );
-            }
-            else if ( json.isTextual() ) {
-                return json.getTextValue();
-            }
-            else if ( json.isBinary() ) {
-                try {
-                    return wrap( json.getBinaryValue() );
-                }
-                catch ( IOException e ) {
-                }
-            }
-        }
-
-        return json;
-    }
-
-
-    public static ByteBuffer toStorableBinaryValue( Object obj ) {
-        obj = toStorableValue( obj );
-        if ( obj instanceof JsonNode ) {
-            return JsonUtils.toByteBuffer( obj );
-        }
-        else {
-            return bytebuffer( obj );
-        }
-    }
-
-
-    public static ByteBuffer toStorableBinaryValue( Object obj, boolean forceJson ) {
-        obj = toStorableValue( obj );
-        if ( ( obj instanceof JsonNode ) || ( forceJson && ( obj != null ) && !( obj instanceof ByteBuffer ) ) ) {
-            return JsonUtils.toByteBuffer( obj );
-        }
-        else {
-            return bytebuffer( obj );
-        }
-    }
-
-
-    public static List<ColumnDefinition> getIndexMetadata( String indexes ) {
-        if ( indexes == null ) {
-            return null;
-        }
-        String[] index_entries = split( indexes, ',' );
-        List<ColumnDef> columns = new ArrayList<ColumnDef>();
-        for ( String index_entry : index_entries ) {
-            String column_name = stringOrSubstringBeforeFirst( index_entry, ':' ).trim();
-            String comparer = substringAfterLast( index_entry, ":" ).trim();
-            if ( StringUtils.isBlank( comparer ) ) {
-                comparer = "UUIDType";
-            }
-            if ( StringUtils.isNotBlank( column_name ) ) {
-                ColumnDef cd = new ColumnDef( bytebuffer( column_name ), comparer );
-                cd.setIndex_name( column_name );
-                cd.setIndex_type( IndexType.KEYS );
-                columns.add( cd );
-            }
-        }
-        return ThriftColumnDef.fromThriftList( columns );
-    }
-
-
-    public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum, String keyspace ) {
-        return getCfDefs( cfEnum, null, keyspace );
-    }
-
-
-    public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum,
-                                                          List<ColumnFamilyDefinition> cf_defs, String keyspace ) {
-
-        if ( cf_defs == null ) {
-            cf_defs = new ArrayList<ColumnFamilyDefinition>();
-        }
-
-        CFEnum[] values = null;
-        try {
-            values = ( CFEnum[] ) invokeStaticMethod( cfEnum, "values", ( Object[] ) null );
-        }
-        catch ( Exception e ) {
-            logger.error( "Couldn't get CFEnum values", e );
-        }
-        if ( values == null ) {
-            return null;
-        }
-
-        for ( CFEnum cf : values ) {
-            if ( !cf.create() ) {
-                continue;
-            }
-            String defaultValidationClass = cf.getValidator();
-            List<ColumnDefinition> metadata = cf.getMetadata();
-
-            ColumnFamilyDefinition cf_def = HFactory.createColumnFamilyDefinition( keyspace, cf.getColumnFamily(),
-                    ComparatorType.getByClassName( cf.getComparator() ), metadata );
-
-            if ( defaultValidationClass != null ) {
-                cf_def.setDefaultValidationClass( defaultValidationClass );
-            }
-
-            cf_defs.add( cf_def );
-        }
-
-        return cf_defs;
-    }
-
-
-    public static void validateKeyspace( CFEnum[] cf_enums, KeyspaceDefinition ksDef ) {
-        Map<String, ColumnFamilyDefinition> cfs = new HashMap<String, ColumnFamilyDefinition>();
-        for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
-            cfs.put( cf.getName(), cf );
-        }
-        for ( CFEnum c : cf_enums ) {
-            if ( !cfs.keySet().contains( c.getColumnFamily() ) ) {
-
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
deleted file mode 100644
index a6b6268..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
+++ /dev/null
@@ -1,1135 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.locking.LockManager;
-import org.usergrid.persistence.IndexBucketLocator;
-import org.usergrid.persistence.IndexBucketLocator.IndexType;
-import org.usergrid.persistence.cassandra.index.IndexBucketScanner;
-import org.usergrid.persistence.cassandra.index.IndexScanner;
-
-import me.prettyprint.cassandra.connection.HConnectionManager;
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.cassandra.service.CassandraHostConfigurator;
-import me.prettyprint.cassandra.service.ThriftKsDef;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.ConsistencyLevelPolicy;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.OrderedRows;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.Rows;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.ColumnQuery;
-import me.prettyprint.hector.api.query.CountQuery;
-import me.prettyprint.hector.api.query.MultigetSliceQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.RangeSlicesQuery;
-import me.prettyprint.hector.api.query.SliceQuery;
-
-import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
-import static org.apache.commons.collections.MapUtils.getIntValue;
-import static org.apache.commons.collections.MapUtils.getString;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.ConversionUtils.bytebuffers;
-import static org.usergrid.utils.JsonUtils.mapToFormattedJsonString;
-import static org.usergrid.utils.MapUtils.asMap;
-import static org.usergrid.utils.MapUtils.filter;
-
-
-public class CassandraService {
-
-    // Name of the keyspace that init() opens as the system keyspace.
-    // NOTE(review): not final, so it can be reassigned at runtime - confirm whether that is intended.
-    public static String SYSTEM_KEYSPACE = "Usergrid";
-
-    // Keyspace name returned by keyspaceForApplication() while USE_VIRTUAL_KEYSPACES is true.
-    public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
-
-    // When true, getKeyspace() builds a virtual keyspace for any non-null prefix.
-    public static final boolean USE_VIRTUAL_KEYSPACES = true;
-
-    // Presumably column family names (not referenced in this part of the file) - TODO confirm.
-    public static final String APPLICATIONS_CF = "Applications";
-    public static final String PROPERTIES_CF = "Properties";
-    public static final String TOKENS_CF = "Tokens";
-    public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
-
-    // Count limits. ALL_COUNT is the slice size passed to setRange() by getAllColumns() and getRows();
-    // the other values are presumably defaults for callers - TODO confirm.
-    public static final int DEFAULT_COUNT = 1000;
-    public static final int ALL_COUNT = 100000;
-    public static final int INDEX_ENTRY_LIST_COUNT = 1000;
-    public static final int DEFAULT_SEARCH_COUNT = 10000;
-
-    // Passed to batchExecute() when executing batched mutations.
-    public static final int RETRY_COUNT = 5;
-
-    // Presumably well-known application and organization names - TODO confirm against callers.
-    public static final String DEFAULT_APPLICATION = "default-app";
-    public static final String DEFAULT_ORGANIZATION = "usergrid";
-    public static final String MANAGEMENT_APPLICATION = "management";
-
-    // Fixed UUIDs, presumably for the applications named above - TODO confirm.
-    public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
-    public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
-
-    // General-purpose logger for this class.
-    private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
-
-    // Separate logger, named "<package>.DB", used for per-operation tracing.
-    private static final Logger db_logger =
-            LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
-
-    // Collaborators supplied through the constructor (and replaceable through the setters).
-    Cluster cluster;
-    CassandraHostConfigurator chc;
-    Properties properties;
-    LockManager lockManager;
-
-    // Defaulted by init() when left null.
-    ConsistencyLevelPolicy consistencyLevelPolicy;
-
-    // Created by init().
-    private Keyspace systemKeyspace;
-
-    // "username"/"password" credentials built by init() and passed to every keyspace that is created.
-    private Map<String, String> accessMap;
-
-    // Shared serializer instances.
-    public static final StringSerializer se = new StringSerializer();
-    public static final ByteBufferSerializer be = new ByteBufferSerializer();
-    public static final UUIDSerializer ue = new UUIDSerializer();
-    public static final BytesArraySerializer bae = new BytesArraySerializer();
-    public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
-    public static final LongSerializer le = new LongSerializer();
-
-    // The all-zero UUID.
-    public static final UUID NULL_ID = new UUID( 0, 0 );
-
-
-    /**
-     * Stores the supplied collaborators and logs the cluster's known pool hosts to the DB logger.
-     *
-     * @param properties configuration properties
-     * @param cluster the cluster this service operates on
-     * @param cassandraHostConfigurator the host configurator
-     * @param lockManager the lock manager
-     */
-    public CassandraService( Properties properties, Cluster cluster,
-                             CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
-        this.properties = properties;
-        this.cluster = cluster;
-        this.chc = cassandraHostConfigurator;
-        this.lockManager = lockManager;
-        db_logger.info( String.valueOf( cluster.getKnownPoolHosts( false ) ) );
-    }
-
-
-    public void init() throws Exception {
-        if ( consistencyLevelPolicy == null ) {
-            consistencyLevelPolicy = new ConfigurableConsistencyLevel();
-            ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
-                    .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
-        }
-        accessMap = new HashMap<String, String>( 2 );
-        accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
-        accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
-        systemKeyspace =
-                HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
-                        accessMap );
-    }
-
-
-    /** @return the cluster this service operates on */
-    public Cluster getCluster() {
-        return this.cluster;
-    }
-
-
-    /** @param cluster the cluster this service should operate on */
-    public void setCluster( Cluster cluster ) {
-        this.cluster = cluster;
-    }
-
-
-    /** @return the host configurator held by this service */
-    public CassandraHostConfigurator getCassandraHostConfigurator() {
-        return this.chc;
-    }
-
-
-    /** @param chc the host configurator to use */
-    public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
-        this.chc = chc;
-    }
-
-
-    /** @return the configuration properties held by this service */
-    public Properties getProperties() {
-        return this.properties;
-    }
-
-
-    /** @param properties the configuration properties to use */
-    public void setProperties( Properties properties ) {
-        this.properties = properties;
-    }
-
-
-    /**
-     * Returns the configuration properties converted with {@code asMap}.
-     *
-     * @return the converted properties, or {@code null} when no properties are set
-     */
-    public Map<String, String> getPropertiesMap() {
-        if ( properties == null ) {
-            return null;
-        }
-        return asMap( properties );
-    }
-
-
-    /** @return the lock manager held by this service */
-    public LockManager getLockManager() {
-        return this.lockManager;
-    }
-
-
-    /** @param lockManager the lock manager to use */
-    public void setLockManager( LockManager lockManager ) {
-        this.lockManager = lockManager;
-    }
-
-
-    /** @return the consistency level policy used when keyspaces are created */
-    public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
-        return this.consistencyLevelPolicy;
-    }
-
-
-    /** @param consistencyLevelPolicy the consistency level policy to use when keyspaces are created */
-    public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
-        this.consistencyLevelPolicy = consistencyLevelPolicy;
-    }
-
-
-    /** @return keyspace for application UUID */
-    public static String keyspaceForApplication( UUID applicationId ) {
-        if ( USE_VIRTUAL_KEYSPACES ) {
-            return STATIC_APPLICATION_KEYSPACE;
-        }
-        else {
-            return "Application_" + applicationId.toString().replace( '-', '_' );
-        }
-    }
-
-
-    public static UUID prefixForApplication( UUID applicationId ) {
-        if ( USE_VIRTUAL_KEYSPACES ) {
-            return applicationId;
-        }
-        else {
-            return null;
-        }
-    }
-
-
-    public Keyspace getKeyspace( String keyspace, UUID prefix ) {
-        Keyspace ko = null;
-        if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
-            ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
-                    ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
-        }
-        else {
-            ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
-                    accessMap );
-        }
-        return ko;
-    }
-
-
-    /**
-     * Creates the keyspace handle for an application, using the name and prefix derived from its id.
-     *
-     * @param applicationId the application id; checked with an {@code assert} only
-     *
-     * @return the keyspace for the application
-     */
-    public Keyspace getApplicationKeyspace( UUID applicationId ) {
-        assert applicationId != null;
-        String keyspaceName = keyspaceForApplication( applicationId );
-        UUID keyPrefix = prefixForApplication( applicationId );
-        return getKeyspace( keyspaceName, keyPrefix );
-    }
-
-
-    /**
-     * Creates a handle on the {@link #STATIC_APPLICATION_KEYSPACE} keyspace itself, without any prefix.
-     *
-     * @return the un-prefixed application keyspace
-     */
-    public Keyspace getUsergridApplicationKeyspace() {
-        final UUID noPrefix = null;
-        return getKeyspace( STATIC_APPLICATION_KEYSPACE, noPrefix );
-    }
-
-
-    /** @return the system keyspace created by {@link #init()}; {@code null} before {@code init()} has run */
-    public Keyspace getSystemKeyspace() {
-        return this.systemKeyspace;
-    }
-
-
-    /**
-     * Reports whether both the system keyspace and the static application keyspace can be described.
-     *
-     * @return {@code true} when both keyspaces are described as non-null; {@code false} when either is missing or
-     *         describing them throws (the exception is logged)
-     */
-    public boolean checkKeyspacesExist() {
-        try {
-            if ( cluster.describeKeyspace( SYSTEM_KEYSPACE ) == null ) {
-                return false;
-            }
-            return cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
-        }
-        catch ( Exception ex ) {
-            logger.error( "could not describe keyspaces", ex );
-            return false;
-        }
-    }
-
-
-    /**
-     * Creates the column family when it does not exist yet, creating the keyspace first when that is missing too.
-     * New column families get a read repair chance of 0.1. The column family is added with the second argument of
-     * {@code addColumnFamily} set to {@code true}, presumably so the call waits for schema agreement.
-     *
-     * @param keyspace the keyspace the column family belongs to
-     * @param cfDef the definition of the column family to create
-     */
-    public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
-
-        if ( !keySpaceExists( keyspace ) ) {
-            createKeySpace( keyspace );
-        }
-
-        // nothing to do when the column family is already there
-        if ( cfExists( keyspace, cfDef.getName() ) ) {
-            return;
-        }
-
-        // default read repair chance to 0.1
-        cfDef.setReadRepairChance( 0.1d );
-        cluster.addColumnFamily( cfDef, true );
-        logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
-    }
-
-
-    /**
-     * Creates every column family in the list, in list order, via
-     * {@link #createColumnFamily(String, ColumnFamilyDefinition)}.
-     *
-     * @param keyspace the keyspace the column families belong to
-     * @param cfDefs the definitions of the column families to create
-     */
-    public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
-        for ( ColumnFamilyDefinition definition : cfDefs ) {
-            createColumnFamily( keyspace, definition );
-        }
-    }
-
-
-    /**
-     * Checks whether the keyspace exists.
-     *
-     * @param keyspace the keyspace name
-     *
-     * @return {@code true} when the cluster returns a non-null definition for the keyspace
-     */
-    public boolean keySpaceExists( String keyspace ) {
-        return cluster.describeKeyspace( keyspace ) != null;
-    }
-
-
-    /**
-     * Creates the keyspace with no column families, then waits for the schema change to settle.
-     * <p>
-     * The replication strategy comes from {@code cassandra.keyspace.strategy} (default {@code SimpleStrategy}), the
-     * replication factor from {@code cassandra.keyspace.replication} (default 1), and strategy options from the
-     * properties selected by the {@code cassandra.keyspace.strategy.options.} prefix.
-     *
-     * @param keyspace the name of the keyspace to create
-     */
-    private void createKeySpace( String keyspace ) {
-        logger.info( "Creating keyspace: {}", keyspace );
-
-        String strategyClass =
-                getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
-        logger.info( "Using strategy: {}", strategyClass );
-
-        int replicationFactor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
-        logger.info( "Using replication (may be overriden by strategy options): {}", replicationFactor );
-
-        ThriftKsDef definition = ( ThriftKsDef ) HFactory
-                .createKeyspaceDefinition( keyspace, strategyClass, replicationFactor,
-                        new ArrayList<ColumnFamilyDefinition>() );
-
-        @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategyOptions =
-                filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
-        if ( !strategyOptions.isEmpty() ) {
-            logger.info( "Strategy options: {}", mapToFormattedJsonString( strategyOptions ) );
-            definition.setStrategyOptions( strategyOptions );
-        }
-
-        cluster.addKeyspace( definition );
-        waitForCreation( keyspace );
-
-        logger.info( "Created keyspace {}", keyspace );
-    }
-
-
-    /**
-     * Blocks until {@code cluster.describeSchemaVersions()} reports exactly one schema version, polling every
-     * 100 ms.
-     * <p>
-     * The wait is not cancellable: an interrupt received while sleeping does not end the loop. The interrupt is
-     * remembered, however, and the thread's interrupt status is restored before this method returns so that callers
-     * can still observe it.
-     *
-     * @param keyspace the keyspace that was just created; not used by the check itself
-     */
-    private void waitForCreation( String keyspace ) {
-        boolean interrupted = false;
-        try {
-            while ( true ) {
-                Map<String, List<String>> versions = cluster.describeSchemaVersions();
-                // only 1 version, return
-                if ( versions != null && versions.size() == 1 ) {
-                    return;
-                }
-                // sleep and try again
-                try {
-                    Thread.sleep( 100 );
-                }
-                catch ( InterruptedException e ) {
-                    // keep waiting, but remember the interrupt so it is not lost
-                    interrupted = true;
-                }
-            }
-        }
-        finally {
-            if ( interrupted ) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-
-    /**
-     * Checks whether a column family with the given name is defined in the keyspace.
-     *
-     * @param keyspace the keyspace name
-     * @param cfName the column family name
-     *
-     * @return {@code true} when the keyspace exists and defines the column family, {@code false} otherwise
-     */
-    public boolean cfExists( String keyspace, String cfName ) {
-        KeyspaceDefinition definition = cluster.describeKeyspace( keyspace );
-
-        if ( definition != null ) {
-            for ( ColumnFamilyDefinition candidate : definition.getCfDefs() ) {
-                if ( cfName.equals( candidate.getName() ) ) {
-                    return true;
-                }
-            }
-        }
-
-        return false;
-    }
-
-
-    /**
-     * Reads the columns of a single row: an unbounded range with the reversed flag off, limited to
-     * {@link #ALL_COUNT} columns.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key, converted with {@code bytebuffer(key)}
-     * @param nameSerializer serializer for column names
-     * @param valueSerializer serializer for column values
-     *
-     * @return the columns of the slice returned by the query
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
-                                                     Serializer<N> nameSerializer, Serializer<V> valueSerializer )
-            throws Exception {
-
-        if ( db_logger.isInfoEnabled() ) {
-            db_logger.info( "getColumns cf={} key={}", columnFamily, key );
-        }
-
-        SliceQuery<ByteBuffer, N, V> sliceQuery = createSliceQuery( ko, be, nameSerializer, valueSerializer );
-        sliceQuery.setColumnFamily( columnFamily.toString() );
-        sliceQuery.setKey( bytebuffer( key ) );
-        sliceQuery.setRange( null, null, false, ALL_COUNT );
-        List<HColumn<N, V>> columns = sliceQuery.execute().get().getColumns();
-
-        if ( db_logger.isInfoEnabled() ) {
-            if ( columns != null ) {
-                db_logger.info( "getColumns returned {} columns", columns.size() );
-            }
-            else {
-                db_logger.info( "getColumns returned null" );
-            }
-        }
-
-        return columns;
-    }
-
-
-    /**
-     * Reads the columns of a single row with string column names and raw {@code ByteBuffer} values.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family
-     * @param key the row key
-     *
-     * @return the columns of the row
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
-            throws Exception {
-        List<HColumn<String, ByteBuffer>> columns = getAllColumns( ko, columnFamily, key, se, be );
-        return columns;
-    }
-
-
-    /**
-     * Collects the names of the columns of a single row, in the order the columns are returned.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family
-     * @param key the row key
-     *
-     * @return the column names as an insertion-ordered set
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
-        Set<String> names = new LinkedHashSet<String>();
-        for ( HColumn<String, ByteBuffer> entry : getAllColumns( ko, columnFamily, key ) ) {
-            names.add( entry.getName() );
-        }
-        return names;
-    }
-
-
-    /**
-     * Reads a range of columns from a single row, with names and values returned as raw {@code ByteBuffer}s.
-     * <p>
-     * Each range bound is converted to bytes according to its type: a {@code DynamicComposite} is serialized, a
-     * {@code List} is turned into a dynamic composite, and anything else goes through {@code bytebuffer(...)}.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param start the first bound of the range
-     * @param finish the second bound of the range
-     * @param count the maximum number of columns to return
-     * @param reversed the reversed flag passed to the slice range
-     *
-     * @return the columns of the slice returned by the query
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
-                                                             Object finish, int count, boolean reversed )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
-                    + " count=" + count + " reversed=" + reversed );
-        }
-
-        SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> sliceQuery = createSliceQuery( ko, be, be, be );
-        sliceQuery.setColumnFamily( columnFamily.toString() );
-        sliceQuery.setKey( bytebuffer( key ) );
-
-        final ByteBuffer startBytes;
-        if ( start instanceof DynamicComposite ) {
-            startBytes = ( ( DynamicComposite ) start ).serialize();
-        }
-        else if ( start instanceof List ) {
-            startBytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
-        }
-        else {
-            startBytes = bytebuffer( start );
-        }
-
-        final ByteBuffer finishBytes;
-        if ( finish instanceof DynamicComposite ) {
-            finishBytes = ( ( DynamicComposite ) finish ).serialize();
-        }
-        else if ( finish instanceof List ) {
-            finishBytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
-        }
-        else {
-            finishBytes = bytebuffer( finish );
-        }
-
-        sliceQuery.setRange( startBytes, finishBytes, reversed, count );
-        List<HColumn<ByteBuffer, ByteBuffer>> columns = sliceQuery.execute().get().getColumns();
-
-        if ( db_logger.isDebugEnabled() ) {
-            if ( columns != null ) {
-                db_logger.debug( "getColumns returned " + columns.size() + " columns" );
-            }
-            else {
-                db_logger.debug( "getColumns returned null" );
-            }
-        }
-
-        return columns;
-    }
-
-
-    /**
-     * Reads the same range of columns from several rows in one query.
-     * <p>
-     * Each range bound is converted to bytes according to its type: a {@code DynamicComposite} is serialized, a
-     * {@code List} is turned into a dynamic composite, and anything else goes through {@code bytebuffer(...)}.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param keys the row keys
-     * @param start the first bound of the range
-     * @param finish the second bound of the range
-     * @param count the maximum number of columns per row
-     * @param reversed the reversed flag passed to the slice range
-     *
-     * @return an insertion-ordered map from row key to that row's columns, in the order rows are returned
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
-                                                                                   List<?> keys, Object start,
-                                                                                   Object finish, int count,
-                                                                                   boolean reversed ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
-                    + finish + " count=" + count + " reversed=" + reversed );
-        }
-
-        MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> multiget = createMultigetSliceQuery( ko, be, be, be );
-        multiget.setColumnFamily( columnFamily.toString() );
-        multiget.setKeys( bytebuffers( keys ) );
-
-        final ByteBuffer startBytes;
-        if ( start instanceof DynamicComposite ) {
-            startBytes = ( ( DynamicComposite ) start ).serialize();
-        }
-        else if ( start instanceof List ) {
-            startBytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
-        }
-        else {
-            startBytes = bytebuffer( start );
-        }
-
-        final ByteBuffer finishBytes;
-        if ( finish instanceof DynamicComposite ) {
-            finishBytes = ( ( DynamicComposite ) finish ).serialize();
-        }
-        else if ( finish instanceof List ) {
-            finishBytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
-        }
-        else {
-            finishBytes = bytebuffer( finish );
-        }
-
-        multiget.setRange( startBytes, finishBytes, reversed, count );
-        Rows<ByteBuffer, ByteBuffer, ByteBuffer> fetched = multiget.execute().get();
-
-        Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> columnsByKey =
-                new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
-        for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> fetchedRow : fetched ) {
-            columnsByKey.put( fetchedRow.getKey(), fetchedRow.getColumnSlice().getColumns() );
-        }
-
-        return columnsByKey;
-    }
-
-
-    /**
-     * Reads several rows in one query: for each row an unbounded range with the reversed flag off, limited to
-     * {@link #ALL_COUNT} columns.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param keys the row keys
-     * @param keySerializer serializer for row keys
-     * @param nameSerializer serializer for column names
-     * @param valueSerializer serializer for column values
-     *
-     * @return the rows returned by the query
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
-                                            Serializer<K> keySerializer, Serializer<N> nameSerializer,
-                                            Serializer<V> valueSerializer ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
-        }
-
-        MultigetSliceQuery<K, N, V> multiget =
-                createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
-        multiget.setColumnFamily( columnFamily.toString() );
-        multiget.setKeys( keys );
-        multiget.setRange( null, null, false, ALL_COUNT );
-        Rows<K, N, V> fetched = multiget.execute().get();
-
-        if ( db_logger.isInfoEnabled() ) {
-            if ( fetched != null ) {
-                db_logger.info( "getColumns returned " + fetched.getCount() + " columns" );
-            }
-            else {
-                db_logger.info( "getColumns returned null" );
-            }
-        }
-
-        return fetched;
-    }
-
-
-    /**
-     * Reads the named columns of a single row.
-     * <p>
-     * The string names are converted to the name type {@code N} by serializing them with the string serializer and
-     * deserializing the bytes with {@code nameSerializer}.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param columnNames the names of the columns to read
-     * @param nameSerializer serializer for column names
-     * @param valueSerializer serializer for column values
-     *
-     * @return the columns of the slice returned by the query
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    @SuppressWarnings("unchecked")
-    public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
-                                                  Serializer<N> nameSerializer, Serializer<V> valueSerializer )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
-        }
-
-        SliceQuery<ByteBuffer, N, V> sliceQuery = createSliceQuery( ko, be, nameSerializer, valueSerializer );
-        sliceQuery.setColumnFamily( columnFamily.toString() );
-        sliceQuery.setKey( bytebuffer( key ) );
-
-        // round-trip the names through bytes so they come out typed as N
-        Object[] typedNames =
-                nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
-        sliceQuery.setColumnNames( ( N[] ) typedNames );
-
-        List<HColumn<N, V>> columns = sliceQuery.execute().get().getColumns();
-
-        if ( db_logger.isInfoEnabled() ) {
-            if ( columns != null ) {
-                db_logger.info( "getColumns returned " + columns.size() + " columns" );
-            }
-            else {
-                db_logger.info( "getColumns returned null" );
-            }
-        }
-
-        return columns;
-    }
-
-
-    /**
-     * Reads the named columns of several rows in one query.
-     * <p>
-     * The string names are converted to the name type {@code N} by serializing them with the string serializer and
-     * deserializing the bytes with {@code nameSerializer}.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param keys the row keys
-     * @param columnNames the names of the columns to read
-     * @param keySerializer serializer for row keys
-     * @param nameSerializer serializer for column names
-     * @param valueSerializer serializer for column values
-     *
-     * @return the rows returned by the query
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    @SuppressWarnings("unchecked")
-    public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
-                                            Collection<String> columnNames, Serializer<K> keySerializer,
-                                            Serializer<N> nameSerializer, Serializer<V> valueSerializer )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
-        }
-
-        MultigetSliceQuery<K, N, V> multiget =
-                createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
-        multiget.setColumnFamily( columnFamily.toString() );
-        multiget.setKeys( keys );
-
-        // round-trip the names through bytes so they come out typed as N
-        Object[] typedNames =
-                nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) ).toArray();
-        multiget.setColumnNames( ( N[] ) typedNames );
-
-        Rows<K, N, V> fetched = multiget.execute().get();
-
-        if ( db_logger.isInfoEnabled() ) {
-            if ( fetched != null ) {
-                db_logger.info( "getColumns returned " + fetched.getCount() + " columns" );
-            }
-            else {
-                db_logger.info( "getColumns returned null" );
-            }
-        }
-
-        return fetched;
-    }
-
-
-    /**
-     * Reads a single column of a single row.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param column the name of the column to read
-     * @param nameSerializer serializer for the column name
-     * @param valueSerializer serializer for the column value
-     *
-     * @return the column returned by the query, which may be {@code null}
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
-                                           Serializer<N> nameSerializer, Serializer<V> valueSerializer )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
-        }
-
-        ColumnQuery<ByteBuffer, N, V> columnQuery =
-                HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
-        columnQuery.setKey( bytebuffer( key ) ).setName( column ).setColumnFamily( columnFamily.toString() );
-        HColumn<N, V> found = columnQuery.execute().get();
-
-        if ( db_logger.isInfoEnabled() && found == null ) {
-            db_logger.info( "getColumn returned null" );
-        }
-
-        return found;
-    }
-
-
-    /**
-     * Reads the named columns of a single row and returns them as a slice.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param columns the names of the columns to read
-     * @param nameSerializer serializer for column names
-     * @param valueSerializer serializer for column values
-     *
-     * @return the slice returned by the query, which may be {@code null}
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
-                                                Serializer<N> nameSerializer, Serializer<V> valueSerializer )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            // Arrays.toString prints the names; concatenating the array itself only prints its identity hash
-            db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " columns="
-                    + java.util.Arrays.toString( columns ) );
-        }
-
-        SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
-        QueryResult<ColumnSlice<N, V>> r =
-                q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
-                 .execute();
-        ColumnSlice<N, V> result = r.get();
-
-        if ( db_logger.isDebugEnabled() && result == null ) {
-            db_logger.debug( "getColumns returned null" );
-        }
-
-        return result;
-    }
-
-
-    /**
-     * Reads a single column with a string name and a raw {@code ByteBuffer} value.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family
-     * @param key the row key
-     * @param column the name of the column to read
-     *
-     * @return the column returned by the query, which may be {@code null}
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
-            throws Exception {
-        HColumn<String, ByteBuffer> found = getColumn( ko, columnFamily, key, column, se, be );
-        return found;
-    }
-
-
-    /**
-     * Writes a single column without a TTL; delegates to the TTL variant with a TTL of 0.
-     *
-     * @param ko the keyspace to write to
-     * @param columnFamily the column family
-     * @param key the row key
-     * @param columnName the column name
-     * @param columnValue the column value
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
-            throws Exception {
-        final int noTtl = 0;
-        setColumn( ko, columnFamily, key, columnName, columnValue, noTtl );
-    }
-
-
-    /**
-     * Writes a single column, optionally with a TTL.
-     * <p>
-     * A {@code List} name or value is converted to a dynamic composite; anything else goes through
-     * {@code bytebuffer(...)}.
-     *
-     * @param ko the keyspace to write to
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param columnName the column name
-     * @param columnValue the column value
-     * @param ttl the TTL to set on the column; 0 means no TTL is set
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
-                           int ttl ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
-                    + columnValue );
-        }
-
-        final ByteBuffer nameBytes;
-        if ( columnName instanceof List ) {
-            nameBytes = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
-        }
-        else {
-            nameBytes = bytebuffer( columnName );
-        }
-
-        final ByteBuffer valueBytes;
-        if ( columnValue instanceof List ) {
-            valueBytes = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
-        }
-        else {
-            valueBytes = bytebuffer( columnValue );
-        }
-
-        HColumn<ByteBuffer, ByteBuffer> column = createColumn( nameBytes, valueBytes, be, be );
-        if ( ttl != 0 ) {
-            column.setTtl( ttl );
-        }
-
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
-        mutator.insert( bytebuffer( key ), columnFamily.toString(), column );
-    }
-
-
-    /**
-     * Writes the entries of a map as columns of a single row without a TTL; delegates to the TTL variant with a
-     * TTL of 0.
-     *
-     * @param ko the keyspace to write to
-     * @param columnFamily the column family
-     * @param key the row key
-     * @param map the column names mapped to their values
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
-        final int noTtl = 0;
-        setColumns( ko, columnFamily, key, map, noTtl );
-    }
-
-
-    /**
-     * Writes every entry of {@code map} that has a non-null value as a column of the given row, in one batch.
-     * <p>
-     * All columns share one timestamp taken from {@link #createTimestamp()}. A {@code List} name or value is
-     * converted to a dynamic composite; anything else goes through {@code bytebuffer(...)}. Entries with a
-     * {@code null} value are skipped.
-     *
-     * @param ko the keyspace to write to
-     * @param columnFamily the column family; its {@code toString()} value is used as the name
-     * @param key the row key
-     * @param map the column names mapped to their values
-     * @param ttl the TTL to set on every column; 0 means no TTL is set
-     *
-     * @throws Exception declared for callers; nothing is caught here
-     */
-    public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
-                                                                                                 " ttl=" + ttl : "" ) );
-        }
-
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-        long timestamp = createTimestamp();
-
-        for ( Map.Entry<?, ?> entry : map.entrySet() ) {
-            Object name = entry.getKey();
-            Object value = entry.getValue();
-            if ( value == null ) {
-                continue;
-            }
-
-            ByteBuffer name_bytes;
-            if ( name instanceof List ) {
-                name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
-            }
-            else {
-                name_bytes = bytebuffer( name );
-            }
-
-            ByteBuffer value_bytes;
-            if ( value instanceof List ) {
-                value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
-            }
-            else {
-                value_bytes = bytebuffer( value );
-            }
-
-            HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
-            if ( ttl != 0 ) {
-                col.setTtl( ttl );
-            }
-            // insert the column that carries the TTL; building a second column here would drop it
-            m.addInsertion( bytebuffer( key ), columnFamily.toString(), col );
-        }
-        batchExecute( m, CassandraService.RETRY_COUNT );
-    }
-
-
-    /**
-     * Creates a timestamp using the clock resolution obtained from {@code chc}.
-     *
-     * @return the value produced by that clock resolution's {@code createClock()}
-     */
-    public long createTimestamp() {
-        return chc.getClockResolution().createClock();
-    }
-
-
-    /**
-     * Delete column.
-     *
-     * @param keyspace the keyspace
-     * @param columnFamily the column family
-     * @param key the key
-     * @param column the column
-     *
-     * @throws Exception the exception
-     */
-    public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
-        }
-
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-        m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
-    }
-
-
-    /**
-     * Collects the row keys of a column family into a set that keeps the order in which the range
-     * query returned them.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param keySerializer serializer used for the row keys
-     *
-     * @return set of keys
-     *
-     * @throws Exception if the query fails
-     */
-    public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getRowKeys cf=" + columnFamily );
-        }
-
-        // Open-ended key range with an empty column-name list; presumably this keeps the result
-        // down to the row keys - confirm against Hector's RangeSlicesQuery.
-        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> query = createRangeSlicesQuery( ko, keySerializer, be, be );
-        query.setColumnFamily( columnFamily.toString() );
-        query.setKeys( null, null );
-        query.setColumnNames( new ByteBuffer[0] );
-        OrderedRows<K, ByteBuffer, ByteBuffer> rows = query.execute().get();
-
-        Set<K> keys = new LinkedHashSet<K>();
-        for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
-            keys.add( row.getKey() );
-        }
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "getRowKeys returned " + keys.size() + " rows" );
-        }
-
-        return keys;
-    }
-
-
-    /**
-     * Collects the row keys of a column family into a list, in the order in which the range query
-     * returned them.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param keySerializer serializer used for the row keys
-     *
-     * @return list of row keys
-     *
-     * @throws Exception if the query fails
-     */
-    public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
-
-        // Open-ended key range with an empty column-name list; presumably this keeps the result
-        // down to the row keys - confirm against Hector's RangeSlicesQuery.
-        RangeSlicesQuery<K, ByteBuffer, ByteBuffer> query = createRangeSlicesQuery( ko, keySerializer, be, be );
-        query.setColumnFamily( columnFamily.toString() );
-        query.setKeys( null, null );
-        query.setColumnNames( new ByteBuffer[0] );
-        OrderedRows<K, ByteBuffer, ByteBuffer> rows = query.execute().get();
-
-        List<K> keys = new ArrayList<K>();
-        for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
-            // Every key is added as returned; no key is filtered out.
-            keys.add( row.getKey() );
-        }
-
-        return keys;
-    }
-
-
-    /**
-     * Deletes an entire row, executing the deletion right away.
-     *
-     * @param ko the keyspace to operate on
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param key the row key, converted with {@code bytebuffer}
-     *
-     * @throws Exception if the deletion fails
-     */
-    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
-        }
-
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
-        mutator.addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
-    }
-
-
-    /**
-     * Deletes an entire row addressed by a string key, executing the deletion right away. The key is
-     * handed unchanged to a mutator created with the {@code se} serializer instead of being converted
-     * with {@code bytebuffer}.
-     *
-     * @param ko the keyspace to operate on
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param key the row key
-     *
-     * @throws Exception if the deletion fails
-     */
-    public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
-        }
-
-        Mutator<String> mutator = createMutator( ko, se );
-        mutator.addDeletion( key, columnFamily.toString() ).execute();
-    }
-
-
-    /**
-     * Deletes an entire row using an explicit timestamp, executing the deletion right away.
-     *
-     * @param ko the keyspace to operate on
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param key the row key, converted with {@code bytebuffer}
-     * @param timestamp the timestamp passed along with the deletion
-     *
-     * @throws Exception if the deletion fails
-     */
-    public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
-            throws Exception {
-
-        if ( db_logger.isDebugEnabled() ) {
-            db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
-        }
-
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
-        mutator.addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp ).execute();
-    }
-
-
-    /**
-     * Gets the id list.
-     *
-     * @param ko the keyspace
-     * @param key the key
-     * @param start the start
-     * @param finish the finish
-     * @param count the count
-     * @param reversed True if the scan should be reversed
-     * @param locator The index locator instance
-     * @param applicationId The applicationId
-     * @param collectionName The name of the collection to get the Ids for
-     *
-     * @return list of columns as UUIDs
-     *
-     * @throws Exception the exception
-     */
-    public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
-                                   IndexBucketLocator locator, UUID applicationId, String collectionName )
-            throws Exception {
-
-        if ( count <= 0 ) {
-            count = DEFAULT_COUNT;
-        }
-
-        if ( NULL_ID.equals( start ) ) {
-            start = null;
-        }
-
-        IndexScanner scanner =
-                new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
-                        finish, reversed, count, collectionName );
-
-        return scanner;
-    }
-
-
-    /**
-     * Counts the columns of one row with a count query.
-     *
-     * @param ko the keyspace to query
-     * @param columnFamily the column family; its {@code toString()} supplies the name
-     * @param key the row key, converted with {@code bytebuffer}
-     *
-     * @return the value reported by the query, or 0 when the query returns no result object
-     *
-     * @throws Exception if the query fails
-     */
-    public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
-
-        CountQuery<ByteBuffer, ByteBuffer> query = HFactory.createCountQuery( ko, be, be );
-        query.setColumnFamily( columnFamily.toString() );
-        query.setKey( bytebuffer( key ) );
-        // Empty start and finish buffers; the last argument is the count handed to setRange.
-        query.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
-
-        QueryResult<Integer> result = query.execute();
-        return result == null ? 0 : result.get();
-    }
-
-
-    /**
-     * Builds the id-list mutations for the {@code ENTITY_ID_SETS} column family with
-     * {@code buildSetIdListMutator} and executes them as one batch.
-     *
-     * @param ko the keyspace to write to
-     * @param targetId the target id
-     * @param keyPrefix the key prefix
-     * @param keySuffix the key suffix
-     * @param keyIds the key ids
-     *
-     * @throws Exception if the batch write fails
-     */
-    public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
-            throws Exception {
-        long now = createTimestamp();
-        Mutator<ByteBuffer> populated =
-                buildSetIdListMutator( createMutator( ko, be ), targetId, ENTITY_ID_SETS.toString(), keyPrefix,
-                        keySuffix, keyIds, now );
-        batchExecute( populated, CassandraService.RETRY_COUNT );
-    }
-
-
-    /**
-     * Last cluster state seen by the health check. The health check writes it from its own thread,
-     * so it is volatile to make every update visible to threads that read it.
-     */
-    volatile boolean clusterUp = false;
-
-    /** Executor that runs the health check; kept so that {@link #destroy()} can stop it. */
-    private ScheduledExecutorService healthCheckExecutor;
-
-
-    /**
-     * Starts a background task that refreshes {@code clusterUp}. The first run happens after one
-     * second and each following run five seconds after the previous one finished. A run sets the
-     * flag to whether the cluster's connection manager reports at least one host; while the cluster
-     * or its connection manager is missing, the flag keeps its previous value.
-     *
-     * Calling this method while a health check is already running does nothing.
-     */
-    public void startClusterHealthCheck() {
-
-        if ( healthCheckExecutor != null ) {
-            return;
-        }
-
-        healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
-        healthCheckExecutor.scheduleWithFixedDelay( new Runnable() {
-            @Override
-            public void run() {
-                if ( cluster != null ) {
-                    HConnectionManager connectionManager = cluster.getConnectionManager();
-                    if ( connectionManager != null ) {
-                        clusterUp = !connectionManager.getHosts().isEmpty();
-                    }
-                }
-            }
-        }, 1, 5, TimeUnit.SECONDS );
-    }
-
-
-    /**
-     * Releases what this service holds: stops the health-check executor if one was started, shuts
-     * down the cluster's connection manager if there is one, and clears the cluster reference.
-     *
-     * @throws Exception if shutting down the connection manager fails
-     */
-    public void destroy() throws Exception {
-        if ( healthCheckExecutor != null ) {
-            // Stop polling first so that no further check is scheduled against a connection manager
-            // that is being shut down, and so that the executor thread does not outlive the service.
-            healthCheckExecutor.shutdownNow();
-            healthCheckExecutor = null;
-        }
-        if ( cluster != null ) {
-            HConnectionManager connectionManager = cluster.getConnectionManager();
-            if ( connectionManager != null ) {
-                connectionManager.shutdown();
-            }
-        }
-        cluster = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
deleted file mode 100644
index c093329..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.UUID;
-
-import org.usergrid.persistence.ConnectedEntityRef;
-import org.usergrid.persistence.EntityRef;
-import org.usergrid.persistence.SimpleEntityRef;
-
-
-/**
- * Entity reference that additionally records the type of the connection through which the
- * referenced entity was reached.
- */
-public class ConnectedEntityRefImpl extends SimpleEntityRef implements ConnectedEntityRef {
-
-    /** Connection type of this reference; {@code null} when created through the no-argument constructor. */
-    final String connectionType;
-
-
-    /** Creates a reference with no entity type, no entity id and no connection type. */
-    public ConnectedEntityRefImpl() {
-        super( null, null );
-        this.connectionType = null;
-    }
-
-
-    /**
-     * Creates a reference that copies the type and uuid of an existing entity reference.
-     *
-     * @param connectionType the connection type
-     * @param connectedEntity the reference whose type and uuid are copied; must not be {@code null}
-     */
-    public ConnectedEntityRefImpl( String connectionType, EntityRef connectedEntity ) {
-        super( connectedEntity.getType(), connectedEntity.getUuid() );
-        this.connectionType = connectionType;
-    }
-
-
-    /**
-     * Creates a reference from an explicit entity type and id.
-     *
-     * @param connectionType the connection type
-     * @param entityType the entity type
-     * @param entityId the entity id
-     */
-    public ConnectedEntityRefImpl( String connectionType, String entityType, UUID entityId ) {
-        super( entityType, entityId );
-        this.connectionType = connectionType;
-    }
-
-
-    /** @return the connection type given at construction, possibly {@code null} */
-    @Override
-    public String getConnectionType() {
-        return this.connectionType;
-    }
-
-
-    /**
-     * Null-safe accessor for the connection type of a connection.
-     *
-     * @param connection the connection, may be {@code null}
-     *
-     * @return the connection's type, or {@code null} when {@code connection} is {@code null}
-     */
-    public static String getConnectionType( ConnectedEntityRef connection ) {
-        return connection == null ? null : connection.getConnectionType();
-    }
-}