You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/28 23:21:25 UTC
[15/96] [abbrv] [partial] Change package namespace to
org.apache.usergrid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
deleted file mode 100644
index 0cca4c5..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ApplicationCF.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.List;
-
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-
-import static me.prettyprint.hector.api.ddl.ComparatorType.COUNTERTYPE;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.getIndexMetadata;
-
-
-public enum ApplicationCF implements CFEnum {
-
- /** This is where the entity objects are stored */
- ENTITY_PROPERTIES( "Entity_Properties", "BytesType" ),
-
- /** each row models name:value pairs. {@see org.usergrid.persistence.Schema} for the list of dictionary types */
- ENTITY_DICTIONARIES( "Entity_Dictionaries", "BytesType" ),
-
- /**
- * Rows that are full of UUIDs. Used when we want to have a row full of references to other entities. Mainly, this
- * is for collections. Collections are represented by this CF.
- */
- ENTITY_ID_SETS( "Entity_Id_Sets", "UUIDType" ),
-
- /**
- * Typed vs. untyped dictionary. Dynamic entity dictionaries end up here. {@link
- * EntityManagerImpl#getDictionaryAsMap(org.usergrid.persistence.EntityRef, String)}
- */
- ENTITY_COMPOSITE_DICTIONARIES( "Entity_Composite_Dictionaries",
- "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
- "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
- "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
- "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
- /** No longer used? */
- ENTITY_METADATA( "Entity_Metadata", "BytesType" ),
-
- /** Contains all secondary indexes for entities */
- ENTITY_INDEX( "Entity_Index",
- "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
- "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
- "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
- "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
- /** Unique index for properties that must remain the same */
- ENTITY_UNIQUE( "Entity_Unique", "UUIDType" ),
-
- /** Contains all properties that have ever been indexed for an entity */
- ENTITY_INDEX_ENTRIES( "Entity_Index_Entries",
- "DynamicCompositeType(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType," +
- "t=>TimeUUIDType,s=>UTF8Type,u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
- "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
- "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))" ),
-
- /** All roles that exist within an application */
- APPLICATION_ROLES( "Application_Roles", "BytesType" ),
-
- /** Application counters */
- APPLICATION_AGGREGATE_COUNTERS( "Application_Aggregate_Counters", "LongType", COUNTERTYPE.getClassName() ),
-
- /** Entity counters */
- ENTITY_COUNTERS( "Entity_Counters", "BytesType", COUNTERTYPE.getClassName() ),;
- public final static String DEFAULT_DYNAMIC_COMPOSITE_ALIASES =
- "(a=>AsciiType,b=>BytesType,i=>IntegerType,x=>LexicalUUIDType,l=>LongType,t=>TimeUUIDType,s=>UTF8Type," +
- "u=>UUIDType,A=>AsciiType(reversed=true),B=>BytesType(reversed=true)," +
- "I=>IntegerType(reversed=true),X=>LexicalUUIDType(reversed=true),L=>LongType(reversed=true)," +
- "T=>TimeUUIDType(reversed=true),S=>UTF8Type(reversed=true),U=>UUIDType(reversed=true))";
-
- private final String cf;
- private final String comparator;
- private final String validator;
- private final String indexes;
- private final boolean create;
-
-
- ApplicationCF( String cf, String comparator ) {
- this.cf = cf;
- this.comparator = comparator;
- validator = null;
- indexes = null;
- create = true;
- }
-
-
- ApplicationCF( String cf, String comparator, String validator ) {
- this.cf = cf;
- this.comparator = comparator;
- this.validator = validator;
- indexes = null;
- create = true;
- }
-
-
- ApplicationCF( String cf, String comparator, String validator, String indexes ) {
- this.cf = cf;
- this.comparator = comparator;
- this.validator = validator;
- this.indexes = indexes;
- create = true;
- }
-
-
- @Override
- public String toString() {
- return cf;
- }
-
-
- @Override
- public String getColumnFamily() {
- return cf;
- }
-
-
- @Override
- public String getComparator() {
- return comparator;
- }
-
-
- @Override
- public String getValidator() {
- return validator;
- }
-
-
- @Override
- public boolean isComposite() {
- return comparator.startsWith( "DynamicCompositeType" );
- }
-
-
- @Override
- public List<ColumnDefinition> getMetadata() {
- return getIndexMetadata( indexes );
- }
-
-
- @Override
- public boolean create() {
- return create;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
deleted file mode 100644
index 859dd1a..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CFEnum.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.List;
-
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-
-
-public interface CFEnum {
-
- public String getColumnFamily();
-
- public String getComparator();
-
- public String getValidator();
-
- public boolean isComposite();
-
- public List<ColumnDefinition> getMetadata();
-
- public boolean create();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
deleted file mode 100644
index 00dc4bc..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraPersistenceUtils.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.codehaus.jackson.JsonNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.utils.JsonUtils;
-
-import org.apache.cassandra.thrift.ColumnDef;
-import org.apache.cassandra.thrift.IndexType;
-import org.apache.commons.lang.StringUtils;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.cassandra.service.ThriftColumnDef;
-import me.prettyprint.hector.api.ClockResolution;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.ddl.ColumnDefinition;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
-import me.prettyprint.hector.api.mutation.Mutator;
-
-import static java.nio.ByteBuffer.wrap;
-
-import static me.prettyprint.hector.api.factory.HFactory.createClockResolution;
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static org.apache.commons.beanutils.MethodUtils.invokeStaticMethod;
-import static org.apache.commons.lang.StringUtils.removeEnd;
-import static org.apache.commons.lang.StringUtils.removeStart;
-import static org.apache.commons.lang.StringUtils.split;
-import static org.apache.commons.lang.StringUtils.substringAfterLast;
-import static org.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.usergrid.persistence.Schema.serializeEntityProperty;
-import static org.usergrid.utils.ClassUtils.isBasicType;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.JsonUtils.toJsonNode;
-import static org.usergrid.utils.StringUtils.replaceAll;
-import static org.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
-
-
-/** @author edanuff */
-public class CassandraPersistenceUtils {
-
- private static final Logger logger = LoggerFactory.getLogger( CassandraPersistenceUtils.class );
-
- /** Logger for batch operations */
- private static final Logger batch_logger =
- LoggerFactory.getLogger( CassandraPersistenceUtils.class.getPackage().getName() + ".BATCH" );
-
- /**
- *
- */
- public static final ByteBuffer PROPERTY_TYPE_AS_BYTES = bytebuffer( PROPERTY_TYPE );
-
- /**
- *
- */
- public static final ByteBuffer PROPERTY_ID_AS_BYTES = bytebuffer( PROPERTY_UUID );
-
- /**
- *
- */
- public static final char KEY_DELIM = ':';
-
- /**
- *
- */
- public static final UUID NULL_ID = new UUID( 0, 0 );
-
- public static final StringSerializer se = new StringSerializer();
- public static final UUIDSerializer ue = new UUIDSerializer();
- public static final ByteBufferSerializer be = new ByteBufferSerializer();
-
-
- /**
- * @param operation
- * @param columnFamily
- * @param key
- * @param columnName
- * @param columnValue
- * @param timestamp
- */
- public static void logBatchOperation( String operation, Object columnFamily, Object key, Object columnName,
- Object columnValue, long timestamp ) {
-
- if ( batch_logger.isDebugEnabled() ) {
- batch_logger.debug( "{} cf={} key={} name={} value={}",
- new Object[] { operation, columnFamily, key, columnName, columnValue } );
- }
- }
-
-
- public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
- Object columnValue, long timestamp ) {
-
- logBatchOperation( "Insert", columnFamily, key, columnName, columnValue, timestamp );
-
- if ( columnName instanceof List<?> ) {
- columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
- }
- if ( columnValue instanceof List<?> ) {
- columnValue = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
- }
-
- HColumn<ByteBuffer, ByteBuffer> column =
- createColumn( bytebuffer( columnName ), bytebuffer( columnValue ), timestamp, be, be );
- m.addInsertion( bytebuffer( key ), columnFamily.toString(), column );
- }
-
-
- public static void addInsertToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Map<?, ?> columns,
- long timestamp ) throws Exception {
-
- for ( Entry<?, ?> entry : columns.entrySet() ) {
- addInsertToMutator( m, columnFamily, key, entry.getKey(), entry.getValue(), timestamp );
- }
- }
-
-
- public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType, String propertyName,
- Object propertyValue, long timestamp ) {
-
- logBatchOperation( "Insert", ApplicationCF.ENTITY_PROPERTIES, key, propertyName, propertyValue, timestamp );
-
- HColumn<ByteBuffer, ByteBuffer> column = createColumn( bytebuffer( propertyName ),
- serializeEntityProperty( entityType, propertyName, propertyValue ), timestamp, be, be );
- m.addInsertion( bytebuffer( key ), ApplicationCF.ENTITY_PROPERTIES.toString(), column );
- }
-
-
- public static void addPropertyToMutator( Mutator<ByteBuffer> m, Object key, String entityType,
- Map<String, ?> columns, long timestamp ) throws Exception {
-
- for ( Entry<String, ?> entry : columns.entrySet() ) {
- addPropertyToMutator( m, key, entityType, entry.getKey(), entry.getValue(), timestamp );
- }
- }
-
-
- /** Delete the row */
- public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp )
- throws Exception {
-
- logBatchOperation( "Delete", columnFamily, key, null, null, timestamp );
-
- m.addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp );
- }
-
-
- public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, Object columnName,
- long timestamp ) throws Exception {
-
- logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
-
- if ( columnName instanceof List<?> ) {
- columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
- }
-
- m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
- }
-
-
- public static void addDeleteToMutator( Mutator<ByteBuffer> m, Object columnFamily, Object key, long timestamp,
- Object... columnNames ) throws Exception {
-
- for ( Object columnName : columnNames ) {
- logBatchOperation( "Delete", columnFamily, key, columnName, null, timestamp );
-
- if ( columnName instanceof List<?> ) {
- columnName = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
- }
-
- m.addDeletion( bytebuffer( key ), columnFamily.toString(), bytebuffer( columnName ), be, timestamp );
- }
- }
-
-
- public static Map<String, ByteBuffer> getColumnMap( List<HColumn<String, ByteBuffer>> columns ) {
- Map<String, ByteBuffer> column_map = new TreeMap<String, ByteBuffer>( String.CASE_INSENSITIVE_ORDER );
- if ( columns != null ) {
- for ( HColumn<String, ByteBuffer> column : columns ) {
- String column_name = column.getName();
- column_map.put( column_name, column.getValue() );
- }
- }
- return column_map;
- }
-
-
- public static <K, V> Map<K, V> asMap( List<HColumn<K, V>> columns ) {
- if ( columns == null ) {
- return null;
- }
- Map<K, V> column_map = new LinkedHashMap<K, V>();
- for ( HColumn<K, V> column : columns ) {
- K column_name = column.getName();
- column_map.put( column_name, column.getValue() );
- }
- return column_map;
- }
-
-
- public static List<ByteBuffer> getAsByteKeys( List<UUID> ids ) {
- List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
- for ( UUID id : ids ) {
- keys.add( bytebuffer( key( id ) ) );
- }
- return keys;
- }
-
-
- /** @return timestamp value for current time */
- public static long createTimestamp() {
- return createClockResolution( ClockResolution.MICROSECONDS ).createClock();
- }
-
-
- /** @return normalized group path */
- public static String normalizeGroupPath( String path ) {
- path = replaceAll( path.toLowerCase().trim(), "//", "/" );
- path = removeStart( path, "/" );
- path = removeEnd( path, "/" );
- return path;
- }
-
-
- /** @return a composite key */
- public static Object key( Object... objects ) {
- if ( objects.length == 1 ) {
- Object obj = objects[0];
- if ( ( obj instanceof UUID ) || ( obj instanceof ByteBuffer ) ) {
- return obj;
- }
- }
- StringBuilder s = new StringBuilder();
- for ( Object obj : objects ) {
- if ( obj instanceof String ) {
- s.append( ( ( String ) obj ).toLowerCase() );
- }
- else if ( obj instanceof List<?> ) {
- s.append( key( ( ( List<?> ) obj ).toArray() ) );
- }
- else if ( obj instanceof Object[] ) {
- s.append( key( ( Object[] ) obj ) );
- }
- else if ( obj != null ) {
- s.append( obj );
- }
- else {
- s.append( "*" );
- }
-
- s.append( KEY_DELIM );
- }
-
- s.deleteCharAt( s.length() - 1 );
-
- return s.toString();
- }
-
-
- /** @return UUID for composite key */
- public static UUID keyID( Object... objects ) {
- if ( objects.length == 1 ) {
- Object obj = objects[0];
- if ( obj instanceof UUID ) {
- return ( UUID ) obj;
- }
- }
- String keyStr = key( objects ).toString();
- if ( keyStr.length() == 0 ) {
- return NULL_ID;
- }
- UUID uuid = UUID.nameUUIDFromBytes( keyStr.getBytes() );
- logger.debug( "Key {} equals UUID {}", keyStr, uuid );
- return uuid;
- }
-
-
- /** @return UUID for entity alias */
- public static UUID aliasID( UUID ownerId, String aliasType, String alias ) {
- return keyID( ownerId, aliasType, alias );
- }
-
-
- public static Mutator<ByteBuffer> buildSetIdListMutator( Mutator<ByteBuffer> batch, UUID targetId,
- String columnFamily, String keyPrefix, String keySuffix,
- List<UUID> keyIds, long timestamp ) throws Exception {
- for ( UUID keyId : keyIds ) {
- ByteBuffer key = null;
- if ( ( StringUtils.isNotEmpty( keyPrefix ) ) || ( StringUtils.isNotEmpty( keySuffix ) ) ) {
- key = bytebuffer( keyPrefix + keyId.toString() + keySuffix );
- }
- else {
- key = bytebuffer( keyId );
- }
- addInsertToMutator( batch, columnFamily, key, targetId, ByteBuffer.allocate( 0 ), timestamp );
- }
- return batch;
- }
-
-
- public static MutationResult batchExecute( Mutator<?> m, int retries ) {
- for ( int i = 0; i < retries; i++ ) {
- try {
- return m.execute();
- }
- catch ( Exception e ) {
- logger.error( "Unable to execute mutation, retrying...", e );
- }
- }
- return m.execute();
- }
-
-
- public static Object toStorableValue( Object obj ) {
- if ( obj == null ) {
- return null;
- }
-
- if ( isBasicType( obj.getClass() ) ) {
- return obj;
- }
-
- if ( obj instanceof ByteBuffer ) {
- return obj;
- }
-
- JsonNode json = toJsonNode( obj );
- if ( ( json != null ) && json.isValueNode() ) {
- if ( json.isBigInteger() ) {
- return json.getBigIntegerValue();
- }
- else if ( json.isNumber() || json.isBoolean() ) {
- return BigInteger.valueOf( json.getValueAsLong() );
- }
- else if ( json.isTextual() ) {
- return json.getTextValue();
- }
- else if ( json.isBinary() ) {
- try {
- return wrap( json.getBinaryValue() );
- }
- catch ( IOException e ) {
- }
- }
- }
-
- return json;
- }
-
-
- public static ByteBuffer toStorableBinaryValue( Object obj ) {
- obj = toStorableValue( obj );
- if ( obj instanceof JsonNode ) {
- return JsonUtils.toByteBuffer( obj );
- }
- else {
- return bytebuffer( obj );
- }
- }
-
-
- public static ByteBuffer toStorableBinaryValue( Object obj, boolean forceJson ) {
- obj = toStorableValue( obj );
- if ( ( obj instanceof JsonNode ) || ( forceJson && ( obj != null ) && !( obj instanceof ByteBuffer ) ) ) {
- return JsonUtils.toByteBuffer( obj );
- }
- else {
- return bytebuffer( obj );
- }
- }
-
-
- public static List<ColumnDefinition> getIndexMetadata( String indexes ) {
- if ( indexes == null ) {
- return null;
- }
- String[] index_entries = split( indexes, ',' );
- List<ColumnDef> columns = new ArrayList<ColumnDef>();
- for ( String index_entry : index_entries ) {
- String column_name = stringOrSubstringBeforeFirst( index_entry, ':' ).trim();
- String comparer = substringAfterLast( index_entry, ":" ).trim();
- if ( StringUtils.isBlank( comparer ) ) {
- comparer = "UUIDType";
- }
- if ( StringUtils.isNotBlank( column_name ) ) {
- ColumnDef cd = new ColumnDef( bytebuffer( column_name ), comparer );
- cd.setIndex_name( column_name );
- cd.setIndex_type( IndexType.KEYS );
- columns.add( cd );
- }
- }
- return ThriftColumnDef.fromThriftList( columns );
- }
-
-
- public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum, String keyspace ) {
- return getCfDefs( cfEnum, null, keyspace );
- }
-
-
- public static List<ColumnFamilyDefinition> getCfDefs( Class<? extends CFEnum> cfEnum,
- List<ColumnFamilyDefinition> cf_defs, String keyspace ) {
-
- if ( cf_defs == null ) {
- cf_defs = new ArrayList<ColumnFamilyDefinition>();
- }
-
- CFEnum[] values = null;
- try {
- values = ( CFEnum[] ) invokeStaticMethod( cfEnum, "values", ( Object[] ) null );
- }
- catch ( Exception e ) {
- logger.error( "Couldn't get CFEnum values", e );
- }
- if ( values == null ) {
- return null;
- }
-
- for ( CFEnum cf : values ) {
- if ( !cf.create() ) {
- continue;
- }
- String defaultValidationClass = cf.getValidator();
- List<ColumnDefinition> metadata = cf.getMetadata();
-
- ColumnFamilyDefinition cf_def = HFactory.createColumnFamilyDefinition( keyspace, cf.getColumnFamily(),
- ComparatorType.getByClassName( cf.getComparator() ), metadata );
-
- if ( defaultValidationClass != null ) {
- cf_def.setDefaultValidationClass( defaultValidationClass );
- }
-
- cf_defs.add( cf_def );
- }
-
- return cf_defs;
- }
-
-
- public static void validateKeyspace( CFEnum[] cf_enums, KeyspaceDefinition ksDef ) {
- Map<String, ColumnFamilyDefinition> cfs = new HashMap<String, ColumnFamilyDefinition>();
- for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
- cfs.put( cf.getName(), cf );
- }
- for ( CFEnum c : cf_enums ) {
- if ( !cfs.keySet().contains( c.getColumnFamily() ) ) {
-
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
deleted file mode 100644
index a6b6268..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/CassandraService.java
+++ /dev/null
@@ -1,1135 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.usergrid.locking.LockManager;
-import org.usergrid.persistence.IndexBucketLocator;
-import org.usergrid.persistence.IndexBucketLocator.IndexType;
-import org.usergrid.persistence.cassandra.index.IndexBucketScanner;
-import org.usergrid.persistence.cassandra.index.IndexScanner;
-
-import me.prettyprint.cassandra.connection.HConnectionManager;
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.BytesArraySerializer;
-import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
-import me.prettyprint.cassandra.serializers.LongSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.serializers.UUIDSerializer;
-import me.prettyprint.cassandra.service.CassandraHostConfigurator;
-import me.prettyprint.cassandra.service.ThriftKsDef;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.ConsistencyLevelPolicy;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.Serializer;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.OrderedRows;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.Rows;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.ColumnQuery;
-import me.prettyprint.hector.api.query.CountQuery;
-import me.prettyprint.hector.api.query.MultigetSliceQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.RangeSlicesQuery;
-import me.prettyprint.hector.api.query.SliceQuery;
-
-import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
-import static org.apache.commons.collections.MapUtils.getIntValue;
-import static org.apache.commons.collections.MapUtils.getString;
-import static org.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.usergrid.persistence.cassandra.CassandraPersistenceUtils.buildSetIdListMutator;
-import static org.usergrid.utils.ConversionUtils.bytebuffer;
-import static org.usergrid.utils.ConversionUtils.bytebuffers;
-import static org.usergrid.utils.JsonUtils.mapToFormattedJsonString;
-import static org.usergrid.utils.MapUtils.asMap;
-import static org.usergrid.utils.MapUtils.filter;
-
-
-public class CassandraService {
-
- public static String SYSTEM_KEYSPACE = "Usergrid";
-
- public static String STATIC_APPLICATION_KEYSPACE = "Usergrid_Applications";
-
- public static final boolean USE_VIRTUAL_KEYSPACES = true;
-
- public static final String APPLICATIONS_CF = "Applications";
- public static final String PROPERTIES_CF = "Properties";
- public static final String TOKENS_CF = "Tokens";
- public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
-
- public static final int DEFAULT_COUNT = 1000;
- public static final int ALL_COUNT = 100000;
- public static final int INDEX_ENTRY_LIST_COUNT = 1000;
- public static final int DEFAULT_SEARCH_COUNT = 10000;
-
- public static final int RETRY_COUNT = 5;
-
- public static final String DEFAULT_APPLICATION = "default-app";
- public static final String DEFAULT_ORGANIZATION = "usergrid";
- public static final String MANAGEMENT_APPLICATION = "management";
-
- public static final UUID MANAGEMENT_APPLICATION_ID = new UUID( 0, 1 );
- public static final UUID DEFAULT_APPLICATION_ID = new UUID( 0, 16 );
-
- private static final Logger logger = LoggerFactory.getLogger( CassandraService.class );
-
- private static final Logger db_logger =
- LoggerFactory.getLogger( CassandraService.class.getPackage().getName() + ".DB" );
-
- Cluster cluster;
- CassandraHostConfigurator chc;
- Properties properties;
- LockManager lockManager;
-
- ConsistencyLevelPolicy consistencyLevelPolicy;
-
- private Keyspace systemKeyspace;
-
- private Map<String, String> accessMap;
-
- public static final StringSerializer se = new StringSerializer();
- public static final ByteBufferSerializer be = new ByteBufferSerializer();
- public static final UUIDSerializer ue = new UUIDSerializer();
- public static final BytesArraySerializer bae = new BytesArraySerializer();
- public static final DynamicCompositeSerializer dce = new DynamicCompositeSerializer();
- public static final LongSerializer le = new LongSerializer();
-
- public static final UUID NULL_ID = new UUID( 0, 0 );
-
-
- public CassandraService( Properties properties, Cluster cluster,
- CassandraHostConfigurator cassandraHostConfigurator, LockManager lockManager ) {
- this.properties = properties;
- this.cluster = cluster;
- chc = cassandraHostConfigurator;
- this.lockManager = lockManager;
- db_logger.info( "" + cluster.getKnownPoolHosts( false ) );
- }
-
-
- public void init() throws Exception {
- if ( consistencyLevelPolicy == null ) {
- consistencyLevelPolicy = new ConfigurableConsistencyLevel();
- ( ( ConfigurableConsistencyLevel ) consistencyLevelPolicy )
- .setDefaultReadConsistencyLevel( HConsistencyLevel.ONE );
- }
- accessMap = new HashMap<String, String>( 2 );
- accessMap.put( "username", properties.getProperty( "cassandra.username" ) );
- accessMap.put( "password", properties.getProperty( "cassandra.password" ) );
- systemKeyspace =
- HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
- accessMap );
- }
-
-
- public Cluster getCluster() {
- return cluster;
- }
-
-
- public void setCluster( Cluster cluster ) {
- this.cluster = cluster;
- }
-
-
- public CassandraHostConfigurator getCassandraHostConfigurator() {
- return chc;
- }
-
-
- public void setCassandraHostConfigurator( CassandraHostConfigurator chc ) {
- this.chc = chc;
- }
-
-
- public Properties getProperties() {
- return properties;
- }
-
-
- public void setProperties( Properties properties ) {
- this.properties = properties;
- }
-
-
- public Map<String, String> getPropertiesMap() {
- if ( properties != null ) {
- return asMap( properties );
- }
- return null;
- }
-
-
- public LockManager getLockManager() {
- return lockManager;
- }
-
-
- public void setLockManager( LockManager lockManager ) {
- this.lockManager = lockManager;
- }
-
-
- public ConsistencyLevelPolicy getConsistencyLevelPolicy() {
- return consistencyLevelPolicy;
- }
-
-
- public void setConsistencyLevelPolicy( ConsistencyLevelPolicy consistencyLevelPolicy ) {
- this.consistencyLevelPolicy = consistencyLevelPolicy;
- }
-
-
- /** @return keyspace for application UUID */
- public static String keyspaceForApplication( UUID applicationId ) {
- if ( USE_VIRTUAL_KEYSPACES ) {
- return STATIC_APPLICATION_KEYSPACE;
- }
- else {
- return "Application_" + applicationId.toString().replace( '-', '_' );
- }
- }
-
-
- public static UUID prefixForApplication( UUID applicationId ) {
- if ( USE_VIRTUAL_KEYSPACES ) {
- return applicationId;
- }
- else {
- return null;
- }
- }
-
-
- public Keyspace getKeyspace( String keyspace, UUID prefix ) {
- Keyspace ko = null;
- if ( USE_VIRTUAL_KEYSPACES && ( prefix != null ) ) {
- ko = createVirtualKeyspace( keyspace, prefix, ue, cluster, consistencyLevelPolicy,
- ON_FAIL_TRY_ALL_AVAILABLE, accessMap );
- }
- else {
- ko = HFactory.createKeyspace( keyspace, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
- accessMap );
- }
- return ko;
- }
-
-
- public Keyspace getApplicationKeyspace( UUID applicationId ) {
- assert applicationId != null;
- Keyspace ko = getKeyspace( keyspaceForApplication( applicationId ), prefixForApplication( applicationId ) );
- return ko;
- }
-
-
- /** The Usergrid_Applications keyspace directly */
- public Keyspace getUsergridApplicationKeyspace() {
- return getKeyspace( STATIC_APPLICATION_KEYSPACE, null );
- }
-
-
- public Keyspace getSystemKeyspace() {
- return systemKeyspace;
- }
-
-
- public boolean checkKeyspacesExist() {
- boolean exists = false;
- try {
- exists = cluster.describeKeyspace( SYSTEM_KEYSPACE ) != null
- && cluster.describeKeyspace( STATIC_APPLICATION_KEYSPACE ) != null;
- }
- catch ( Exception ex ) {
- logger.error( "could not describe keyspaces", ex );
- }
- return exists;
- }
-
-
- /**
- * Lazy creates a column family in the keyspace. If it doesn't exist, it will be created, then the call will sleep
- * until all nodes have acknowledged the schema change
- */
- public void createColumnFamily( String keyspace, ColumnFamilyDefinition cfDef ) {
-
- if ( !keySpaceExists( keyspace ) ) {
- createKeySpace( keyspace );
- }
-
-
- //add the cf
-
- if ( !cfExists( keyspace, cfDef.getName() ) ) {
-
- //default read repair chance to 0.1
- cfDef.setReadRepairChance( 0.1d );
-
- cluster.addColumnFamily( cfDef, true );
- logger.info( "Created column family {} in keyspace {}", cfDef.getName(), keyspace );
- }
- }
-
-
- /** Create the column families in the list */
- public void createColumnFamilies( String keyspace, List<ColumnFamilyDefinition> cfDefs ) {
- for ( ColumnFamilyDefinition cfDef : cfDefs ) {
- createColumnFamily( keyspace, cfDef );
- }
- }
-
-
- /** Check if the keyspace exsts */
- public boolean keySpaceExists( String keyspace ) {
- KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
-
- return ksDef != null;
- }
-
-
- /** Create the keyspace */
- private void createKeySpace( String keyspace ) {
- logger.info( "Creating keyspace: {}", keyspace );
-
- String strategy_class =
- getString( properties, "cassandra.keyspace.strategy", "org.apache.cassandra.locator.SimpleStrategy" );
- logger.info( "Using strategy: {}", strategy_class );
-
- int replication_factor = getIntValue( properties, "cassandra.keyspace.replication", 1 );
- logger.info( "Using replication (may be overriden by strategy options): {}", replication_factor );
-
- // try {
- ThriftKsDef ks_def = ( ThriftKsDef ) HFactory
- .createKeyspaceDefinition( keyspace, strategy_class, replication_factor,
- new ArrayList<ColumnFamilyDefinition>() );
-
- @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, String> strategy_options =
- filter( ( Map ) properties, "cassandra.keyspace.strategy.options.", true );
- if ( strategy_options.size() > 0 ) {
- logger.info( "Strategy options: {}", mapToFormattedJsonString( strategy_options ) );
- ks_def.setStrategyOptions( strategy_options );
- }
-
- cluster.addKeyspace( ks_def );
-
- waitForCreation( keyspace );
-
- logger.info( "Created keyspace {}", keyspace );
- }
-
-
- /** Wait until all nodes agree on the same schema version */
- private void waitForCreation( String keyspace ) {
-
- while ( true ) {
- Map<String, List<String>> versions = cluster.describeSchemaVersions();
- // only 1 version, return
- if ( versions != null && versions.size() == 1 ) {
- return;
- }
- // sleep and try again
- try {
- Thread.sleep( 100 );
- }
- catch ( InterruptedException e ) {
- }
- }
- }
-
-
- /** Return true if the column family exists */
- public boolean cfExists( String keyspace, String cfName ) {
- KeyspaceDefinition ksDef = cluster.describeKeyspace( keyspace );
-
- if ( ksDef == null ) {
- return false;
- }
-
- for ( ColumnFamilyDefinition cf : ksDef.getCfDefs() ) {
- if ( cfName.equals( cf.getName() ) ) {
- return true;
- }
- }
-
- return false;
- }
-
-
- /**
- * Gets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- *
- * @return columns
- *
- * @throws Exception the exception
- */
- public <N, V> List<HColumn<N, V>> getAllColumns( Keyspace ko, Object columnFamily, Object key,
- Serializer<N> nameSerializer, Serializer<V> valueSerializer )
- throws Exception {
-
- if ( db_logger.isInfoEnabled() ) {
- db_logger.info( "getColumns cf={} key={}", columnFamily, key );
- }
-
- SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
- q.setColumnFamily( columnFamily.toString() );
- q.setKey( bytebuffer( key ) );
- q.setRange( null, null, false, ALL_COUNT );
- QueryResult<ColumnSlice<N, V>> r = q.execute();
- ColumnSlice<N, V> slice = r.get();
- List<HColumn<N, V>> results = slice.getColumns();
-
- if ( db_logger.isInfoEnabled() ) {
- if ( results == null ) {
- db_logger.info( "getColumns returned null" );
- }
- else {
- db_logger.info( "getColumns returned {} columns", results.size() );
- }
- }
-
- return results;
- }
-
-
- public List<HColumn<String, ByteBuffer>> getAllColumns( Keyspace ko, Object columnFamily, Object key )
- throws Exception {
- return getAllColumns( ko, columnFamily, key, se, be );
- }
-
-
- public Set<String> getAllColumnNames( Keyspace ko, Object columnFamily, Object key ) throws Exception {
- List<HColumn<String, ByteBuffer>> columns = getAllColumns( ko, columnFamily, key );
- Set<String> set = new LinkedHashSet<String>();
- for ( HColumn<String, ByteBuffer> column : columns ) {
- set.add( column.getName() );
- }
- return set;
- }
-
-
- /**
- * Gets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param start the start
- * @param finish the finish
- * @param count the count
- * @param reversed the reversed
- *
- * @return columns
- *
- * @throws Exception the exception
- */
- public List<HColumn<ByteBuffer, ByteBuffer>> getColumns( Keyspace ko, Object columnFamily, Object key, Object start,
- Object finish, int count, boolean reversed )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " start=" + start + " finish=" + finish
- + " count=" + count + " reversed=" + reversed );
- }
-
- SliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createSliceQuery( ko, be, be, be );
- q.setColumnFamily( columnFamily.toString() );
- q.setKey( bytebuffer( key ) );
-
- ByteBuffer start_bytes = null;
- if ( start instanceof DynamicComposite ) {
- start_bytes = ( ( DynamicComposite ) start ).serialize();
- }
- else if ( start instanceof List ) {
- start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
- }
- else {
- start_bytes = bytebuffer( start );
- }
-
- ByteBuffer finish_bytes = null;
- if ( finish instanceof DynamicComposite ) {
- finish_bytes = ( ( DynamicComposite ) finish ).serialize();
- }
- else if ( finish instanceof List ) {
- finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
- }
- else {
- finish_bytes = bytebuffer( finish );
- }
-
- /*
- * if (reversed) { q.setRange(finish_bytes, start_bytes, reversed, count); }
- * else { q.setRange(start_bytes, finish_bytes, reversed, count); }
- */
- q.setRange( start_bytes, finish_bytes, reversed, count );
- QueryResult<ColumnSlice<ByteBuffer, ByteBuffer>> r = q.execute();
- ColumnSlice<ByteBuffer, ByteBuffer> slice = r.get();
- List<HColumn<ByteBuffer, ByteBuffer>> results = slice.getColumns();
-
- if ( db_logger.isDebugEnabled() ) {
- if ( results == null ) {
- db_logger.debug( "getColumns returned null" );
- }
- else {
- db_logger.debug( "getColumns returned " + results.size() + " columns" );
- }
- }
-
- return results;
- }
-
-
- public Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> multiGetColumns( Keyspace ko, Object columnFamily,
- List<?> keys, Object start,
- Object finish, int count,
- boolean reversed ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "multiGetColumns cf=" + columnFamily + " keys=" + keys + " start=" + start + " finish="
- + finish + " count=" + count + " reversed=" + reversed );
- }
-
- MultigetSliceQuery<ByteBuffer, ByteBuffer, ByteBuffer> q = createMultigetSliceQuery( ko, be, be, be );
- q.setColumnFamily( columnFamily.toString() );
- q.setKeys( bytebuffers( keys ) );
-
- ByteBuffer start_bytes = null;
- if ( start instanceof DynamicComposite ) {
- start_bytes = ( ( DynamicComposite ) start ).serialize();
- }
- else if ( start instanceof List ) {
- start_bytes = DynamicComposite.toByteBuffer( ( List<?> ) start );
- }
- else {
- start_bytes = bytebuffer( start );
- }
-
- ByteBuffer finish_bytes = null;
- if ( finish instanceof DynamicComposite ) {
- finish_bytes = ( ( DynamicComposite ) finish ).serialize();
- }
- else if ( finish instanceof List ) {
- finish_bytes = DynamicComposite.toByteBuffer( ( List<?> ) finish );
- }
- else {
- finish_bytes = bytebuffer( finish );
- }
-
- q.setRange( start_bytes, finish_bytes, reversed, count );
- QueryResult<Rows<ByteBuffer, ByteBuffer, ByteBuffer>> r = q.execute();
- Rows<ByteBuffer, ByteBuffer, ByteBuffer> rows = r.get();
-
- Map<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>> results =
- new LinkedHashMap<ByteBuffer, List<HColumn<ByteBuffer, ByteBuffer>>>();
- for ( Row<ByteBuffer, ByteBuffer, ByteBuffer> row : rows ) {
- results.put( row.getKey(), row.getColumnSlice().getColumns() );
- }
-
- return results;
- }
-
-
- /**
- * Gets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param keys the keys
- *
- * @return map of keys to columns
- *
- * @throws Exception the exception
- */
- public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
- Serializer<K> keySerializer, Serializer<N> nameSerializer,
- Serializer<V> valueSerializer ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys );
- }
-
- MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
- q.setColumnFamily( columnFamily.toString() );
- q.setKeys( keys );
- q.setRange( null, null, false, ALL_COUNT );
- QueryResult<Rows<K, N, V>> r = q.execute();
- Rows<K, N, V> results = r.get();
-
- if ( db_logger.isInfoEnabled() ) {
- if ( results == null ) {
- db_logger.info( "getColumns returned null" );
- }
- else {
- db_logger.info( "getColumns returned " + results.getCount() + " columns" );
- }
- }
-
- return results;
- }
-
-
- /**
- * Gets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param columnNames the column names
- *
- * @return columns
- *
- * @throws Exception the exception
- */
- @SuppressWarnings("unchecked")
- public <N, V> List<HColumn<N, V>> getColumns( Keyspace ko, Object columnFamily, Object key, Set<String> columnNames,
- Serializer<N> nameSerializer, Serializer<V> valueSerializer )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumns cf=" + columnFamily + " key=" + key + " names=" + columnNames );
- }
-
- SliceQuery<ByteBuffer, N, V> q = createSliceQuery( ko, be, nameSerializer, valueSerializer );
- q.setColumnFamily( columnFamily.toString() );
- q.setKey( bytebuffer( key ) );
- // q.setColumnNames(columnNames.toArray(new String[0]));
- q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
- .toArray() );
-
- QueryResult<ColumnSlice<N, V>> r = q.execute();
- ColumnSlice<N, V> slice = r.get();
- List<HColumn<N, V>> results = slice.getColumns();
-
- if ( db_logger.isInfoEnabled() ) {
- if ( results == null ) {
- db_logger.info( "getColumns returned null" );
- }
- else {
- db_logger.info( "getColumns returned " + results.size() + " columns" );
- }
- }
-
- return results;
- }
-
-
- /**
- * Gets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param keys the keys
- * @param columnNames the column names
- *
- * @return map of keys to columns
- *
- * @throws Exception the exception
- */
- @SuppressWarnings("unchecked")
- public <K, N, V> Rows<K, N, V> getRows( Keyspace ko, Object columnFamily, Collection<K> keys,
- Collection<String> columnNames, Serializer<K> keySerializer,
- Serializer<N> nameSerializer, Serializer<V> valueSerializer )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumns cf=" + columnFamily + " keys=" + keys + " names=" + columnNames );
- }
-
- MultigetSliceQuery<K, N, V> q = createMultigetSliceQuery( ko, keySerializer, nameSerializer, valueSerializer );
- q.setColumnFamily( columnFamily.toString() );
- q.setKeys( keys );
- q.setColumnNames( ( N[] ) nameSerializer.fromBytesSet( se.toBytesSet( new ArrayList<String>( columnNames ) ) )
- .toArray() );
- QueryResult<Rows<K, N, V>> r = q.execute();
- Rows<K, N, V> results = r.get();
-
- if ( db_logger.isInfoEnabled() ) {
- if ( results == null ) {
- db_logger.info( "getColumns returned null" );
- }
- else {
- db_logger.info( "getColumns returned " + results.getCount() + " columns" );
- }
- }
-
- return results;
- }
-
-
- /**
- * Gets the column.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param column the column
- *
- * @return column
- *
- * @throws Exception the exception
- */
- public <N, V> HColumn<N, V> getColumn( Keyspace ko, Object columnFamily, Object key, N column,
- Serializer<N> nameSerializer, Serializer<V> valueSerializer )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + column );
- }
-
- /*
- * ByteBuffer column_bytes = null; if (column instanceof List) {
- * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
- * { column_bytes = bytebuffer(column); }
- */
-
- ColumnQuery<ByteBuffer, N, V> q = HFactory.createColumnQuery( ko, be, nameSerializer, valueSerializer );
- QueryResult<HColumn<N, V>> r =
- q.setKey( bytebuffer( key ) ).setName( column ).setColumnFamily( columnFamily.toString() ).execute();
- HColumn<N, V> result = r.get();
-
- if ( db_logger.isInfoEnabled() ) {
- if ( result == null ) {
- db_logger.info( "getColumn returned null" );
- }
- }
-
- return result;
- }
-
-
- public <N, V> ColumnSlice<N, V> getColumns( Keyspace ko, Object columnFamily, Object key, N[] columns,
- Serializer<N> nameSerializer, Serializer<V> valueSerializer )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getColumn cf=" + columnFamily + " key=" + key + " column=" + columns );
- }
-
- /*
- * ByteBuffer column_bytes = null; if (column instanceof List) {
- * column_bytes = Composite.serializeToByteBuffer((List<?>) column); } else
- * { column_bytes = bytebuffer(column); }
- */
-
- SliceQuery<ByteBuffer, N, V> q = HFactory.createSliceQuery( ko, be, nameSerializer, valueSerializer );
- QueryResult<ColumnSlice<N, V>> r =
- q.setKey( bytebuffer( key ) ).setColumnNames( columns ).setColumnFamily( columnFamily.toString() )
- .execute();
- ColumnSlice<N, V> result = r.get();
-
- if ( db_logger.isDebugEnabled() ) {
- if ( result == null ) {
- db_logger.debug( "getColumn returned null" );
- }
- }
-
- return result;
- }
-
-
- public HColumn<String, ByteBuffer> getColumn( Keyspace ko, Object columnFamily, Object key, String column )
- throws Exception {
- return getColumn( ko, columnFamily, key, column, se, be );
- }
-
-
- public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue )
- throws Exception {
- this.setColumn( ko, columnFamily, key, columnName, columnValue, 0 );
- }
-
-
- public void setColumn( Keyspace ko, Object columnFamily, Object key, Object columnName, Object columnValue,
- int ttl ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "setColumn cf=" + columnFamily + " key=" + key + " name=" + columnName + " value="
- + columnValue );
- }
-
- ByteBuffer name_bytes = null;
- if ( columnName instanceof List ) {
- name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnName );
- }
- else {
- name_bytes = bytebuffer( columnName );
- }
-
- ByteBuffer value_bytes = null;
- if ( columnValue instanceof List ) {
- value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) columnValue );
- }
- else {
- value_bytes = bytebuffer( columnValue );
- }
-
- HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, be, be );
- if ( ttl != 0 ) {
- col.setTtl( ttl );
- }
- Mutator<ByteBuffer> m = createMutator( ko, be );
- m.insert( bytebuffer( key ), columnFamily.toString(), col );
- }
-
-
- /**
- * Sets the columns.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param map the map
- *
- * @throws Exception the exception
- */
- public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map ) throws Exception {
- this.setColumns( ko, columnFamily, key, map, 0 );
- }
-
-
- public void setColumns( Keyspace ko, Object columnFamily, byte[] key, Map<?, ?> map, int ttl ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "setColumns cf=" + columnFamily + " key=" + key + " map=" + map + ( ttl != 0 ?
- " ttl=" + ttl : "" ) );
- }
-
- Mutator<ByteBuffer> m = createMutator( ko, be );
- long timestamp = createTimestamp();
-
- for ( Object name : map.keySet() ) {
- Object value = map.get( name );
- if ( value != null ) {
-
- ByteBuffer name_bytes = null;
- if ( name instanceof List ) {
- name_bytes = DynamicComposite.toByteBuffer( ( List<?> ) name );
- }
- else {
- name_bytes = bytebuffer( name );
- }
-
- ByteBuffer value_bytes = null;
- if ( value instanceof List ) {
- value_bytes = DynamicComposite.toByteBuffer( ( List<?> ) value );
- }
- else {
- value_bytes = bytebuffer( value );
- }
-
- HColumn<ByteBuffer, ByteBuffer> col = createColumn( name_bytes, value_bytes, timestamp, be, be );
- if ( ttl != 0 ) {
- col.setTtl( ttl );
- }
- m.addInsertion( bytebuffer( key ), columnFamily.toString(),
- createColumn( name_bytes, value_bytes, timestamp, be, be ) );
- }
- }
- batchExecute( m, CassandraService.RETRY_COUNT );
- }
-
-
- /**
- * Create a timestamp based on the TimeResolution set to the cluster.
- *
- * @return a timestamp
- */
- public long createTimestamp() {
- return chc.getClockResolution().createClock();
- }
-
-
- /**
- * Delete column.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param column the column
- *
- * @throws Exception the exception
- */
- public void deleteColumn( Keyspace ko, Object columnFamily, Object key, Object column ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
- }
-
- Mutator<ByteBuffer> m = createMutator( ko, be );
- m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
- }
-
-
- /**
- * Gets the row keys.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- *
- * @return set of keys
- *
- * @throws Exception the exception
- */
- public <K> Set<K> getRowKeySet( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "getRowKeys cf=" + columnFamily );
- }
-
- RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
- q.setColumnFamily( columnFamily.toString() );
- q.setKeys( null, null );
- q.setColumnNames( new ByteBuffer[0] );
- QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
- OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
-
- Set<K> results = new LinkedHashSet<K>();
- for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
- results.add( row.getKey() );
- }
-
- if ( db_logger.isDebugEnabled() ) {
- {
- db_logger.debug( "getRowKeys returned " + results.size() + " rows" );
- }
- }
-
- return results;
- }
-
-
- /**
- * Gets the row keys as uui ds.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- *
- * @return list of row key UUIDs
- *
- * @throws Exception the exception
- */
- public <K> List<K> getRowKeyList( Keyspace ko, Object columnFamily, Serializer<K> keySerializer ) throws Exception {
-
- RangeSlicesQuery<K, ByteBuffer, ByteBuffer> q = createRangeSlicesQuery( ko, keySerializer, be, be );
- q.setColumnFamily( columnFamily.toString() );
- q.setKeys( null, null );
- q.setColumnNames( new ByteBuffer[0] );
- QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> r = q.execute();
- OrderedRows<K, ByteBuffer, ByteBuffer> rows = r.get();
-
- List<K> list = new ArrayList<K>();
- for ( Row<K, ByteBuffer, ByteBuffer> row : rows ) {
- list.add( row.getKey() );
- // K uuid = row.getKey();
- // if (uuid != UUIDUtils.ZERO_UUID) {
- // list.add(uuid);
- // }
- }
-
- return list;
- }
-
-
- /**
- * Delete row.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- *
- * @throws Exception the exception
- */
- public void deleteRow( Keyspace ko, final Object columnFamily, final Object key ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
- }
-
- createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
- }
-
-
- public void deleteRow( Keyspace ko, final Object columnFamily, final String key ) throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
- }
-
- createMutator( ko, se ).addDeletion( key, columnFamily.toString() ).execute();
- }
-
-
- /**
- * Delete row.
- *
- * @param keyspace the keyspace
- * @param columnFamily the column family
- * @param key the key
- * @param timestamp the timestamp
- *
- * @throws Exception the exception
- */
- public void deleteRow( Keyspace ko, final Object columnFamily, final Object key, final long timestamp )
- throws Exception {
-
- if ( db_logger.isDebugEnabled() ) {
- db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key + " timestamp=" + timestamp );
- }
-
- createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString(), timestamp ).execute();
- }
-
-
- /**
- * Gets the id list.
- *
- * @param ko the keyspace
- * @param key the key
- * @param start the start
- * @param finish the finish
- * @param count the count
- * @param reversed True if the scan should be reversed
- * @param locator The index locator instance
- * @param applicationId The applicationId
- * @param collectionName The name of the collection to get the Ids for
- *
- * @return list of columns as UUIDs
- *
- * @throws Exception the exception
- */
- public IndexScanner getIdList( Keyspace ko, Object key, UUID start, UUID finish, int count, boolean reversed,
- IndexBucketLocator locator, UUID applicationId, String collectionName )
- throws Exception {
-
- if ( count <= 0 ) {
- count = DEFAULT_COUNT;
- }
-
- if ( NULL_ID.equals( start ) ) {
- start = null;
- }
-
- IndexScanner scanner =
- new IndexBucketScanner( this, locator, ENTITY_ID_SETS, applicationId, IndexType.COLLECTION, key, start,
- finish, reversed, count, collectionName );
-
- return scanner;
- }
-
-
- public int countColumns( Keyspace ko, Object columnFamily, Object key ) throws Exception {
-
-
- CountQuery<ByteBuffer, ByteBuffer> cq = HFactory.createCountQuery( ko, be, be );
- cq.setColumnFamily( columnFamily.toString() );
- cq.setKey( bytebuffer( key ) );
- cq.setRange( ByteBuffer.allocate( 0 ), ByteBuffer.allocate( 0 ), 100000000 );
- QueryResult<Integer> r = cq.execute();
- if ( r == null ) {
- return 0;
- }
- return r.get();
- }
-
-
- /**
- * Sets the id list.
- *
- * @param keyspace the keyspace
- * @param targetId the target id
- * @param columnFamily the column family
- * @param keyPrefix the key prefix
- * @param keySuffix the key suffix
- * @param keyIds the key ids
- * @param setColumnValue the set column value
- *
- * @throws Exception the exception
- */
- public void setIdList( Keyspace ko, UUID targetId, String keyPrefix, String keySuffix, List<UUID> keyIds )
- throws Exception {
- long timestamp = createTimestamp();
- Mutator<ByteBuffer> batch = createMutator( ko, be );
- batch = buildSetIdListMutator( batch, targetId, ENTITY_ID_SETS.toString(), keyPrefix, keySuffix, keyIds,
- timestamp );
- batchExecute( batch, CassandraService.RETRY_COUNT );
- }
-
-
- boolean clusterUp = false;
-
-
- public void startClusterHealthCheck() {
-
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay( new Runnable() {
- @Override
- public void run() {
- if ( cluster != null ) {
- HConnectionManager connectionManager = cluster.getConnectionManager();
- if ( connectionManager != null ) {
- clusterUp = !connectionManager.getHosts().isEmpty();
- }
- }
- }
- }, 1, 5, TimeUnit.SECONDS );
- }
-
- public void destroy() throws Exception {
- if (cluster != null) {
- HConnectionManager connectionManager = cluster.getConnectionManager();
- if (connectionManager != null) {
- connectionManager.shutdown();
- }
- }
- cluster = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2c2acbe4/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java b/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
deleted file mode 100644
index c093329..0000000
--- a/stack/core/src/main/java/org/usergrid/persistence/cassandra/ConnectedEntityRefImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*******************************************************************************
- * Copyright 2012 Apigee Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.usergrid.persistence.cassandra;
-
-
-import java.util.UUID;
-
-import org.usergrid.persistence.ConnectedEntityRef;
-import org.usergrid.persistence.EntityRef;
-import org.usergrid.persistence.SimpleEntityRef;
-
-
-public class ConnectedEntityRefImpl extends SimpleEntityRef implements ConnectedEntityRef {
-
- final String connectionType;
-
-
- public ConnectedEntityRefImpl() {
- super( null, null );
- connectionType = null;
- }
-
-
- public ConnectedEntityRefImpl( String connectionType, EntityRef connectedEntity ) {
- super( connectedEntity.getType(), connectedEntity.getUuid() );
- this.connectionType = connectionType;
- }
-
-
- public ConnectedEntityRefImpl( String connectionType, String entityType, UUID entityId ) {
- super( entityType, entityId );
- this.connectionType = connectionType;
- }
-
-
- @Override
- public String getConnectionType() {
- return connectionType;
- }
-
-
- public static String getConnectionType( ConnectedEntityRef connection ) {
- if ( connection == null ) {
- return null;
- }
- return connection.getConnectionType();
- }
-}