Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:49 UTC
[39/96] [abbrv] [partial] Change package namespace to org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
new file mode 100644
index 0000000..817f47f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ApplicationCF.java
@@ -0,0 +1,159 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.List;
+
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+import static me.prettyprint.hector.api.ddl.ComparatorType.COUNTERTYPE;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.getIndexMetadata;
+
+
+public enum ApplicationCF implements CFEnum {
+
+ /** This is where the entity objects are stored */
+ ENTITY_PROPERTIES( "Entity_Properties", "BytesType" ),
+
+ /** Each row models name:value pairs. See {@link org.apache.usergrid.persistence.Schema} for the list of dictionary types. */
+ ENTITY_DICTIONARIES( "Entity_Dictionaries", "BytesType" ),
+
+ /**
+ * Rows that are full of UUIDs. Used when we want to have a row full of references to other entities. Mainly, this
+ * is for collections. Collections are represented by this CF.
+ */
+ ENTITY_ID_SETS( "Entity_Id_Sets", "UUIDType" ),
+
+ /**
+ * Typed vs. untyped dictionary. Dynamic entity dictionaries end up here. {@link
+ * EntityManagerImpl#getDictionaryAsMap(org.apache.usergrid.persistence.EntityRef, String)}
+ */
+ ENTITY_COMPOSITE_DICTIONARIES( "Entity_Composite_Dictionaries",
+ "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+ "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+ "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+ "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+ /** No longer used? */
+ ENTITY_METADATA( "Entity_Metadata", "BytesType" ),
+
+ /** Contains all secondary indexes for entities */
+ ENTITY_INDEX( "Entity_Index",
+ "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+ "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+ "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+ "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+ /** Unique index for properties that must remain the same */
+ ENTITY_UNIQUE( "Entity_Unique", "UUIDType" ),
+
+ /** Contains all properties that have ever been indexed for an entity */
+ ENTITY_INDEX_ENTRIES( "Entity_Index_Entries",
+ "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
+ "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+ "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+ "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
+
+ /** All roles that exist within an application */
+ APPLICATION_ROLES( "Application_Roles", "BytesType" ),
+
+ /** Application counters */
+ APPLICATION_AGGREGATE_COUNTERS( "Application_Aggregate_Counters", "LongType", COUNTERTYPE.getClassName() ),
+
+ /** Entity counters */
+ ENTITY_COUNTERS( "Entity_Counters", "BytesType", COUNTERTYPE.getClassName() );
+
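+ /**
+ * Alias map shared by the DynamicCompositeType comparators above: each lowercase letter selects a component
+ * comparator (a=Ascii, b=Bytes, i=Integer, x=LexicalUUID, l=Long, t=TimeUUID, s=UTF8, u=UUID) and the
+ * corresponding uppercase letter selects the reversed (descending) variant of the same type.
+ */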
+ public final static String DEFAULT_DYNAMIC_COMPOSITE_ALIASES =
+ "(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType,t=>TimeUUIDType,s=>UTF8Type," +
+ "u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
+ "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
+ "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))";
+
+ private final String cf;
+ private final String comparator;
+ private final String validator;
+ private final String indexes;
+ private final boolean create;
+
+
+ ApplicationCF( String cf, String comparator ) {
+ this.cf = cf;
+ this.comparator = comparator;
+ validator = null;
+ indexes = null;
+ create = true;
+ }
+
+
+ ApplicationCF( String cf, String comparator, String validator ) {
+ this.cf = cf;
+ this.comparator = comparator;
+ this.validator = validator;
+ indexes = null;
+ create = true;
+ }
+
+
+ ApplicationCF( String cf, String comparator, String validator, String indexes ) {
+ this.cf = cf;
+ this.comparator = comparator;
+ this.validator = validator;
+ this.indexes = indexes;
+ create = true;
+ }
+
+
+ @Override
+ public String toString() {
+ return cf;
+ }
+
+
+ @Override
+ public String getColumnFamily() {
+ return cf;
+ }
+
+
+ @Override
+ public String getComparator() {
+ return comparator;
+ }
+
+
+ @Override
+ public String getValidator() {
+ return validator;
+ }
+
+
+ @Override
+ public boolean isComposite() {
+ return comparator.startsWith( "DynamicCompositeType" );
+ }
+
+
+ @Override
+ public List<ColumnDefinition> getMetadata() {
+ return getIndexMetadata( indexes );
+ }
+
+
+ @Override
+ public boolean create() {
+ return create;
+ }
+
+}
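For orientation, a minimal sketch of how these constants are typically consumed (illustrative only; the method name and literal keyspace are assumptions, not part of this patch). The enum doubles as the column family name via toString() and as schema metadata via CassandraPersistenceUtils.getCfDefs() from the file below.

    // Sketch (assumes the usual Hector imports shown in the files of this commit).
    List<ColumnFamilyDefinition> exampleCfDefs() {
        // Turns every enum constant with create() == true into a Hector CF definition
        // for the static application keyspace defined in CassandraService below.
        return CassandraPersistenceUtils.getCfDefs( ApplicationCF.class, "Usergrid_Applications" );
    }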
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
new file mode 100644
index 0000000..6457406
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CFEnum.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.List;
+
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+
+
+public interface CFEnum {
+
+ public String getColumnFamily();
+
+ public String getComparator();
+
+ public String getValidator();
+
+ public boolean isComposite();
+
+ public List<ColumnDefinition> getMetadata();
+
+ public boolean create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
new file mode 100644
index 0000000..9637845
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
@@ -0,0 +1,483 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.thrift.IndexType;
+import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.ThriftColumnDef;
+import me.prettyprint.hector.api.ClockResolution;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.ddl.ColumnDefinition;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.ComparatorType;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.MutationResult;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.nio.ByteBuffer.wrap;
+import static me.prettyprint.hector.api.factory.HFactory.createClockResolution;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static org.apache.commons.beanutils.MethodUtils.invokeStaticMethod;
+import static org.apache.commons.lang.StringUtils.removeEnd;
+import static org.apache.commons.lang.StringUtils.removeStart;
+import static org.apache.commons.lang.StringUtils.split;
+import static org.apache.commons.lang.StringUtils.substringAfterLast;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.serializeEntityProperty;
+import static org.apache.usergrid.utils.ClassUtils.isBasicType;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.JsonUtils.toJsonNode;
+import static org.apache.usergrid.utils.StringUtils.replaceAll;
+import static org.apache.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
+
+
+/** @author edanuff */
+public class CassandraPersistenceUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger( CassandraPersistenceUtils.class );
+
+ /** Logger for batch operations */
+ private static final Logger batch_logger =
+ LoggerFactory.getLogger( CassandraPersistenceUtils.class.getPackage().getName() + ".BATCH" );
+
+ /** The Schema PROPERTY_TYPE property name, pre-serialized as a ByteBuffer. */
+ public static final ByteBuffer PROPERTY_TYPE_AS_BYTES = bytebuffer( PROPERTY_TYPE );
+
+ /** The Schema PROPERTY_UUID property name, pre-serialized as a ByteBuffer. */
+ public static final ByteBuffer PROPERTY_ID_AS_BYTES = bytebuffer( PROPERTY_UUID );
+
+ /** Delimiter used when concatenating the parts of a composite row key. */
+ public static final char KEY_DELIM = ':';
+
+ /** All-zero UUID used as a null placeholder identifier. */
+ public static final UUID NULL_ID = new UUID( 0, 0 );
+
+ public static final StringSerializer se = new StringSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+
+
+ /**
+ * Log a single batch mutation on the dedicated BATCH logger at debug level.
+ *
+ * @param operation the mutation type, e.g. "Insert" or "Delete"
+ * @param columnFamily the column family being written
+ * @param key the row key
+ * @param columnName the column name, or null for row-level operations
+ * @param columnValue the column value, or null for deletes
+ * @param timestamp the mutation timestamp
+ */
+ public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
+ Object columnValue, long timestamp ) {
+
+ if ( batch_logger.isDebugEnabled() ) {
+ batch_logger.debug( "{} cf={} key={} name={} value={}",
+ new Object[] { operation, columnFamily, key, columnName, columnValue } );
+ }
+ }
+
+
+ public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
+ Object columnValue, long timestamp ) {
+
+ logBatchOperation( "Insert", columnFamily, key, columnName, columnValue, timestamp );
+
+ if ( columnName instanceof List<?> ) {
+ columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
+ }
+ if ( columnValue instanceof List<?> ) {
+ columnValue = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
+ }
+
+ HColumn<ByteBuffer, ByteBuffer> column =
+ createColumn( bytebuffer( columnName ), bytebuffer( columnValue ), timestamp, be, be );
+ m.addInsertion( bytebuffer( key ), columnFamily.toString(), column );
+ }
+
+
+ public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Map<?, ?> columns,
+ long timestamp ) throws Exception {
+
+ for ( Entry<?, ?> entry : columns.entrySet() ) {
+ addInsertToMutator( m, columnFamily, key, entry.getKey(), entry.getValue(), timestamp );
+ }
+ }
+
+
+ public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType, String propertyName,
+ Object propertyValue, long timestamp ) {
+
+ logBatchOperation( "Insert", ApplicationCF.ENTITY_PROPERTIES, key, propertyName, propertyValue, timestamp );
+
+ HColumn<ByteBuffer, ByteBuffer> column = createColumn( bytebuffer( propertyName ),
+ serializeEntityProperty( entityType, propertyName, propertyValue ), timestamp, be, be );
+ m.addInsertion( bytebuffer( key ), ApplicationCF.ENTITY_PROPERTIES.toString(), column );
+ }
+
+
+ public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType,
+ Map<String, ?> columns, long timestamp ) throws Exception {
+
+ for ( Entry<String, ?> entry : columns.entrySet() ) {
+ addPropertyToMutator( m, key, entityType, entry.getKey(), entry.getValue(), timestamp );
+ }
+ }
+
+
+ /** Delete the row */
+ public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp )
+ throws Exception {
+
+ logBatchOperation( "Delete", columnFamily, key, null, null, timestamp );
+
+ m.addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp );
+ }
+
+
+ public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
+ long timestamp ) throws Exception {
+
+ logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
+
+ if ( columnName instanceof List<?> ) {
+ columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
+ }
+
+ m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
+ }
+
+
+ public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp,
+ Object... columnNames ) throws Exception {
+
+ for ( Object columnName : columnNames ) {
+ logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
+
+ if ( columnName instanceof List<?> ) {
+ columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
+ }
+
+ m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
+ }
+ }
+
+
+ public static Map<String, ByteBuffer> getColumnMap( List<HColumn<String, ByteBuffer>> columns ) {
+ Map<String, ByteBuffer> column_map = new TreeMap<String, ByteBuffer>( String.CASE_INSENSITIVE_ORDER );
+ if ( columns != null ) {
+ for ( HColumn<String, ByteBuffer> column : columns ) {
+ String column_name = column.getName();
+ column_map.put( column_name, column.getValue() );
+ }
+ }
+ return column_map;
+ }
+
+
+ public static <K, V> Map<K, V> asMap( List<HColumn<K, V>> columns ) {
+ if ( columns == null ) {
+ return null;
+ }
+ Map<K, V> column_map = new LinkedHashMap<K, V>();
+ for ( HColumn<K, V> column : columns ) {
+ K column_name = column.getName();
+ column_map.put( column_name, column.getValue() );
+ }
+ return column_map;
+ }
+
+
+ public static List<ByteBuffer> getAsByteKeys( List<UUID> ids ) {
+ List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
+ for ( UUID id : ids ) {
+ keys.add( bytebuffer( key( id ) ) );
+ }
+ return keys;
+ }
+
+
+ /** @return a timestamp for the current time, at microsecond resolution */
+ public static long createTimestamp() {
+ return createClockResolution( ClockResolution.MICROSECONDS ).createClock();
+ }
+
+
+ /** @return normalized group path */
+ public static String normalizeGroupPath( String path ) {
+ path = replaceAll( path.toLowerCase().trim(), "//", "/" );
+ path = removeStart( path, "/" );
+ path = removeEnd( path, "/" );
+ return path;
+ }
+
+
+ /** @return a composite key */
+ public static Object key( Object... objects ) {
+ if ( objects.length == 1 ) {
+ Object obj = objects[0];
+ if ( ( obj instanceof UUID ) || ( obj instanceof ByteBuffer ) ) {
+ return obj;
+ }
+ }
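+ // Otherwise join the parts with ':' -- strings are lowercased, nested lists/arrays are flattened recursively,
+ // and nulls become "*" wildcards.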
+ StringBuilder s = new StringBuilder();
+ for ( Object obj : objects ) {
+ if ( obj instanceof String ) {
+ s.append( ( ( String ) obj ).toLowerCase() );
+ }
+ else if ( obj instanceof List<?> ) {
+ s.append( key( ( ( List<?> ) obj ).toArray() ) );
+ }
+ else if ( obj instanceof Object[] ) {
+ s.append( key( ( Object[] ) obj ) );
+ }
+ else if ( obj != null ) {
+ s.append( obj );
+ }
+ else {
+ s.append( "*" );
+ }
+
+ s.append( KEY_DELIM );
+ }
+
+ s.deleteCharAt( s.length() - 1 );
+
+ return s.toString();
+ }
+
+
+ /** @return UUID for composite key */
+ public static UUID keyID( Object... objects ) {
+ if ( objects.length == 1 ) {
+ Object obj = objects[0];
+ if ( obj instanceof UUID ) {
+ return ( UUID ) obj;
+ }
+ }
+ String keyStr = key( objects ).toString();
+ if ( keyStr.length() == 0 ) {
+ return NULL_ID;
+ }
+ UUID uuid = UUID.nameUUIDFromBytes( keyStr.getBytes() );
+ logger.debug( "Key {} equals UUID {}", keyStr, uuid );
+ return uuid;
+ }
+
+
+ /** @return UUID for entity alias */
+ public static UUID aliasID( UUID ownerId, String aliasType, String alias ) {
+ return keyID( ownerId, aliasType, alias );
+ }
+
+
+ public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> batch, UUID targetId,
+ String columnFamily, String keyPrefix, String keySuffix,
+ List<UUID> keyIds, long timestamp ) throws Exception {
+ for ( UUID keyId : keyIds ) {
+ ByteBuffer key = null;
+ if ( ( StringUtils.isNotEmpty( keyPrefix ) ) || ( StringUtils.isNotEmpty( keySuffix ) ) ) {
+ key = bytebuffer( keyPrefix + keyId.toString() + keySuffix );
+ }
+ else {
+ key = bytebuffer( keyId );
+ }
+ addInsertToMutator( batch, columnFamily, key, targetId, ByteBuffer.allocate( 0 ), timestamp );
+ }
+ return batch;
+ }
+
+
+ public static MutationResult batchExecute( Mutator<?> m, int retries ) {
+ for ( int i = 0; i < retries; i++ ) {
+ try {
+ return m.execute();
+ }
+ catch ( Exception e ) {
+ logger.error( "Unable to execute mutation, retrying...", e );
+ }
+ }
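+ // Final attempt outside the retry loop; if this execution also fails, the exception propagates to the caller.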
+ return m.execute();
+ }
+
+
+ public static Object toStorableValue( Object obj ) {
+ if ( obj == null ) {
+ return null;
+ }
+
+ if ( isBasicType( obj.getClass() ) ) {
+ return obj;
+ }
+
+ if ( obj instanceof ByteBuffer ) {
+ return obj;
+ }
+
+ JsonNode json = toJsonNode( obj );
+ if ( ( json != null ) && json.isValueNode() ) {
+ if ( json.isBigInteger() ) {
+ return json.getBigIntegerValue();
+ }
+ else if ( json.isNumber() || json.isBoolean() ) {
+ return BigInteger.valueOf( json.getValueAsLong() );
+ }
+ else if ( json.isTextual() ) {
+ return json.getTextValue();
+ }
+ else if ( json.isBinary() ) {
+ try {
+ return wrap( json.getBinaryValue() );
+ }
+ catch ( IOException e ) {
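+ // Ignore: fall through and return the JsonNode below.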
+ }
+ }
+ }
+
+ return json;
+ }
+
+
+ public static ByteBuffer toStorableBinaryValue( Object obj ) {
+ obj = toStorableValue( obj );
+ if ( obj instanceof JsonNode ) {
+ return JsonUtils.toByteBuffer( obj );
+ }
+ else {
+ return bytebuffer( obj );
+ }
+ }
+
+
+ public static ByteBuffer toStorableBinaryValue( Object obj, boolean forceJson ) {
+ obj = toStorableValue( obj );
+ if ( ( obj instanceof JsonNode ) || ( forceJson && ( obj != null ) && !( obj instanceof ByteBuffer ) ) ) {
+ return JsonUtils.toByteBuffer( obj );
+ }
+ else {
+ return bytebuffer( obj );
+ }
+ }
+
+
+ public static List<ColumnDefinition> getIndexMetadata( String indexes ) {
+ if ( indexes == null ) {
+ return null;
+ }
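+ // The index spec is a comma-separated list of "columnName:type" entries; the type defaults to UUIDType
+ // when omitted, and each named column gets a KEYS secondary index.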
+ String[] index_entries = split( indexes, ',' );
+ List<ColumnDef> columns = new ArrayList<ColumnDef>();
+ for ( String index_entry : index_entries ) {
+ String column_name = stringOrSubstringBeforeFirst( index_entry, ':' ).trim();
+ String comparer = substringAfterLast( index_entry, ":" ).trim();
+ if ( StringUtils.isBlank( comparer ) ) {
+ comparer = "UUIDType";
+ }
+ if ( StringUtils.isNotBlank( column_name ) ) {
+ ColumnDef cd = new ColumnDef( bytebuffer( column_name ), comparer );
+ cd.setIndex_name( column_name );
+ cd.setIndex_type( IndexType.KEYS );
+ columns.add( cd );
+ }
+ }
+ return ThriftColumnDef.fromThriftList( columns );
+ }
+
+
+ public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum, String keyspace ) {
+ return getCfDefs( cfEnum, null, keyspace );
+ }
+
+
+ public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum,
+ List<ColumnFamilyDefinition> cf_defs, String keyspace ) {
+
+ if ( cf_defs == null ) {
+ cf_defs = new ArrayList<ColumnFamilyDefinition>();
+ }
+
+ CFEnum[] values = null;
+ try {
+ values = ( CFEnum[] ) invokeStaticMethod( cfEnum, "values", ( Object[] ) null );
+ }
+ catch ( Exception e ) {
+ logger.error( "Couldn't get CFEnum values", e );
+ }
+ if ( values == null ) {
+ return null;
+ }
+
+ for ( CFEnum cf : values ) {
+ if ( !cf.create() ) {
+ continue;
+ }
+ String defaultValidationClass = cf.getValidator();
+ List<ColumnDefinition> metadata = cf.getMetadata();
+
+ ColumnFamilyDefinition cf_def = HFactory.createColumnFamilyDefinition( keyspace, cf.getColumnFamily(),
+ ComparatorType.getByClassName( cf.getComparator() ), metadata );
+
+ if ( defaultValidationClass != null ) {
+ cf_def.setDefaultValidationClass( defaultValidationClass );
+ }
+
+ cf_defs.add( cf_def );
+ }
+
+ return cf_defs;
+ }
+
+
+ public static void validateKeyspace( CFEnum[] cf_enums, KeyspaceDefinition ksDef ) {
+ Map<String, ColumnFamilyDefinition> cfs = new HashMap<String, ColumnFamilyDefinition>();
+ for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
+ cfs.put( cf.getName(), cf );
+ }
+ for ( CFEnum c : cf_enums ) {
+ if ( !cfs.keySet().contains( c.getColumnFamily() ) ) {
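+ // Column family declared in the enum but missing from the keyspace; nothing is done here yet.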
+
+ }
+ }
+ }
+}
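A minimal sketch of the key-building and mutation helpers above used together (illustrative only; the method name, property values, and retry count are assumptions, not part of this patch).

    // Sketch (assumes the usual Hector imports shown in the files of this commit).
    void exampleWrite( Keyspace ko, UUID applicationId ) throws Exception {
        // Build a composite row key of the form "<applicationId>:users:jdoe".
        Object rowKey = CassandraPersistenceUtils.key( applicationId, "users", "jdoe" );

        // Queue one property column and execute the batch with retries.
        Mutator<ByteBuffer> m = HFactory.createMutator( ko, CassandraPersistenceUtils.be );
        CassandraPersistenceUtils.addPropertyToMutator( m, rowKey, "user", "username", "jdoe",
                CassandraPersistenceUtils.createTimestamp() );
        CassandraPersistenceUtils.batchExecute( m, 5 );
    }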
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
new file mode 100644
index 0000000..1b2b9e3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -0,0 +1,1134 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.cassandra.connection.HConnectionManager;
+import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.BytesArraySerializer;
+import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
+import me.prettyprint.cassandra.serializers.LongSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.CassandraHostConfigurator;
+import me.prettyprint.cassandra.service.ThriftKsDef;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.ConsistencyLevelPolicy;
+import me.prettyprint.hector.api.HConsistencyLevel;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.OrderedRows;
+import me.prettyprint.hector.api.beans.Row;
+import me.prettyprint.hector.api.beans.Rows;
+import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
+import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.ColumnQuery;
+import me.prettyprint.hector.api.query.CountQuery;
+import me.prettyprint.hector.api.query.MultigetSliceQuery;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.RangeSlicesQuery;
+import me.prettyprint.hector.api.query.SliceQuery;
+import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
+import static me.prettyprint.hector.api.factory.HFactory.createColumn;
+import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
+import static org.apache.commons.collections.MapUtils.getIntValue;
+import static org.apache.commons.collections.MapUtils.getString;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
+import static org.apache.usergrid.utils.ConversionUtils.bytebuffers;
+import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
+import static org.apache.usergrid.utils.MapUtils.asMap;
+import static org.apache.usergrid.utils.MapUtils.filter;
+
+
+public class CassandraService {
+
+ public static String SYSTEM_KEYSPACE = "Usergrid";
+
+ public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
+
+ public static final boolean USE_VIRTUAL_KEYSPACES = true;
+
+ public static final String APPLICATIONS_CF = "Applications";
+ public static final String PROPERTIES_CF = "Properties";
+ public static final String TOKENS_CF = "Tokens";
+ public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
+
+ public static final int DEFAULT_COUNT = 1000;
+ public static final int ALL_COUNT = 100000;
+ public static final int INDEX_ENTRY_LIST_COUNT = 1000;
+ public static final int DEFAULT_SEARCH_COUNT = 10000;
+
+ public static final int RETRY_COUNT = 5;
+
+ public static final String DEFAULT_APPLICATION = "default-app";
+ public static final String DEFAULT_ORGANIZATION = "usergrid";
+ public static final String MANAGEMENT_APPLICATION = "management";
+
+ public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
+ public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
+
+ private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
+
+ private static final Logger db_logger =
+ LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
+
+ Cluster cluster;
+ CassandraHostConfigurator chc;
+ Properties properties;
+ LockManager lockManager;
+
+ ConsistencyLevelPolicy consistencyLevelPolicy;
+
+ private Keyspace systemKeyspace;
+
+ private Map<String, String> accessMap;
+
+ public static final StringSerializer se = new StringSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+ public static final BytesArraySerializer bae = new BytesArraySerializer();
+ public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
+ public static final LongSerializer le = new LongSerializer();
+
+ public static final UUID NULL_ID = new UUID( 0, 0 );
+
+
+ public CassandraService( Properties properties, Cluster cluster,
+ CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
+ this.properties = properties;
+ this.cluster = cluster;
+ chc = cassandraHostConfigurator;
+ this.lockManager = lockManager;
+ db_logger.info( "" + cluster.getKnownPoolHosts( false ) );
+ }
+
+
+ public void init() throws Exception {
+ if ( consistencyLevelPolicy == null ) {
+ consistencyLevelPolicy = new ConfigurableConsistencyLevel();
+ ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
+ .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
+ }
+ accessMap = new HashMap<String, String>( 2 );
+ accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
+ accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
+ systemKeyspace =
+ HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+ accessMap );
+ }
+
+
+ public Cluster getCluster() {
+ return cluster;
+ }
+
+
+ public void setCluster( Cluster cluster ) {
+ this.cluster = cluster;
+ }
+
+
+ public CassandraHostConfigurator getCassandraHostConfigurator() {
+ return chc;
+ }
+
+
+ public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
+ this.chc = chc;
+ }
+
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+
+ public void setProperties( Properties properties ) {
+ this.properties = properties;
+ }
+
+
+ public Map<String, String> getPropertiesMap() {
+ if ( properties != null ) {
+ return asMap( properties );
+ }
+ return null;
+ }
+
+
+ public LockManager getLockManager() {
+ return lockManager;
+ }
+
+
+ public void setLockManager( LockManager lockManager ) {
+ this.lockManager = lockManager;
+ }
+
+
+ public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
+ return consistencyLevelPolicy;
+ }
+
+
+ public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
+ this.consistencyLevelPolicy = consistencyLevelPolicy;
+ }
+
+
+ /** @return keyspace for application UUID */
+ public static String keyspaceForApplication( UUID applicationId ) {
+ if ( USE_VIRTUAL_KEYSPACES ) {
+ return STATIC_APPLICATION_KEYSPACE;
+ }
+ else {
+ return "Application_" + applicationId.toString().replace( '-', '_' );
+ }
+ }
+
+
+ public static UUID prefixForApplication( UUID applicationId ) {
+ if ( USE_VIRTUAL_KEYSPACES ) {
+ return applicationId;
+ }
+ else {
+ return null;
+ }
+ }
+
+
+ public Keyspace getKeyspace( String keyspace, UUID prefix ) {
+ Keyspace ko = null;
+ if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
+ ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
+ ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
+ }
+ else {
+ ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
+ accessMap );
+ }
+ return ko;
+ }
+
+
+ public Keyspace getApplicationKeyspace( UUID applicationId ) {
+ assert applicationId != null;
+ Keyspace ko = getKeyspace( keyspaceForApplication( applicationId ), prefixForApplication( applicationId ) );
+ return ko;
+ }
+
+
+ /** The Usergrid_Applications keyspace directly */
+ public Keyspace getUsergridApplicationKeyspace() {
+ return getKeyspace( STATIC_APPLICATION_KEYSPACE, null );
+ }
+
+
+ public Keyspace getSystemKeyspace() {
+ return systemKeyspace;
+ }
+
+
+ public boolean checkKeyspacesExist() {
+ boolean exists = false;
+ try {
+ exists = cluster.describeKeyspace( SYSTEM_KEYSPACE ) != null
+ && cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
+ }
+ catch ( Exception ex ) {
+ logger.error( "could not describe keyspaces", ex );
+ }
+ return exists;
+ }
+
+
+ /**
+ * Lazily creates a column family in the keyspace. If the keyspace or column family does not exist it is created,
+ * and the call blocks until all nodes have acknowledged the schema change.
+ */
+ public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
+
+ if ( !keySpaceExists( keyspace ) ) {
+ createKeySpace( keyspace );
+ }
+
+
+ //add the cf
+
+ if ( !cfExists( keyspace, cfDef.getName() ) ) {
+
+ //default read repair chance to 0.1
+ cfDef.setReadRepairChance( 0.1d );
+
+ cluster.addColumnFamily( cfDef, true );
+ logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
+ }
+ }
+
+
+ /** Create the column families in the list */
+ public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
+ for ( ColumnFamilyDefinition cfDef : cfDefs ) {
+ createColumnFamily( keyspace, cfDef );
+ }
+ }
+
+
+ /** Check if the keyspace exists */
+ public boolean keySpaceExists( String keyspace ) {
+ KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
+
+ return ksDef != null;
+ }
+
+
+ /** Create the keyspace */
+ private void createKeySpace( String keyspace ) {
+ logger.info( "Creating keyspace: {}", keyspace );
+
+ String strategy_class =
+ getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
+ logger.info( "Using strategy: {}", strategy_class );
+
+ int replication_factor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
+ logger.info( "Using replication (may be overridden by strategy options): {}", replication_factor );
+
+ // try {
+ ThriftKsDef ks_def = ( ThriftKsDef ) HFactory
+ .createKeyspaceDefinition( keyspace, strategy_class, replication_factor,
+ new ArrayList<ColumnFamilyDefinition>() );
+
+ @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategy_options =
+ filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
+ if ( strategy_options.size() > 0 ) {
+ logger.info( "Strategy options: {}", mapToFormattedJsonString( strategy_options ) );
+ ks_def.setStrategyOptions( strategy_options );
+ }
+
+ cluster.addKeyspace( ks_def );
+
+ waitForCreation( keyspace );
+
+ logger.info( "Created keyspace {}", keyspace );
+ }
+
+
+ /** Wait until all nodes agree on the same schema version */
+ private void waitForCreation( String keyspace ) {
+
+ while ( true ) {
+ Map<String, List<String>> versions = cluster.describeSchemaVersions();
+ // only 1 version, return
+ if ( versions != null && versions.size() == 1 ) {
+ return;
+ }
+ // sleep and try again
+ try {
+ Thread.sleep( 100 );
+ }
+ catch ( InterruptedException e ) {
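+ // Ignore the interrupt and poll the schema versions again.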
+ }
+ }
+ }
+
+
+ /** Return true if the column family exists */
+ public boolean cfExists( String keyspace, String cfName ) {
+ KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
+
+ if ( ksDef == null ) {
+ return false;
+ }
+
+ for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
+ if ( cfName.equals( cf.getName() ) ) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Gets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ *
+ * @return columns
+ *
+ * @throws Exception the exception
+ */
+ public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
+ Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+ throws Exception {
+
+ if ( db_logger.isInfoEnabled() ) {
+ db_logger.info( "getColumns cf={} key={}", columnFamily, key );
+ }
+
+ SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKey( bytebuffer( key ) );
+ q.setRange( null, null, false, ALL_COUNT );
+ QueryResult<ColumnSlice<N, V>> r = q.execute();
+ ColumnSlice<N, V> slice = r.get();
+ List<HColumn<N, V>> results = slice.getColumns();
+
+ if ( db_logger.isInfoEnabled() ) {
+ if ( results == null ) {
+ db_logger.info( "getColumns returned null" );
+ }
+ else {
+ db_logger.info( "getColumns returned {} columns", results.size() );
+ }
+ }
+
+ return results;
+ }
+
+
+ public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
+ throws Exception {
+ return getAllColumns( ko, columnFamily, key, se, be );
+ }
+
+
+ public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+ List<HColumn<String, ByteBuffer>> columns = getAllColumns( ko, columnFamily, key );
+ Set<String> set = new LinkedHashSet<String>();
+ for ( HColumn<String, ByteBuffer> column : columns ) {
+ set.add( column.getName() );
+ }
+ return set;
+ }
+
+
+ /**
+ * Gets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param start the start
+ * @param finish the finish
+ * @param count the count
+ * @param reversed the reversed
+ *
+ * @return columns
+ *
+ * @throws Exception the exception
+ */
+ public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
+ Object finish, int count, boolean reversed )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
+ + " count=" + count + " reversed=" + reversed );
+ }
+
+ SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createSliceQuery( ko, be, be, be );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKey( bytebuffer( key ) );
+
+ ByteBuffer start_bytes = null;
+ if ( start instanceof DynamicComposite ) {
+ start_bytes = ( ( DynamicComposite ) start ).serialize();
+ }
+ else if ( start instanceof List ) {
+ start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
+ }
+ else {
+ start_bytes = bytebuffer( start );
+ }
+
+ ByteBuffer finish_bytes = null;
+ if ( finish instanceof DynamicComposite ) {
+ finish_bytes = ( ( DynamicComposite ) finish ).serialize();
+ }
+ else if ( finish instanceof List ) {
+ finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
+ }
+ else {
+ finish_bytes = bytebuffer( finish );
+ }
+
+ /*
+ * if (reversed) { q.setRange(finish_bytes, start_bytes, reversed, count); }
+ * else { q.setRange(start_bytes, finish_bytes, reversed, count); }
+ */
+ q.setRange( start_bytes, finish_bytes, reversed, count );
+ QueryResult<ColumnSlice<ByteBuffer, ByteBuffer>> r = q.execute();
+ ColumnSlice<ByteBuffer, ByteBuffer> slice = r.get();
+ List<HColumn<ByteBuffer, ByteBuffer>> results = slice.getColumns();
+
+ if ( db_logger.isDebugEnabled() ) {
+ if ( results == null ) {
+ db_logger.debug( "getColumns returned null" );
+ }
+ else {
+ db_logger.debug( "getColumns returned " + results.size() + " columns" );
+ }
+ }
+
+ return results;
+ }
+
+
+ public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
+ List<?> keys, Object start,
+ Object finish, int count,
+ boolean reversed ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
+ + finish + " count=" + count + " reversed=" + reversed );
+ }
+
+ MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createMultigetSliceQuery( ko, be, be, be );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKeys( bytebuffers( keys ) );
+
+ ByteBuffer start_bytes = null;
+ if ( start instanceof DynamicComposite ) {
+ start_bytes = ( ( DynamicComposite ) start ).serialize();
+ }
+ else if ( start instanceof List ) {
+ start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
+ }
+ else {
+ start_bytes = bytebuffer( start );
+ }
+
+ ByteBuffer finish_bytes = null;
+ if ( finish instanceof DynamicComposite ) {
+ finish_bytes = ( ( DynamicComposite ) finish ).serialize();
+ }
+ else if ( finish instanceof List ) {
+ finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
+ }
+ else {
+ finish_bytes = bytebuffer( finish );
+ }
+
+ q.setRange( start_bytes, finish_bytes, reversed, count );
+ QueryResult<Rows<ByteBuffer, ByteBuffer, ByteBuffer>> r = q.execute();
+ Rows<ByteBuffer, ByteBuffer, ByteBuffer> rows = r.get();
+
+ Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> results =
+ new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
+ for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> row : rows ) {
+ results.put( row.getKey(), row.getColumnSlice().getColumns() );
+ }
+
+ return results;
+ }
+
+
+ /**
+ * Gets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param keys the keys
+ *
+ * @return map of keys to columns
+ *
+ * @throws Exception the exception
+ */
+ public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+ Serializer<K> keySerializer, Serializer<N> nameSerializer,
+ Serializer<V> valueSerializer ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
+ }
+
+ MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKeys( keys );
+ q.setRange( null, null, false, ALL_COUNT );
+ QueryResult<Rows<K, N, V>> r = q.execute();
+ Rows<K, N, V> results = r.get();
+
+ if ( db_logger.isInfoEnabled() ) {
+ if ( results == null ) {
+ db_logger.info( "getColumns returned null" );
+ }
+ else {
+ db_logger.info( "getColumns returned " + results.getCount() + " columns" );
+ }
+ }
+
+ return results;
+ }
+
+
+ /**
+ * Gets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param columnNames the column names
+ *
+ * @return columns
+ *
+ * @throws Exception the exception
+ */
+ @SuppressWarnings("unchecked")
+ public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
+ Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
+ }
+
+ SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKey( bytebuffer( key ) );
+ // q.setColumnNames(columnNames.toArray(new String[0]));
+ q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
+ .toArray() );
+
+ QueryResult<ColumnSlice<N, V>> r = q.execute();
+ ColumnSlice<N, V> slice = r.get();
+ List<HColumn<N, V>> results = slice.getColumns();
+
+ if ( db_logger.isInfoEnabled() ) {
+ if ( results == null ) {
+ db_logger.info( "getColumns returned null" );
+ }
+ else {
+ db_logger.info( "getColumns returned " + results.size() + " columns" );
+ }
+ }
+
+ return results;
+ }
+
+
+ /**
+ * Gets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param keys the keys
+ * @param columnNames the column names
+ *
+ * @return map of keys to columns
+ *
+ * @throws Exception the exception
+ */
+ @SuppressWarnings("unchecked")
+ public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
+ Collection<String> columnNames, Serializer<K> keySerializer,
+ Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
+ }
+
+ MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKeys( keys );
+ q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
+ .toArray() );
+ QueryResult<Rows<K, N, V>> r = q.execute();
+ Rows<K, N, V> results = r.get();
+
+ if ( db_logger.isInfoEnabled() ) {
+ if ( results == null ) {
+ db_logger.info( "getColumns returned null" );
+ }
+ else {
+ db_logger.info( "getColumns returned " + results.getCount() + " columns" );
+ }
+ }
+
+ return results;
+ }
+
+
+ /**
+ * Gets the column.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param column the column
+ *
+ * @return column
+ *
+ * @throws Exception the exception
+ */
+ public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
+ Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
+ }
+
+ /*
+ * ByteBuffer column_bytes = null; if (column instanceof List) {
+ * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
+ * { column_bytes = bytebuffer(column); }
+ */
+
+ ColumnQuery<ByteBuffer, N, V> q = HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
+ QueryResult<HColumn<N, V>> r =
+ q.setKey( bytebuffer( key ) ).setName( column ).setColumnFamily( columnFamily.toString() ).execute();
+ HColumn<N, V> result = r.get();
+
+ if ( db_logger.isInfoEnabled() ) {
+ if ( result == null ) {
+ db_logger.info( "getColumn returned null" );
+ }
+ }
+
+ return result;
+ }
+
+
+ public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
+ Serializer<N> nameSerializer, Serializer<V> valueSerializer )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + columns );
+ }
+
+ /*
+ * ByteBuffer column_bytes = null; if (column instanceof List) {
+ * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
+ * { column_bytes = bytebuffer(column); }
+ */
+
+ SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
+ QueryResult<ColumnSlice<N, V>> r =
+ q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
+ .execute();
+ ColumnSlice<N, V> result = r.get();
+
+ if ( db_logger.isDebugEnabled() ) {
+ if ( result == null ) {
+ db_logger.debug( "getColumn returned null" );
+ }
+ }
+
+ return result;
+ }
+
+
+ public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
+ throws Exception {
+ return getColumn( ko, columnFamily, key, column, se, be );
+ }
+
+
+ public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
+ throws Exception {
+ this.setColumn( ko, columnFamily, key, columnName, columnValue, 0 );
+ }
+
+
+ public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
+ int ttl ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
+ + columnValue );
+ }
+
+ ByteBuffer name_bytes = null;
+ if ( columnName instanceof List ) {
+ name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
+ }
+ else {
+ name_bytes = bytebuffer( columnName );
+ }
+
+ ByteBuffer value_bytes = null;
+ if ( columnValue instanceof List ) {
+ value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
+ }
+ else {
+ value_bytes = bytebuffer( columnValue );
+ }
+
+ HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, be, be );
+ if ( ttl != 0 ) {
+ col.setTtl( ttl );
+ }
+ Mutator<ByteBuffer> m = createMutator( ko, be );
+ m.insert( bytebuffer( key ), columnFamily.toString(), col );
+ }
+
+
+ /**
+ * Sets the columns.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param map the map
+ *
+ * @throws Exception the exception
+ */
+ public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
+ this.setColumns( ko, columnFamily, key, map, 0 );
+ }
+
+
+ public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
+ " ttl=" + ttl : "" ) );
+ }
+
+ Mutator<ByteBuffer> m = createMutator( ko, be );
+ long timestamp = createTimestamp();
+
+ for ( Object name : map.keySet() ) {
+ Object value = map.get( name );
+ if ( value != null ) {
+
+ ByteBuffer name_bytes = null;
+ if ( name instanceof List ) {
+ name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
+ }
+ else {
+ name_bytes = bytebuffer( name );
+ }
+
+ ByteBuffer value_bytes = null;
+ if ( value instanceof List ) {
+ value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
+ }
+ else {
+ value_bytes = bytebuffer( value );
+ }
+
+ HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
+ if ( ttl != 0 ) {
+ col.setTtl( ttl );
+ }
+ m.addInsertion( bytebuffer( key ), columnFamily.toString(),
+ createColumn( name_bytes, value_bytes, timestamp, be, be ) );
+ }
+ }
+ batchExecute( m, CassandraService.RETRY_COUNT );
+ }
+
+
+ /**
+ * Create a timestamp based on the TimeResolution set to the cluster.
+ *
+ * @return a timestamp
+ */
+ public long createTimestamp() {
+ return chc.getClockResolution().createClock();
+ }
+
+
+ /**
+ * Delete column.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param column the column
+ *
+ * @throws Exception the exception
+ */
+ public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
+ }
+
+ Mutator<ByteBuffer> m = createMutator( ko, be );
+ m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
+ }
+
+
+ /**
+ * Gets the row keys.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ *
+ * @return set of keys
+ *
+ * @throws Exception the exception
+ */
+ public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getRowKeys cf=" + columnFamily );
+ }
+
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKeys( null, null );
+ q.setColumnNames( new ByteBuffer[0] );
+ QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
+ OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
+
+ Set<K> results = new LinkedHashSet<K>();
+ for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
+ results.add( row.getKey() );
+ }
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "getRowKeys returned " + results.size() + " rows" );
+ }
+
+ return results;
+ }
+
+
+ /**
+ * Gets the row keys as a list of UUIDs.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ *
+ * @return list of row key UUIDs
+ *
+ * @throws Exception the exception
+ */
+ public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
+
+ RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
+ q.setColumnFamily( columnFamily.toString() );
+ q.setKeys( null, null );
+ q.setColumnNames( new ByteBuffer[0] );
+ QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
+ OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
+
+ List<K> list = new ArrayList<K>();
+ for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
+ list.add( row.getKey() );
+ // K uuid = row.getKey();
+ // if (uuid != UUIDUtils.ZERO_UUID) {
+ // list.add(uuid);
+ // }
+ }
+
+ return list;
+ }
+
+
+ /**
+ * Delete row.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ *
+ * @throws Exception the exception
+ */
+ public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+ }
+
+ createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
+ }
+
+
+ public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
+ }
+
+ createMutator( ko, se ).addDeletion( key, columnFamily.toString() ).execute();
+ }
+
+
+ /**
+ * Delete row.
+ *
+ * @param keyspace the keyspace
+ * @param columnFamily the column family
+ * @param key the key
+ * @param timestamp the timestamp
+ *
+ * @throws Exception the exception
+ */
+ public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
+ throws Exception {
+
+ if ( db_logger.isDebugEnabled() ) {
+ db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
+ }
+
+ createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp ).execute();
+ }
+
+
+ /**
+ * Gets the id list.
+ *
+ * @param ko the keyspace
+ * @param key the key
+ * @param start the start
+ * @param finish the finish
+ * @param count the count
+ * @param reversed True if the scan should be reversed
+ * @param locator The index locator instance
+ * @param applicationId The applicationId
+ * @param collectionName The name of the collection to get the Ids for
+ *
+ * @return list of columns as UUIDs
+ *
+ * @throws Exception the exception
+ */
+ public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
+ IndexBucketLocator locator, UUID applicationId, String collectionName )
+ throws Exception {
+
+ if ( count <= 0 ) {
+ count = DEFAULT_COUNT;
+ }
+
+ if ( NULL_ID.equals( start ) ) {
+ start = null;
+ }
+
+ IndexScanner scanner =
+ new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
+ finish, reversed, count, collectionName );
+
+ return scanner;
+ }
+
+
+ public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
+
+
+ CountQuery<ByteBuffer, ByteBuffer> cq = HFactory.createCountQuery( ko, be, be );
+ cq.setColumnFamily( columnFamily.toString() );
+ cq.setKey( bytebuffer( key ) );
+ cq.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
+ QueryResult<Integer> r = cq.execute();
+ if ( r == null ) {
+ return 0;
+ }
+ return r.get();
+ }
+
+
+ /**
+ * Sets the id list.
+ *
+ * @param ko the keyspace
+ * @param targetId the target id
+ * @param keyPrefix the key prefix
+ * @param keySuffix the key suffix
+ * @param keyIds the key ids
+ *
+ * @throws Exception the exception
+ */
+ public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
+ throws Exception {
+ long timestamp = createTimestamp();
+ Mutator<ByteBuffer> batch = createMutator( ko, be );
+ batch = buildSetIdListMutator( batch, targetId, ENTITY_ID_SETS.toString(), keyPrefix, keySuffix, keyIds,
+ timestamp );
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+ }
+
+
+ boolean clusterUp = false;
+
+
+ public void startClusterHealthCheck() {
+
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleWithFixedDelay( new Runnable() {
+ @Override
+ public void run() {
+ if ( cluster != null ) {
+ HConnectionManager connectionManager = cluster.getConnectionManager();
+ if ( connectionManager != null ) {
+ clusterUp = !connectionManager.getHosts().isEmpty();
+ }
+ }
+ }
+ }, 1, 5, TimeUnit.SECONDS );
+ }
+
+ public void destroy() throws Exception {
+ if (cluster != null) {
+ HConnectionManager connectionManager = cluster.getConnectionManager();
+ if (connectionManager != null) {
+ connectionManager.shutdown();
+ }
+ }
+ cluster = null;
+ }
+}
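A minimal sketch of the service's read/write helpers (illustrative only; the method name, column family choice, and column values are assumptions, not part of this patch).

    // Sketch (assumes the usual Hector imports shown in the files of this commit).
    List<HColumn<String, ByteBuffer>> exampleReadAfterWrite( CassandraService cass, UUID appId, Object rowKey )
            throws Exception {
        // Resolve the virtual keyspace for the application, write one column, then read the row back.
        Keyspace ko = cass.getApplicationKeyspace( appId );
        cass.setColumn( ko, ApplicationCF.ENTITY_DICTIONARIES, rowKey, "colName", "colValue" );
        return cass.getAllColumns( ko, ApplicationCF.ENTITY_DICTIONARIES, rowKey );
    }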
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
new file mode 100644
index 0000000..2ebc199
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+
+
+public class ConnectedEntityRefImpl extends SimpleEntityRef implements ConnectedEntityRef {
+
+ final String connectionType;
+
+
+ public ConnectedEntityRefImpl() {
+ super( null, null );
+ connectionType = null;
+ }
+
+
+ public ConnectedEntityRefImpl( String connectionType, EntityRef connectedEntity ) {
+ super( connectedEntity.getType(), connectedEntity.getUuid() );
+ this.connectionType = connectionType;
+ }
+
+
+ public ConnectedEntityRefImpl( String connectionType, String entityType, UUID entityId ) {
+ super( entityType, entityId );
+ this.connectionType = connectionType;
+ }
+
+
+ @Override
+ public String getConnectionType() {
+ return connectionType;
+ }
+
+
+ public static String getConnectionType( ConnectedEntityRef connection ) {
+ if ( connection == null ) {
+ return null;
+ }
+ return connection.getConnectionType();
+ }
+}