You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:42 UTC
[35/38] usergrid git commit: Merge branch 'master' into
datastax-cass-driver Fix issue with UniqueValueSerialization backwards
compatibility via CQL and legacy Usergrid data.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 5320152,8c1f2d2..0753281
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@@ -18,34 -18,40 +18,38 @@@
package org.apache.usergrid.persistence.collection.serialization.impl;
++
+import java.nio.ByteBuffer;
import java.util.*;
-import com.netflix.astyanax.util.RangeBuilder;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
+import org.apache.usergrid.persistence.core.CassandraConfig;
++import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.*;
++
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.db.marshal.BytesType;
--
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.CassandraFig;
- import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
import com.google.common.base.Preconditions;
+
+ import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
-import com.netflix.astyanax.query.RowQuery;
++
+
/**
* Reads and writes to UniqueValues column family.
@@@ -53,27 -59,25 +57,29 @@@
public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
implements UniqueValueSerializationStrategy {
- private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+ private static final Logger logger = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+ public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)";
- private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion>
- CF_UNIQUE_VALUES;
+ private final String TABLE_UNIQUE_VALUES;
+ private final String TABLE_UNIQUE_VALUES_LOG;
+
+ private final Map COLUMNS_UNIQUE_VALUES;
+ private final Map COLUMNS_UNIQUE_VALUES_LOG;
- private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry>
- CF_ENTITY_UNIQUE_VALUE_LOG ;
public static final int COL_VALUE = 0x0;
+ private final Comparator<UniqueValue> uniqueValueComparator = new UniqueValueComparator();
+
private final SerializationFig serializationFig;
- protected final Keyspace keyspace;
private final CassandraFig cassandraFig;
+ private final Session session;
+ private final CassandraConfig cassandraConfig;
+
/**
* Construct serialization strategy for keyspace.
@@@ -186,134 -182,251 +192,285 @@@
final EntityVersion ev = new EntityVersion( entityId, entityVersion );
final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
- return doWrite( scope, value, new RowOp() {
- @Override
- public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
- colMutation.deleteColumn( ev );
- }
+ ByteBuffer partitionKey = getPartitionKey( scope.getApplication(), value.getEntityId().getType(),
+ value.getField().getTypeName().toString(), value.getField().getName(), value.getField().getValue());
+ ByteBuffer columnValue = serializeUniqueValueColumn(ev);
- @Override
- public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
- colMutation.deleteColumn( uniqueFieldEntry );
- }
- } );
- }
-
-
- /**
- * Do the column update or delete for the given column and row key
- *
- * @param applicationScope We need to use this when getting the keyspace
- * @param uniqueValue The unique value to write
- * @param op The operation to write
- */
- private MutationBatch doWrite( ApplicationScope applicationScope, UniqueValue uniqueValue, RowOp op ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final Clause uniqueEqKey = QueryBuilder.eq("key", partitionKey );
+ final Clause uniqueEqColumn = QueryBuilder.eq("column1", columnValue );
+ Statement uniqueDelete = QueryBuilder.delete().from(TABLE_UNIQUE_VALUES).where(uniqueEqKey).and(uniqueEqColumn);
+ batch.add(uniqueDelete);
- final Id applicationId = applicationScope.getApplication();
- final FieldKey fieldKey = createUniqueValueKey( applicationId, uniqueValue.getEntityId().getType(), uniqueValue.getField() );
+ ByteBuffer logPartitionKey = getLogPartitionKey(scope.getApplication(), entityId);
+ ByteBuffer logColumnValue = serializeUniqueValueLogColumn(uniqueFieldEntry);
- op.doLookup( batch.withRow( CF_UNIQUE_VALUES, ScopedRowKey.fromKey( applicationId, fieldKey ) ) );
+ final Clause uniqueLogEqKey = QueryBuilder.eq("key", logPartitionKey );
+ final Clause uniqueLogEqColumn = QueryBuilder.eq("column1", logColumnValue );
- final EntityKey entityKey = createEntityUniqueLogKey( applicationId, uniqueValue.getEntityId() );
+ Statement uniqueLogDelete = QueryBuilder.delete()
+ .from(TABLE_UNIQUE_VALUES_LOG).where(uniqueLogEqKey).and( uniqueLogEqColumn);
- op.doLog( batch.withRow( CF_ENTITY_UNIQUE_VALUE_LOG,
- ScopedRowKey.fromKey( applicationId, entityKey ) ) );
+ batch.add(uniqueLogDelete);
+ if ( logger.isTraceEnabled() ) {
+ logger.trace( "Building batch statement for unique value entity={} version={} name={} value={} ",
- uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion(),
- uniqueValue.getField().getName(), uniqueValue.getField().getValue() );
++ value.getEntityId().getUuid(), value.getEntityVersion(),
++ value.getField().getName(), value.getField().getValue() );
+ }
+
+
+
return batch;
}
@Override
-- public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields )
- {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields );
- throws ConnectionException {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, false);
++ public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields ) {
++
++ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, false );
++
}
+ @Override
+ public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields,
- boolean useReadRepair)
- throws ConnectionException {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCL() ), type, fields, useReadRepair);
++ boolean useReadRepair) {
++
++ return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getReadCl() ), type, fields, useReadRepair);
++
+ }
+
+
@Override
- public UniqueValueSet load(final ApplicationScope appScope, final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
+ public UniqueValueSet load( final ApplicationScope appScope,
+ final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields ) {
++ final String type, final Collection<Field> fields, boolean useReadRepair ) {
++
Preconditions.checkNotNull( fields, "fields are required" );
Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
- return loadCQL(appScope, consistencyLevel, type, fields);
++ return loadCQL(appScope, consistencyLevel, type, fields, useReadRepair);
+
+ }
+
+
+ private UniqueValueSet loadCQL( final ApplicationScope appScope,
+ final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields ) {
++ final String type, final Collection<Field> fields, boolean useReadRepair ) {
+
+ Preconditions.checkNotNull( fields, "fields are required" );
+ Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
- final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
final Id applicationId = appScope.getApplication();
- for ( Field field : fields ) {
+ // row key = app UUID + app type + entityType + field type + field name + field value
- List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
- final FieldKey key = createUniqueValueKey( applicationId, type, field );
+
+
- final ScopedRowKey<FieldKey> rowKey =
- ScopedRowKey.fromKey( applicationId, key );
+
- keys.add( rowKey );
- }
++ //List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
+
+ final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+
- Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
- keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
- .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build())
- .execute().getResult().iterator();
+
- if( !results.hasNext()){
- if(logger.isTraceEnabled()){
- logger.trace("No partitions returned for unique value lookup");
- }
- }
+ for ( Field field : fields ) {
+
+ //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())));
- partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
++ //partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
- }
- while ( results.hasNext() )
++ final Clause inKey = QueryBuilder.in("key", getPartitionKey(applicationId, type,
++ field.getTypeName().toString(), field.getName(), field.getValue()) );
- final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
- {
++ final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
++ .where(inKey)
++ .setConsistencyLevel(consistencyLevel);
- final Clause inKey = QueryBuilder.in("key", partitionKeys );
- final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
++ final ResultSet resultSet = session.execute(statement);
- final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
- .where(inKey)
- .setConsistencyLevel(consistencyLevel);
- final Field field = parseRowKey( unique.getKey() );
- final ResultSet resultSet = session.execute(statement);
- final Iterator<Column<EntityVersion>> columnList = unique.getColumns().iterator();
++ Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
- //sanity check, nothing to do, skip it
- if ( !columnList.hasNext() ) {
++ if( !results.hasNext()){
+ if(logger.isTraceEnabled()){
- logger.trace("No cells exist in partition for unique value [{}={}]",
- field.getName(), field.getValue().toString());
++ logger.trace("No rows returned for unique value lookup of field: {}", field);
+ }
- continue;
+ }
- Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
+ List<UniqueValue> candidates = new ArrayList<>();
- while( results.hasNext() ){
- /**
- * While iterating the columns, a rule is being enforced to only EVER return the oldest UUID. This means
- * the UUID with the oldest timestamp ( it was the original entity written for the unique value ).
- *
- * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
- * entity's version and not the entity's timestamp itself.
- *
- * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
- * backing serialized entity data is left in tact in case a cleanup / audit is later needed.
- */
- while (columnList.hasNext()) {
++ while( results.hasNext() ){
+
- final com.datastax.driver.core.Row unique = results.next();
- ByteBuffer partitionKey = unique.getBytes("key");
- ByteBuffer column = unique.getBytesUnsafe("column1");
++ final com.datastax.driver.core.Row unique = results.next();
++ ByteBuffer partitionKey = unique.getBytes("key");
++ ByteBuffer column = unique.getBytesUnsafe("column1");
+
- List<Object> keyContents = deserializePartitionKey(partitionKey);
- List<Object> columnContents = deserializeUniqueValueColumn(column);
++ List<Object> keyContents = deserializePartitionKey(partitionKey);
++ List<Object> columnContents = deserializeUniqueValueColumn(column);
+
- FieldTypeName fieldType;
- String name;
- String value;
- if(this instanceof UniqueValueSerializationStrategyV2Impl) {
++ FieldTypeName fieldType;
++ String name;
++ String value;
++ if(this instanceof UniqueValueSerializationStrategyV2Impl) {
+
- fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
- name = (String) keyContents.get(4);
- value = (String) keyContents.get(5);
+
- }else{
++ fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
++ name = (String) keyContents.get(4);
++ value = (String) keyContents.get(5);
+
- fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
- name = (String) keyContents.get(6);
- value = (String) keyContents.get(7);
++ }else{
+
- }
+
- Field field = getField(name, value, fieldType);
++ fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
++ name = (String) keyContents.get(6);
++ value = (String) keyContents.get(7);
++
++
++ }
++
++ Field returnedField = getField(name, value, fieldType);
++
++
++ final EntityVersion entityVersion = new EntityVersion(
++ new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
++// //sanity check, nothing to do, skip it
++// if ( !columnList.hasNext() ) {
++// if(logger.isTraceEnabled()){
++// logger.trace("No cells exist in partition for unique value [{}={}]",
++// field.getName(), field.getValue().toString());
++// }
++// continue;
++// }
++
++
++
++
++ /**
++ * While iterating the rows, a rule is enforced to only EVER return the oldest UUID for the field.
++ * This means the UUID with the oldest timestamp ( it was the original entity written for
++ * the unique value ).
++ *
++ * We do this to prevent cycling of unique value -> entity UUID mappings as this data is ordered by the
++ * entity's version and not the entity's timestamp itself.
++ *
++ * If newer entity UUIDs are encountered, they are removed from the unique value tables, however their
++ * backing serialized entity data is left intact in case a cleanup / audit is later needed.
++ */
+
- final EntityVersion entityVersion = columnList.next().getName();
+
+ final UniqueValue uniqueValue =
- new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion());
++ new UniqueValueImpl(returnedField, entityVersion.getEntityId(), entityVersion.getEntityVersion());
+
+ // set the initial candidate and move on
+ if (candidates.size() == 0) {
+ candidates.add(uniqueValue);
+ if (logger.isTraceEnabled()) {
+ logger.trace("First entry for unique value [{}={}] found for application [{}], adding " +
+ "entry with entity id [{}] and entity version [{}] to the candidate list and continuing",
- field.getName(), field.getValue().toString(), applicationId.getType(),
++ returnedField.getName(), returnedField.getValue().toString(), applicationId.getType(),
+ uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+ }
- final EntityVersion entityVersion = new EntityVersion(
- new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
+ continue;
+ }
+ if(!useReadRepair){
- final UniqueValueImpl uniqueValue =
- new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() );
+ // take only the first
+ if (logger.isTraceEnabled()) {
+ logger.trace("Read repair not enabled for this request of unique value [{}={}], breaking out" +
- " of cell loop", field.getName(), field.getValue().toString());
++ " of cell loop", returnedField.getName(), returnedField.getValue().toString());
+ }
+ break;
+
+ } else {
+
+
+ final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() - 1));
+
+ if (result == 0) {
+
+ // do nothing, only versions can be newer and we're not worried about newer versions of same entity
+ if (logger.isTraceEnabled()) {
+ logger.trace("Current unique value [{}={}] entry has UUID [{}] equal to candidate UUID [{}]",
- field.getName(), field.getValue().toString(), uniqueValue.getEntityId().getUuid(),
++ returnedField.getName(), returnedField.getValue().toString(), uniqueValue.getEntityId().getUuid(),
+ candidates.get(candidates.size() -1));
+ }
+
+ // update candidate w/ latest version
+ candidates.add(uniqueValue);
+
+ } else if (result < 0) {
+
+ // delete the duplicate from the unique value index
+ candidates.forEach(candidate -> {
+
- try {
-
- logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
- "entry with entity id [{}] and entity version [{}]", field.getName(),
- field.getValue().toString(), applicationId.getUuid(),
- candidate.getEntityId().getUuid(), candidate.getEntityVersion());
-
- delete(appScope, candidate).execute();
++ logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer " +
++ "entry with entity id [{}] and entity version [{}]", returnedField.getName(),
++ returnedField.getValue().toString(), applicationId.getUuid(),
++ candidate.getEntityId().getUuid(), candidate.getEntityVersion());
+
- } catch (ConnectionException e) {
- logger.error( "Unable to connect to cassandra during duplicate repair of [{}={}]",
- field.getName(), field.getValue().toString() );
- }
++ session.execute(deleteCQL(appScope, candidate));
+
+ });
+
+ // clear the transient candidates list
+ candidates.clear();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Updating candidate unique value [{}={}] to entity id [{}] and " +
- "entity version [{}]", field.getName(), field.getValue().toString(),
++ "entity version [{}]", returnedField.getName(), returnedField.getValue().toString(),
+ uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+
+ }
+
+ // add our new candidate to the list
+ candidates.add(uniqueValue);
+
+
+ } else {
+
+ logger.warn("Duplicate unique value [{}={}] found for application [{}], removing newer entry " +
- "with entity id [{}] and entity version [{}].", field.getName(), field.getValue().toString(),
++ "with entity id [{}] and entity version [{}].", returnedField.getName(), returnedField.getValue().toString(),
+ applicationId.getUuid(), uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+
+ // delete the duplicate from the unique value index
- delete(appScope, uniqueValue).execute();
++ session.execute(deleteCQL(appScope, uniqueValue));
+
+
+ }
+
+ }
- }
+
- // take the last candidate ( should be the latest version) and add to the result set
++ }
+
- final UniqueValue returnValue = candidates.get(candidates.size() -1);
- if(logger.isTraceEnabled()){
- logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
- returnValue.getField().getName(), returnValue.getField().getValue().toString(),
- returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
++ if ( candidates.size() > 0 ) {
++ // take the last candidate ( should be the latest version) and add to the result set
++ final UniqueValue returnValue = candidates.get(candidates.size() - 1);
++ if (logger.isTraceEnabled()) {
++ logger.trace("Adding unique value [{}={}] with entity id [{}] and entity version [{}] to response set",
++ returnValue.getField().getName(), returnValue.getField().getValue().toString(),
++ returnValue.getEntityId().getUuid(), returnValue.getEntityVersion());
++ }
++ uniqueValueSet.addValue(returnValue);
+ }
- uniqueValueSet.addValue(returnValue);
+
- uniqueValueSet.addValue(uniqueValue);
}
++
return uniqueValueSet;
+
}
@@@ -366,103 -471,101 +523,127 @@@
/**
- * Converts raw columns to the expected output
+ * Get the CQL table definition for the unique values log table
*/
- private static final class UniqueEntryParser implements ColumnParser<UniqueFieldEntry, UniqueValue> {
+ protected abstract TableDefinition getEntityUniqueLogTable();
+
+ public class AllUniqueFieldsIterator implements Iterable<UniqueValue>, Iterator<UniqueValue> {
+
+ private final Session session;
+ private final Statement query;
private final Id entityId;
+ private Iterator<Row> sourceIterator;
- private UniqueEntryParser( final Id entityId ) {this.entityId = entityId;}
- @Override
- public UniqueValue parseColumn( final Column<UniqueFieldEntry> column ) {
- final UniqueFieldEntry entry = column.getName();
+ public AllUniqueFieldsIterator( final Session session, final Statement query, final Id entityId){
+
+ this.session = session;
+ this.query = query;
+ this.entityId = entityId;
- return new UniqueValueImpl( entry.getField(), entityId, entry.getVersion() );
}
- }
- @Override
- public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+ @Override
+ public Iterator<UniqueValue> iterator() {
+ return this;
+ }
- final MultiTenantColumnFamilyDefinition uniqueLookupCF =
- new MultiTenantColumnFamilyDefinition( CF_UNIQUE_VALUES, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ @Override
+ public boolean hasNext() {
- final MultiTenantColumnFamilyDefinition uniqueLogCF =
- new MultiTenantColumnFamilyDefinition( CF_ENTITY_UNIQUE_VALUE_LOG, BytesType.class.getSimpleName(),
- ColumnTypes.DYNAMIC_COMPOSITE_TYPE, BytesType.class.getSimpleName(),
- MultiTenantColumnFamilyDefinition.CacheOption.KEYS );
+ if ( sourceIterator == null ) {
- return Arrays.asList( uniqueLookupCF, uniqueLogCF );
- }
+ advanceIterator();
+ return sourceIterator.hasNext();
+ }
- /**
- * Get the column family for the unique fields
- */
- protected abstract MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion> getUniqueValuesCF();
+ return sourceIterator.hasNext();
+ }
+ @Override
+ public UniqueValue next() {
- /**
- * Generate a key that is compatible with the column family
- *
- * @param applicationId The applicationId
- * @param type The type in the field
- * @param field The field we're creating the key for
- */
- protected abstract FieldKey createUniqueValueKey(final Id applicationId, final String type, final Field field );
+ com.datastax.driver.core.Row next = sourceIterator.next();
- /**
- * Parse the row key into the field
- * @param rowKey
- * @return
- */
- protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
+ ByteBuffer column = next.getBytesUnsafe("column1");
+ List<Object> columnContents = deserializeUniqueValueLogColumn(column);
- /**
- * Get the column family for the unique field CF
- */
- protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
+ UUID version = (UUID) columnContents.get(0);
+ String name = (String) columnContents.get(1);
+ String value = (String) columnContents.get(2);
+ FieldTypeName fieldType = FieldTypeName.valueOf((String) columnContents.get(3));
- /**
- * Generate a key that is compatible with the column family
- *
- * @param applicationId The applicationId
- * @param uniqueValueId The uniqueValue
- */
- protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId );
+ return new UniqueValueImpl(getField(name, value, fieldType), entityId, version);
+
+ }
+
+ private void advanceIterator() {
+
+ sourceIterator = session.execute(query).iterator();
+ }
+ }
+
+ private Field getField( String name, String value, FieldTypeName fieldType){
+
+ Field field = null;
+
+ switch ( fieldType ) {
+ case BOOLEAN:
+ field = new BooleanField( name, Boolean.parseBoolean( value ) );
+ break;
+ case DOUBLE:
+ field = new DoubleField( name, Double.parseDouble( value ) );
+ break;
+ case FLOAT:
+ field = new FloatField( name, Float.parseFloat( value ) );
+ break;
+ case INTEGER:
+ field = new IntegerField( name, Integer.parseInt( value ) );
+ break;
+ case LONG:
+ field = new LongField( name, Long.parseLong( value ) );
+ break;
+ case STRING:
+ field = new StringField( name, value );
+ break;
+ case UUID:
+ field = new UUIDField( name, UUID.fromString( value ) );
+ break;
+ }
+
+ return field;
+
+ }
+
+ private class UniqueValueComparator implements Comparator<UniqueValue> {
+
+ @Override
+ public int compare(UniqueValue o1, UniqueValue o2) {
+
+ if( o1.getEntityId().getUuid().equals(o2.getEntityId().getUuid())){
+
+ return 0;
+
+ }else if( o1.getEntityId().getUuid().timestamp() < o2.getEntityId().getUuid().timestamp()){
+
+ return -1;
+
+ }
+
+ // if the UUIDs are not equal and o1's timestamp is not less than o2's timestamp,
+ // then o1 must be greater than o2
+ return 1;
+
+
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index 4b5653f,f971b23..61f0f80
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@@ -91,10 -113,23 +91,24 @@@ public class UniqueValueSerializationSt
return migration.to.load( applicationScope, type, fields );
}
+ @Override
+ public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
- final Collection<Field> fields, boolean useReadRepair ) throws ConnectionException {
++ final Collection<Field> fields, boolean useReadRepair ) {
+
+ final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
+
+ if ( migration.needsMigration() ) {
+ return migration.from.load( applicationScope, type, fields, useReadRepair );
+ }
+
+ return migration.to.load( applicationScope, type, fields, useReadRepair );
+ }
+
@Override
- public UniqueValueSet load(final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields, boolean useReadRepair) throws ConnectionException {
+ public UniqueValueSet load( final ApplicationScope applicationScope, final ConsistencyLevel consistencyLevel,
- final String type, final Collection<Field> fields ) {
++ final String type, final Collection<Field> fields, boolean useReadRepair ) {
++
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index d305044,6a1cb58..55ba011
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@@ -133,280 -114,40 +133,286 @@@ public class UniqueValueSerializationSt
@Override
- protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
- final String type, final Field field) {
+ protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(8);
+ keys.add(0, appUUID);
+ keys.add(1, applicationType);
+ keys.add(2, appUUID);
+ keys.add(3, applicationType);
+ keys.add(4, entityType);
+ keys.add(5, fieldType);
+ keys.add(6, fieldName);
+ keys.add(7, fieldValueString);
+
+ */
+
+ int count = 0;
+ List<Object> stuff = new ArrayList<>();
+ while(bb.hasRemaining()){
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+ if(count == 0 || count == 2){
+ stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+ count++;
+ }
+
+ return stuff;
+
+ }
+
+ @Override
+ protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+ /**
+ * final UUID version = value.getVersion();
+ final Field<?> field = value.getField();
+
+ final FieldTypeName fieldType = field.getTypeName();
+ final String fieldValue = field.getValue().toString().toLowerCase();
+
+
+ DynamicComposite composite = new DynamicComposite( );
+
+ //we want to sort ascending to descending by version
+ composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+ composite.addComponent( field.getName(), STRING_SERIALIZER );
+ composite.addComponent( fieldValue, STRING_SERIALIZER );
+ composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+ */
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+
+ String comparator = UUID_TYPE_REVERSED;
+
+ int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+ fieldEntry.getField().getTypeName().name().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+ for (Object key : keys) {
+ if(key.equals(fieldEntry.getVersion())) {
+ int p = comparator.indexOf("(reversed=true)");
+ boolean desc = false;
+ if (p >= 0) {
+ comparator = comparator.substring(0, p);
+ desc = true;
+ }
- final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( type );
+ byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
+ if (desc) {
+ a = (byte) Character.toUpperCase((char) a);
+ }
+ stuff.putShort((short) ('\u8000' | a));
+ }else{
+ comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+ }
- final CollectionPrefixedKey<Field> uniquePrefixedKey =
- new CollectionPrefixedKey<>( collectionName, applicationId, field );
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used but part of legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
- return uniquePrefixedKey;
}
+ @Override
+ protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+ return serializeKey(applicationId.getUuid(), applicationId.getType(),
+ entityType, fieldType, fieldName, fieldValue);
+
+ }
+
+ @Override
+ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+ return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+ uniqueValueId.getUuid(), uniqueValueId.getType());
+
+ }
@Override
- protected Field parseRowKey( final ScopedRowKey<CollectionPrefixedKey<Field>> rowKey ) {
- return rowKey.getKey().getSubKey();
+ protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+ /**
+ * final Id entityId = ev.getEntityId();
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addUUID( entityVersion );
+ builder.addUUID( entityUuid );
+ builder.addString(entityType );
+ */
+
+ String comparator = "UTF8Type";
+
+ List<Object> keys = new ArrayList<>(3);
+ keys.add(entityVersion.getEntityVersion());
+ keys.add(entityVersion.getEntityId().getUuid());
+ keys.add(entityVersion.getEntityId().getType());
+
+ // UUIDs are 16 bytes
+ int size = 16+16+entityVersion.getEntityId().getType().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // we always add comparator to the buffer as well
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ if(key instanceof UUID){
+ comparator = "UUIDType";
+ }else{
+ comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+ }
+
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte (again, not used, but part of the legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
}
+ @Override
+ protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
- // custom columns have a short at beginning for comparator (which we don't use here )
- ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
++ // pull off the custom comparator (per Astyanax deserialize)
++ int e = CQLUtils.getShortLength(bb);
++ if((e & '\u8000') == 0) {
++ CQLUtils.getBytes(bb, e);
++ } else {
++ // high bit set: the short itself carries the comparator alias, so there is no name to skip
++ }
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first two composites are UUIDs, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else if(count ==1){
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
+ }
@Override
- protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId,
- final Id uniqueValueId ) {
+ protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+ */
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
- // the comparator info is different for the UUID reversed type vs. UTF8 type
- if(count ==0){
- bb.getShort(); // take the reversed comparator byte off
- }else {
- ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
- final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( uniqueValueId.getType() );
++ // pull off the custom comparator (per Astyanax deserialize)
++ int e = CQLUtils.getShortLength(bb);
++ if((e & '\u8000') == 0) {
++ CQLUtils.getBytes(bb, e);
++ } else {
++ // high bit set: the short itself carries the comparator alias, so there is no name to skip
+ }
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
- final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
- new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+ // first composite is a UUID, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
- return collectionPrefixedEntityKey;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 6ea5c1e,40622a4..92c0a5b
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@@ -20,20 -20,21 +20,21 @@@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.util.*;
-import org.apache.cassandra.db.marshal.BytesType;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
++import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@@ -124,277 -114,20 +125,284 @@@ public class UniqueValueSerializationSt
@Override
- protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) {
- return new TypeField(type,field);
+ protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(6);
+ keys.add(0, appUUID); // UUID
+ keys.add(1, applicationType); // String
+ keys.add(2, entityType); // String
+ keys.add(3, fieldType); // String
+ keys.add(4, fieldName); // String
+ keys.add(5, fieldValueString); // String
+
+ */
+
+ List<Object> stuff = new ArrayList<>();
+ while(bb.hasRemaining()){
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+ if(stuff.size() == 0){
+ stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ }
+
+ return stuff;
+
}
+ @Override
+ protected ByteBuffer serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+ /**
+ * final UUID version = value.getVersion();
+ final Field<?> field = value.getField();
+
+ final FieldTypeName fieldType = field.getTypeName();
+ final String fieldValue = field.getValue().toString().toLowerCase();
+
+
+ DynamicComposite composite = new DynamicComposite( );
+
+ //we want to sort ascending to descending by version
+ composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+ composite.addComponent( field.getName(), STRING_SERIALIZER );
+ composite.addComponent( fieldValue, STRING_SERIALIZER );
+ composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+ */
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+
+ String comparator = UUID_TYPE_REVERSED;
+
+ int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+ fieldEntry.getField().getTypeName().name().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+ for (Object key : keys) {
+
+ if(key.equals(fieldEntry.getVersion())) {
+ int p = comparator.indexOf("(reversed=true)");
+ boolean desc = false;
+ if (p >= 0) {
+ comparator = comparator.substring(0, p);
+ desc = true;
+ }
+
+ byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
+ if (desc) {
+ a = (byte) Character.toUpperCase((char) a);
+ }
+
+ stuff.putShort((short) ('\u8000' | a));
+ }else{
+ comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte (again, not used, but part of the legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
@Override
- protected Field parseRowKey( final ScopedRowKey<TypeField> rowKey ) {
- return rowKey.getKey().getField();
+ protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+ return serializeKey(applicationId.getUuid(), applicationId.getType(),
+ entityType, fieldType, fieldName, fieldValue);
+
+ }
+
+ @Override
+ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+ return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+ uniqueValueId.getUuid(), uniqueValueId.getType());
+
+ }
+
+ @Override
+ protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+ /**
+ * final Id entityId = ev.getEntityId();
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addUUID( entityVersion );
+ builder.addUUID( entityUuid );
+ builder.addString(entityType );
+ */
+
+ String comparator = "UTF8Type";
+
+ List<Object> keys = new ArrayList<>(3);
+ keys.add(entityVersion.getEntityVersion());
+ keys.add(entityVersion.getEntityId().getUuid());
+ keys.add(entityVersion.getEntityId().getType());
+
+ // UUIDs are 16 bytes
+ int size = 16+16+entityVersion.getEntityId().getType().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // we always add comparator to the buffer as well
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
++ // custom comparator mappings in CQLUtils.COMPOSITE_TYPE (more leftovers from Astyanax)
+ if(key instanceof UUID){
+ comparator = "UUIDType";
+ }else{
+ comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+ }
+
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte (again, not used, but part of the legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
}
+ @Override
+ protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
- // custom columns have a short at beginning for comparator (which we don't use here )
- ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
++ // pull off the custom comparator (per Astyanax deserialize)
++ int e = CQLUtils.getShortLength(bb);
++ if((e & '\u8000') == 0) {
++ CQLUtils.getBytes(bb, e);
++ } else {
++ // high bit set: the short itself carries the comparator alias, so there is no name to skip
++ }
++
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first two composites are UUIDs, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else if(count ==1){
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
+ }
@Override
- protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) {
- return uniqueValueId;
+ protected List<Object> deserializeUniqueValueLogColumn(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+ */
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
- // the comparator info is different for the UUID reversed type vs. UTF8 type
- if(count ==0){
- bb.getShort(); // take the reversed comparator byte off
- }else {
- ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
++ int e = CQLUtils.getShortLength(bb);
++ if((e & '\u8000') == 0) {
++ CQLUtils.getBytes(bb, e);
++ } else {
++ // high bit set: the short itself carries the comparator alias, so there is no name to skip
+ }
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first composite is a UUID, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index 0000000,2cad32c..ed88ba6
mode 000000,100644..100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@@ -1,0 -1,94 +1,101 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.usergrid.persistence.collection.uniquevalues;
+
++import com.datastax.driver.core.BatchStatement;
++import com.datastax.driver.core.Session;
++import com.datastax.driver.core.querybuilder.Batch;
+ import com.google.inject.Inject;
+ import com.google.inject.Singleton;
+ import com.netflix.astyanax.MutationBatch;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+ import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+ import org.apache.usergrid.persistence.model.entity.Id;
+ import org.apache.usergrid.persistence.model.field.Field;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.UUID;
+
+
+ @Singleton
+ public class UniqueValuesTableImpl implements UniqueValuesTable {
+ private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
+
- final UniqueValueSerializationStrategy strat;
- final UniqueValuesFig uniqueValuesFig;
++ private final UniqueValueSerializationStrategy strat;
++ private final UniqueValuesFig uniqueValuesFig;
++ private final Session session;
+
+ @Inject
- public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) {
++ public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat,
++ final UniqueValuesFig uniqueValuesFig,
++ final Session session ) {
+ this.strat = strat;
+ this.uniqueValuesFig = uniqueValuesFig;
++ this.session = session;
+ }
+
+
+ @Override
+ public Id lookupOwner( ApplicationScope scope, String type, Field field) throws ConnectionException {
+
+ UniqueValueSet set = strat.load( scope, type, Collections.singletonList( field ) );
+ UniqueValue uv = set.getValue( field.getName() );
+ return uv == null ? null : uv.getEntityId();
+ }
+
+ @Override
+ public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
+
+ UniqueValue uv = new UniqueValueImpl( field, owner, version);
- final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
- write.execute();
++ final BatchStatement statement = strat.writeCQL( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
++ session.execute(statement);
+ }
+
+ @Override
+ public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+
+ UniqueValue uv = new UniqueValueImpl( field, owner, version);
- final MutationBatch write = strat.write( scope, uv );
- write.execute();
++ final BatchStatement statement = strat.writeCQL( scope, uv, -1 );
++ session.execute(statement);
+
+ }
+
+ @Override
+ public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+
+ UniqueValue uv = new UniqueValueImpl( field, owner, version );
- final MutationBatch write = strat.delete( scope, uv );
- write.execute();
++ final BatchStatement statement = strat.deleteCQL( scope, uv );
++ session.execute(statement);
+ }
+
+ @Override
+ public Iterator<UniqueValue> getUniqueValues(ApplicationScope scope, Id entityId) {
+ return strat.getAllUniqueFields( scope, entityId );
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index b18b095,89169ac..f98a3ea
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@@ -82,13 -71,12 +82,14 @@@ public class MarkCommitTest extends Abs
//run the stage
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null);
++ WriteCommit newStage
++ = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session);
+
- //verify the observable is correct
- Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
-
+ //verify the observable is correct
+ Entity result = newStage.call(
+ new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
//verify the log entry is correct
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 60281d4,dcc473c..df0fc9e
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@@ -92,10 -84,12 +92,13 @@@ public class WriteCommitTest extends Ab
//run the stage
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
+ WriteCommit newStage =
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null );
++ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session );
+
- Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
+
+ Entity result = newStage.call(
+ new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
//verify the log entry is correct
@@@ -142,7 -133,7 +145,8 @@@
when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
.thenReturn( entityMutation );
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event );
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event );
++ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null, session ).call( event );
++
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
index 9d0cd20,401d23e..87226be
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyIT.java
@@@ -18,12 -18,9 +18,10 @@@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
- import org.junit.Rule;
- import org.junit.Test;
- import org.junit.runner.RunWith;
-
- import org.apache.usergrid.persistence.collection.EntityCollectionManager;
- import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
++import com.datastax.driver.core.Session;
+ import com.google.inject.Inject;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+ import org.apache.usergrid.persistence.collection.*;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator;
@@@ -60,8 -70,25 +71,28 @@@ public class WriteUniqueVerifyIT extend
public MigrationManagerRule migrationManagerRule;
@Inject
+ public UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+
+ @Inject
public EntityCollectionManagerFactory cmf;
+ @Inject
+ ActorSystemManager actorSystemManager;
+
+ @Inject
+ UniqueValuesService uniqueValuesService;
+
++ @Inject
++ Session session;
++
+
+ @Before
+ public void initAkka() {
+ // each test class needs unique port number
+ initAkka( 2553, actorSystemManager, uniqueValuesService );
+ }
+
+
@Test
public void testConflict() {
@@@ -137,9 -162,69 +166,69 @@@
entity.setField(new StringField("name", "Alfa Romeo 8C Competizione", true));
entity.setField(new StringField("identifier", "ar8c", true));
entity.setField(new IntegerField("top_speed_mph", 182));
- entityManager.write( entity ).toBlocking().last();
+ entityManager.write( entity, null ).toBlocking().last();
entity.setField( new StringField("foo", "bar"));
- entityManager.write( entity ).toBlocking().last();
+ entityManager.write( entity, null ).toBlocking().last();
+ }
+
+ @Test
+ public void testConflictReadRepair() throws Exception {
+
+ final Id appId = new SimpleId("testNoConflict");
+
+
+
+ final ApplicationScope scope = new ApplicationScopeImpl( appId);
+
+ final EntityCollectionManager entityManager = cmf.createCollectionManager( scope );
+
+ final Entity entity = TestEntityGenerator.generateEntity();
+ entity.setField(new StringField("name", "Porsche 911 GT3", true));
+ entity.setField(new StringField("identifier", "911gt3", true));
+ entity.setField(new IntegerField("top_speed_mph", 194));
+ entityManager.write( entity, null ).toBlocking().last();
+
+
+ FieldSet fieldSet =
+ entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+ .toBlocking().last();
+
+ MvccEntity entityFetched = fieldSet.getEntity( entity.getField("name") );
+
+
+ final Entity entityDuplicate = TestEntityGenerator.generateEntity();
+ UniqueValue uniqueValue = new UniqueValueImpl(new StringField("name", "Porsche 911 GT3", true),
+ entityDuplicate.getId(), UUIDGenerator.newTimeUUID());
+
+ // manually insert a record to simulate a 'duplicate' trying to be inserted
- uniqueValueSerializationStrategy.
- write(scope, uniqueValue).execute();
++ session.execute(uniqueValueSerializationStrategy.
++ writeCQL(scope, uniqueValue, -1));
+
+
+
+ FieldSet fieldSetAgain =
+ entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+ .toBlocking().last();
+
+ MvccEntity entityFetchedAgain = fieldSetAgain.getEntity( entity.getField("name") );
+
+ assertEquals(entityFetched, entityFetchedAgain);
+
+
+ // now test writing the original entity again ( simulates a PUT )
+ // this should read repair and work
+ entityManager.write( entity, null ).toBlocking().last();
+
+ FieldSet fieldSetAgainAgain =
+ entityManager.getEntitiesFromFields("test", Collections.singletonList(entity.getField("name")), true)
+ .toBlocking().last();
+
+ MvccEntity entityFetchedAgainAgain = fieldSetAgainAgain.getEntity( entity.getField("name") );
+
+ assertEquals(entityFetched, entityFetchedAgainAgain);
+
+
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b23c20a2/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 3ddc14d,7afba05..1290a5c
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@@ -18,17 -18,20 +18,27 @@@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
++
+import com.datastax.driver.core.Session;
- import org.junit.Rule;
- import org.junit.Test;
- import org.junit.runner.RunWith;
++
+ import com.google.inject.Inject;
+ import com.netflix.astyanax.Keyspace;
+ import com.netflix.astyanax.MutationBatch;
+ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
+ import org.apache.usergrid.persistence.collection.AbstractUniqueValueTest;
+ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
++
+import org.apache.usergrid.persistence.core.CassandraConfig;
++
+ import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
++
import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.test.ITRunner;
@@@ -84,12 -96,11 +106,12 @@@ public class WriteUniqueVerifyTest exte
final MvccEntity mvccEntity = fromEntity( entity );
// run the stage
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, session );
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null );
++ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null, session );
++
- newStage.call(
- new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
+ newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
- //if we get here, it's a success. We want to test no exceptions are thrown
+ // if we get here, it's a success. We want to test no exceptions are thrown
verify(batch, never()).execute();
}