You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/17 21:48:35 UTC
[28/38] usergrid git commit: Initial UniqueValueSerialization
conversion to CQL.
Initial UniqueValueSerialization conversion to CQL.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0c609878
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0c609878
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0c609878
Branch: refs/heads/master
Commit: 0c609878e1eccd35f31bc4fdc86bd3fe9da21593
Parents: ff3f7e8
Author: Michael Russo <mr...@apigee.com>
Authored: Fri May 6 22:36:37 2016 +0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri May 6 22:36:37 2016 +0800
----------------------------------------------------------------------
.../mvcc/stage/write/WriteCommit.java | 18 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 23 +-
.../UniqueValueSerializationStrategy.java | 6 +-
.../UniqueValueSerializationStrategyImpl.java | 287 ++++++++++++-
...iqueValueSerializationStrategyProxyImpl.java | 31 +-
.../UniqueValueSerializationStrategyV1Impl.java | 410 ++++++++++++++++++-
.../UniqueValueSerializationStrategyV2Impl.java | 379 ++++++++++++++++-
.../migration/MvccEntityDataMigrationImpl.java | 26 +-
.../mvcc/stage/delete/MarkCommitTest.java | 13 +-
.../mvcc/stage/write/WriteCommitTest.java | 15 +-
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 6 +-
...niqueValueSerializationStrategyImplTest.java | 41 +-
...ctMvccEntityDataMigrationV1ToV3ImplTest.java | 5 +-
.../core/datastax/impl/DataStaxClusterImpl.java | 3 +
14 files changed, 1169 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 7eb96e7..cfac8e4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.write;
import java.util.UUID;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,11 +69,14 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
private final MvccEntitySerializationStrategy entityStrat;
+ private final Session session;
+
@Inject
public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
final MvccEntitySerializationStrategy entryStrat,
- final UniqueValueSerializationStrategy uniqueValueStrat) {
+ final UniqueValueSerializationStrategy uniqueValueStrat,
+ final Session session) {
Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" );
@@ -80,6 +85,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
this.logEntryStrat = logStrat;
this.entityStrat = entryStrat;
this.uniqueValueStrat = uniqueValueStrat;
+ this.session = session;
}
@@ -103,6 +109,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+
+
MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry );
// now get our actual insert into the entity data
@@ -112,21 +120,23 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
logMutation.mergeShallow( entityMutation );
// re-write the unique values but this time with no TTL
+ final BatchStatement uniqueBatch = new BatchStatement();
+
for ( Field field : EntityUtils.getUniqueFields(mvccEntity.getEntity().get()) ) {
UniqueValue written = new UniqueValueImpl( field,
entityId,version);
- MutationBatch mb = uniqueValueStrat.write(applicationScope, written );
+ uniqueBatch.add(uniqueValueStrat.writeCQL(applicationScope, written, -1 ));
logger.debug("Finalizing {} unique value {}", field.getName(), field.getValue().toString());
- // merge into our existing mutation batch
- logMutation.mergeShallow( mb );
+
}
try {
logMutation.execute();
+ session.execute(uniqueBatch);
}
catch ( ConnectionException e ) {
logger.error( "Failed to execute write asynchronously ", e );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index 585c26e..8e0b202 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,14 +73,20 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
protected final SerializationFig serializationFig;
protected final Keyspace keyspace;
+
+ protected final Session session;
+
private final CassandraConfig cassandraFig;
@Inject
public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
- final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
+ final SerializationFig serializationFig, final Keyspace keyspace,
+ final CassandraConfig cassandraFig, final Session session ) {
+
this.keyspace = keyspace;
this.cassandraFig = cassandraFig;
+ this.session = session;
Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -101,7 +109,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
final ApplicationScope scope = ioevent.getEntityCollection();
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final BatchStatement batch = new BatchStatement();
//allocate our max size, worst case
final List<Field> uniqueFields = new ArrayList<>( entity.getFields().size() );
@@ -119,9 +127,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
final UniqueValue written = new UniqueValueImpl( field, mvccEntity.getId(), mvccEntity.getVersion() );
// use TTL in case something goes wrong before entity is finally committed
- final MutationBatch mb = uniqueValueStrat.write( scope, written, serializationFig.getTimeout() );
+ batch.add(uniqueValueStrat.writeCQL( scope, written, serializationFig.getTimeout() ));
- batch.mergeShallow( mb );
uniqueFields.add(field);
}
@@ -131,12 +138,8 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
}
//perform the write
- try {
- batch.execute();
- }
- catch ( ConnectionException ex ) {
- throw new RuntimeException( "Unable to write to cassandra", ex );
- }
+ session.execute(batch);
+
// use simple thread pool to verify fields in parallel
ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 3645107..56e8b87 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.persistence.collection.serialization;
import java.util.Collection;
import java.util.Iterator;
+import com.datastax.driver.core.BatchStatement;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
import org.apache.usergrid.persistence.core.migration.data.VersionedData;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,7 +48,6 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
*
* @return MutatationBatch that encapsulates operation, caller may or may not execute.
*/
- MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue );
/**
* Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
@@ -56,7 +57,8 @@ public interface UniqueValueSerializationStrategy extends Migration, VersionedDa
* @param timeToLive How long object should live in seconds. -1 implies store forever
* @return MutatationBatch that encapsulates operation, caller may or may not execute.
*/
- MutationBatch write( ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
+
+ BatchStatement writeCQL(ApplicationScope applicationScope, UniqueValue uniqueValue, int timeToLive );
/**
* Load UniqueValue that matches field from collection or null if that value does not exist.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 0f27167..27a8609 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -18,9 +18,19 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
+import java.nio.ByteBuffer;
import java.util.*;
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Using;
+import com.netflix.astyanax.model.*;
+import com.netflix.astyanax.util.RangeBuilder;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,18 +50,13 @@ import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.field.Field;
import com.google.common.base.Preconditions;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
/**
@@ -62,6 +67,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
+ public static final String UUID_TYPE_REVERSED = "UUIDType(reversed=true)";
+
+
private final MultiTenantColumnFamily<ScopedRowKey<FieldKey>, EntityVersion>
CF_UNIQUE_VALUES;
@@ -70,6 +78,15 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
private final MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry>
CF_ENTITY_UNIQUE_VALUE_LOG ;
+ private final String TABLE_UNIQUE_VALUES;
+ private final String TABLE_UNIQUE_VALUES_LOG;
+
+
+ private final Map COLUMNS_UNIQUE_VALUES;
+ private final Map COLUMNS_UNIQUE_VALUES_LOG;
+
+
+
public static final int COL_VALUE = 0x0;
@@ -77,6 +94,9 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
protected final Keyspace keyspace;
private final CassandraFig cassandraFig;
+ private final Session session;
+ private final CassandraConfig cassandraConfig;
+
/**
* Construct serialization strategy for keyspace.
@@ -86,13 +106,24 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
* @param serializationFig The serialization configuration
*/
public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
+ final SerializationFig serializationFig,
+ final Session session, final CassandraConfig cassandraConfig) {
this.keyspace = keyspace;
this.cassandraFig = cassandraFig;
this.serializationFig = serializationFig;
+ this.session = session;
+ this.cassandraConfig = cassandraConfig;
+
CF_UNIQUE_VALUES = getUniqueValuesCF();
CF_ENTITY_UNIQUE_VALUE_LOG = getEntityUniqueLogCF();
+
+ TABLE_UNIQUE_VALUES = getUniqueValuesTable().getTableName();
+ TABLE_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getTableName();
+
+ COLUMNS_UNIQUE_VALUES = getUniqueValuesTable().getColumns();
+ COLUMNS_UNIQUE_VALUES_LOG = getEntityUniqueLogTable().getColumns();
+
}
@@ -129,7 +160,6 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
}
- @Override
public MutationBatch write( final ApplicationScope collectionScope, final UniqueValue value,
final int timeToLive ) {
@@ -163,6 +193,86 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
} );
}
+ @Override
+ public BatchStatement writeCQL( final ApplicationScope collectionScope, final UniqueValue value,
+ final int timeToLive ){
+
+
+ Preconditions.checkNotNull( value, "value is required" );
+
+ BatchStatement batch = new BatchStatement();
+
+ Using ttl = null;
+ if(timeToLive > 0){
+
+ ttl = QueryBuilder.ttl(timeToLive);
+
+ }
+
+ final Id entityId = value.getEntityId();
+ final UUID entityVersion = value.getEntityVersion();
+ final Field<?> field = value.getField();
+
+ ValidationUtils.verifyIdentity( entityId );
+ ValidationUtils.verifyVersion( entityVersion );
+
+ final EntityVersion ev = new EntityVersion( entityId, entityVersion );
+ final UniqueFieldEntry uniqueFieldEntry = new UniqueFieldEntry( entityVersion, field );
+
+ ByteBuffer partitionKey = getPartitionKey(collectionScope.getApplication(), value.getEntityId().getType(),
+ field.getTypeName().toString(), field.getName(), field.getValue());
+
+ ByteBuffer logPartitionKey = getLogPartitionKey(collectionScope.getApplication(), value.getEntityId());
+
+
+ if(ttl != null) {
+
+ Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+ .value("key", partitionKey)
+ .value("column1", serializeUniqueValueColumn(ev))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED))
+ .using(ttl);
+
+ batch.add(uniqueValueStatement);
+
+
+ }else{
+
+ Statement uniqueValueStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES)
+ .value("key", partitionKey)
+ .value("column1", serializeUniqueValueColumn(ev))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+ batch.add(uniqueValueStatement);
+
+ }
+
+ // we always want to retain the log entry, so never write with the TTL
+ Statement uniqueValueLogStatement = QueryBuilder.insertInto(TABLE_UNIQUE_VALUES_LOG)
+ .value("key", logPartitionKey)
+ .value("column1", serializeUniqueValueLogColumn(uniqueFieldEntry))
+ .value("value", DataType.serializeValue(COL_VALUE, ProtocolVersion.NEWEST_SUPPORTED));
+
+ batch.add(uniqueValueLogStatement);
+
+
+
+ return batch;
+
+ /**
+ * @Override
+ public void doLookup( final ColumnListMutation<EntityVersion> colMutation ) {
+ colMutation.putColumn( ev, COL_VALUE );
+ }
+
+
+ @Override
+ public void doLog( final ColumnListMutation<UniqueFieldEntry> colMutation ) {
+ colMutation.putColumn( uniqueFieldEntry, COL_VALUE );
+ }
+ */
+ }
+
@Override
public MutationBatch delete( final ApplicationScope scope, UniqueValue value ) {
@@ -236,18 +346,26 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
@Override
public UniqueValueSet load( final ApplicationScope colScope, final String type, final Collection<Field> fields )
throws ConnectionException {
- return load( colScope, ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields );
+ return load( colScope, com.netflix.astyanax.model.ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() ), type, fields );
}
@Override
- public UniqueValueSet load( final ApplicationScope appScope, final ConsistencyLevel consistencyLevel,
+ public UniqueValueSet load( final ApplicationScope appScope, final com.netflix.astyanax.model.ConsistencyLevel consistencyLevel,
final String type, final Collection<Field> fields ) throws ConnectionException {
Preconditions.checkNotNull( fields, "fields are required" );
Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
+ return loadCQL(appScope, com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM, type, fields);
+ //return loadLegacy( appScope, type, fields);
+
+ }
+
+
+ private UniqueValueSet loadLegacy(final ApplicationScope appScope,
+ final String type, final Collection<Field> fields) throws ConnectionException {
final List<ScopedRowKey<FieldKey>> keys = new ArrayList<>( fields.size() );
final Id applicationId = appScope.getApplication();
@@ -265,16 +383,16 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
- Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
- keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
- .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
+ Iterator<com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
+ keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel(com.netflix.astyanax.model.ConsistencyLevel.CL_LOCAL_QUORUM ).getKeySlice( keys )
+ .withColumnRange( new RangeBuilder().setLimit( 1 ).build() ).execute().getResult().iterator();
while ( results.hasNext() )
{
- final Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
+ final com.netflix.astyanax.model.Row<ScopedRowKey<FieldKey>, EntityVersion> unique = results.next();
final Field field = parseRowKey( unique.getKey() );
@@ -296,9 +414,112 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
}
return uniqueValueSet;
+
+ }
+
+ private UniqueValueSet loadCQL( final ApplicationScope appScope, final com.datastax.driver.core.ConsistencyLevel consistencyLevel,
+ final String type, final Collection<Field> fields ) throws ConnectionException {
+
+ Preconditions.checkNotNull( fields, "fields are required" );
+ Preconditions.checkArgument( fields.size() > 0, "More than 1 field must be specified" );
+
+
+ final Id applicationId = appScope.getApplication();
+
+ // row key = app UUID + app type + entityType + field type + field name + field value
+
+ List<ByteBuffer> partitionKeys = new ArrayList<>( fields.size() );
+ for ( Field field : fields ) {
+
+ //log.info(Bytes.toHexString(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue())));
+
+ partitionKeys.add(getPartitionKey(applicationId, type, field.getTypeName().toString(), field.getName(), field.getValue()));
+
+ }
+
+ final UniqueValueSetImpl uniqueValueSet = new UniqueValueSetImpl( fields.size() );
+
+ final Clause inKey = QueryBuilder.in("key", partitionKeys );
+
+ final Statement statement = QueryBuilder.select().all().from(TABLE_UNIQUE_VALUES)
+ .where(inKey)
+ .setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
+
+ final ResultSet resultSet = session.execute(statement);
+
+
+ Iterator<com.datastax.driver.core.Row> results = resultSet.iterator();
+
+
+ while( results.hasNext() ){
+
+ final com.datastax.driver.core.Row unique = results.next();
+ ByteBuffer partitionKey = unique.getBytes("key");
+ ByteBuffer column = unique.getBytesUnsafe("column1");
+
+ List<Object> keyContents = deserializePartitionKey(partitionKey);
+ List<Object> columnContents = deserializeUniqueValueColumn(column);
+
+ Field field = null;
+ FieldTypeName fieldType;
+ String name;
+ String value;
+ if(this instanceof UniqueValueSerializationStrategyV2Impl) {
+
+ fieldType = FieldTypeName.valueOf((String) keyContents.get(3));
+ name = (String) keyContents.get(4);
+ value = (String) keyContents.get(5);
+
+ }else{
+
+ fieldType = FieldTypeName.valueOf((String) keyContents.get(5));
+ name = (String) keyContents.get(6);
+ value = (String) keyContents.get(7);
+
+ }
+
+ switch ( fieldType ) {
+ case BOOLEAN:
+ field = new BooleanField( name, Boolean.parseBoolean( value ) );
+ break;
+ case DOUBLE:
+ field = new DoubleField( name, Double.parseDouble( value ) );
+ break;
+ case FLOAT:
+ field = new FloatField( name, Float.parseFloat( value ) );
+ break;
+ case INTEGER:
+ field = new IntegerField( name, Integer.parseInt( value ) );
+ break;
+ case LONG:
+ field = new LongField( name, Long.parseLong( value ) );
+ break;
+ case STRING:
+ field = new StringField( name, value );
+ break;
+ case UUID:
+ field = new UUIDField( name, UUID.fromString( value ) );
+ break;
+ }
+
+ final EntityVersion entityVersion = new EntityVersion(
+ new SimpleId((UUID)columnContents.get(1), (String)columnContents.get(2)), (UUID)columnContents.get(0));
+
+
+ final UniqueValueImpl uniqueValue =
+ new UniqueValueImpl( field, entityVersion.getEntityId(), entityVersion.getEntityVersion() );
+
+ uniqueValueSet.addValue(uniqueValue);
+
+ }
+
+ return uniqueValueSet;
+
}
+
+
@Override
public Iterator<UniqueValue> getAllUniqueFields( final ApplicationScope collectionScope, final Id entityId ) {
Preconditions.checkNotNull( collectionScope, "collectionScope is required" );
@@ -378,7 +599,13 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
@Override
public Collection<TableDefinition> getTables() {
- return Collections.emptyList();
+ final TableDefinition uniqueValues = getUniqueValuesTable();
+
+ final TableDefinition uniqueValuesLog = getEntityUniqueLogTable();
+
+
+ return Arrays.asList( uniqueValues, uniqueValuesLog );
+
}
@@ -389,6 +616,12 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
/**
+ * Get the CQL table definition for the unique values log table
+ */
+ protected abstract TableDefinition getUniqueValuesTable();
+
+
+ /**
* Generate a key that is compatible with the column family
*
* @param applicationId The applicationId
@@ -405,10 +638,32 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
protected abstract Field parseRowKey(final ScopedRowKey<FieldKey> rowKey);
+ protected abstract List<Object> deserializePartitionKey(ByteBuffer bb);
+
+
+ protected abstract Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry);
+
+ protected abstract ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue );
+
+ protected abstract ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId);
+
+ protected abstract ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion);
+
+ protected abstract List<Object> deserializeUniqueValueColumn(ByteBuffer bb);
+
+
+
+
+
+ /**
+ * Get the column family for the unique field CF
+ */
+ protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
+
/**
- * Get the column family for the unique field CF
+ * Get the CQL table definition for the unique values log table
*/
- protected abstract MultiTenantColumnFamily<ScopedRowKey<EntityKey>, UniqueFieldEntry> getEntityUniqueLogCF();
+ protected abstract TableDefinition getEntityUniqueLogTable();
/**
* Generate a key that is compatible with the column family
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
index 87b1641..bbfaa2d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyProxyImpl.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import com.datastax.driver.core.BatchStatement;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
@@ -67,40 +68,22 @@ public class UniqueValueSerializationStrategyProxyImpl implements UniqueValueSer
@Override
- public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue ) {
- final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
-
- if ( migration.needsMigration() ) {
- final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
-
- aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue ) );
- aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue ) );
-
- return aggregateBatch;
- }
+ public BatchStatement writeCQL(final ApplicationScope applicationScope, final UniqueValue uniqueValue,
+ final int timeToLive ){
- return migration.to.write( applicationScope, uniqueValue );
- }
-
-
- @Override
- public MutationBatch write( final ApplicationScope applicationScope, final UniqueValue uniqueValue,
- final int timeToLive ) {
final MigrationRelationship<UniqueValueSerializationStrategy> migration = getMigrationRelationShip();
if ( migration.needsMigration() ) {
- final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
-
- aggregateBatch.mergeShallow( migration.from.write( applicationScope, uniqueValue, timeToLive ) );
- aggregateBatch.mergeShallow( migration.to.write( applicationScope, uniqueValue, timeToLive ) );
+ migration.from.writeCQL( applicationScope, uniqueValue, timeToLive );
+ migration.to.writeCQL( applicationScope, uniqueValue, timeToLive );
- return aggregateBatch;
}
- return migration.to.write( applicationScope, uniqueValue, timeToLive );
+ return migration.to.writeCQL( applicationScope, uniqueValue, timeToLive );
}
+
@Override
public UniqueValueSet load( final ApplicationScope applicationScope, final String type,
final Collection<Field> fields ) throws ConnectionException {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
index 2235f63..75666fa 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV1Impl.java
@@ -20,20 +20,24 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.util.LegacyScopeUtils;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
@@ -50,6 +54,40 @@ import com.netflix.astyanax.Keyspace;
public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializationStrategyImpl<CollectionPrefixedKey<Field>, CollectionPrefixedKey<Id>> {
+
+ private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values");
+ private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+ private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values");
+ private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+ private final static TableDefinition uniqueValues =
+ new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+ UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
+
+ private final static TableDefinition uniqueValuesLog =
+ new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+ UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+
+
private static final CollectionScopedRowKeySerializer<Field> ROW_KEY_SER =
new CollectionScopedRowKeySerializer<>( UniqueFieldRowKeySerializer.get() );
@@ -79,9 +117,11 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ
* @param serializationFig The serialization configuration
*/
@Inject
- public UniqueValueSerializationStrategyV1Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
- super( keyspace, cassandraFig, serializationFig );
+ public UniqueValueSerializationStrategyV1Impl(final Keyspace keyspace, final CassandraFig cassandraFig,
+ final SerializationFig serializationFig,
+ final Session session,
+ final CassandraConfig cassandraConfig) {
+ super( keyspace, cassandraFig, serializationFig, session, cassandraConfig );
}
@@ -113,6 +153,12 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ
return CF_UNIQUE_VALUES;
}
+ @Override
+ protected TableDefinition getUniqueValuesTable(){
+
+ return uniqueValues;
+ }
+
@Override
protected MultiTenantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
@@ -122,6 +168,14 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ
@Override
+ protected TableDefinition getEntityUniqueLogTable(){
+
+ return uniqueValuesLog;
+
+ }
+
+
+ @Override
protected CollectionPrefixedKey<Field> createUniqueValueKey( final Id applicationId,
final String type, final Field field) {
@@ -141,6 +195,242 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ
return rowKey.getKey().getSubKey();
}
+ @Override
+ protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(8);
+ keys.add(0, appUUID);
+ keys.add(1, applicationType);
+ keys.add(2, appUUID);
+ keys.add(3, applicationType);
+ keys.add(4, entityType);
+ keys.add(5, fieldType);
+ keys.add(6, fieldName);
+ keys.add(7, fieldValueString);
+
+ */
+
+ int count = 0;
+ List<Object> stuff = new ArrayList<>();
+ while(bb.hasRemaining()){
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+ if(count == 0 || count == 2){
+ stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+ count++;
+ }
+
+ return stuff;
+
+ }
+
+ @Override
+ protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+ /**
+ * final UUID version = value.getVersion();
+ final Field<?> field = value.getField();
+
+ final FieldTypeName fieldType = field.getTypeName();
+ final String fieldValue = field.getValue().toString().toLowerCase();
+
+
+ DynamicComposite composite = new DynamicComposite( );
+
+ //we want to sort ascending to descending by version
+ composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+ composite.addComponent( field.getName(), STRING_SERIALIZER );
+ composite.addComponent( fieldValue, STRING_SERIALIZER );
+ composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+ */
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+
+ String comparator = UUID_TYPE_REVERSED;
+
+ int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+ fieldEntry.getField().getTypeName().name().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*65;
+
+ // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+ for (Object key : keys) {
+
+ if(key.equals(fieldEntry.getVersion())) {
+ int p = comparator.indexOf("(reversed=true)");
+ boolean desc = false;
+ if (p >= 0) {
+ comparator = comparator.substring(0, p);
+ desc = true;
+ }
+
+ byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
+ if (desc) {
+ a = (byte) Character.toUpperCase((char) a);
+ }
+
+ stuff.putShort((short) ('\u8000' | a));
+ }else{
+ comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ @Override
+ protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+ return serializeKey(applicationId.getUuid(), applicationId.getType(),
+ entityType, fieldType, fieldName, fieldValue);
+
+ }
+
+ @Override
+ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+ return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+ uniqueValueId.getUuid(), uniqueValueId.getType());
+
+ }
+
+ @Override
+ protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+ /**
+ * final Id entityId = ev.getEntityId();
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addUUID( entityVersion );
+ builder.addUUID( entityUuid );
+ builder.addString(entityType );
+ */
+
+ String comparator = "UTF8Type";
+
+ List<Object> keys = new ArrayList<>(3);
+ keys.add(entityVersion.getEntityVersion());
+ keys.add(entityVersion.getEntityId().getUuid());
+ keys.add(entityVersion.getEntityId().getType());
+
+ // UUIDs are 16 bytes
+ int size = 16+16+entityVersion.getEntityId().getType().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // we always add comparator to the buffer as well
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ if(key instanceof UUID){
+ comparator = "UUIDType";
+ }else{
+ comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+ }
+
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ @Override
+ protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
+ // custom columns have a short at beginning for comparator (which we don't use here )
+ ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first two composites are UUIDs, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else if(count ==1){
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
+ }
+
+
@Override
protected CollectionPrefixedKey<Id> createEntityUniqueLogKey( final Id applicationId,
@@ -163,4 +453,112 @@ public class UniqueValueSerializationStrategyV1Impl extends UniqueValueSerializ
public int getImplementationVersion() {
return CollectionDataVersions.INITIAL.getVersion();
}
+
+
+
+ private ByteBuffer serializeKey( UUID appUUID,
+ String applicationType,
+ String entityType,
+ String fieldType,
+ String fieldName,
+ Object fieldValue ){
+
+ final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+
+// final CollectionPrefixedKey<Field> uniquePrefixedKey =
+// new CollectionPrefixedKey<>( collectionName, applicationId, field );
+
+// //read back the id
+// final Id orgId = ID_SER.fromComposite( parser );
+// final Id scopeId = ID_SER.fromComposite( parser );
+// final String scopeName = parser.readString();
+// final K value = keySerializer.fromComposite( parser );
+
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldValue.toString().toLowerCase();
+
+ List<Object> keys = new ArrayList<>(8);
+ keys.add(0, appUUID);
+ keys.add(1, applicationType);
+ keys.add(2, appUUID);
+ keys.add(3, applicationType);
+ keys.add(4, collectionName);
+ keys.add(5, fieldType);
+ keys.add(6, fieldName);
+ keys.add(7, fieldValueString);
+
+
+ // UUIDs are 16 bytes, allocate the buffer accordingly
+ int size = 16 + applicationType.length() + 16 + applicationType.length() + collectionName.length() +
+ fieldType.length() + fieldName.length()+fieldValueString.length();
+
+
+ // we always need to add length for the 2 byte short and 1 byte equality
+ size += keys.size()*3;
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ stuff.putShort((short) kb.remaining());
+ stuff.put(kb.slice());
+ stuff.put((byte) 0);
+
+
+ }
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+
+ final String collectionName = LegacyScopeUtils.getCollectionScopeNameFromEntityType( entityType );
+//
+//
+// final CollectionPrefixedKey<Id> collectionPrefixedEntityKey =
+// new CollectionPrefixedKey<>( collectionName, applicationId, uniqueValueId );
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(appUUID);
+ keys.add(applicationType);
+ keys.add(appUUID);
+ keys.add(applicationType);
+ keys.add(collectionName);
+ keys.add(entityId);
+ keys.add(entityType);
+
+ int size = 16+applicationType.length()+16+applicationType.length()+collectionName.length()+16+entityType.length();
+
+ // we always need to add length for the 2 byte short and 1 byte equality
+ size += keys.size()*3;
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ stuff.putShort((short) kb.remaining());
+ stuff.put(kb.slice());
+ stuff.put((byte) 0);
+
+
+ }
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
index 0f233cf..4177c37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyV2Impl.java
@@ -20,13 +20,16 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Session;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
@@ -34,6 +37,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.TableDefinition;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
@@ -49,6 +53,37 @@ import com.netflix.astyanax.Keyspace;
@Singleton
public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializationStrategyImpl<TypeField, Id> {
+ private static final String UNIQUE_VALUES_TABLE = CQLUtils.quote("Unique_Values_V2");
+ private static final Collection<String> UNIQUE_VALUES_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+ private static final String UNIQUE_VALUES_LOG_TABLE = CQLUtils.quote("Entity_Unique_Values_V2");
+ private static final Collection<String> UNIQUE_VALUES_LOG_PARTITION_KEYS = Collections.singletonList("key");
+ private static final Collection<String> UNIQUE_VALUES_LOG_COLUMN_KEYS = Collections.singletonList("column1");
+ private static final Map<String, DataType.Name> UNIQUE_VALUES_LOG_COLUMNS =
+ new HashMap<String, DataType.Name>() {{
+ put( "key", DataType.Name.BLOB );
+ put( "column1", DataType.Name.BLOB );
+ put( "value", DataType.Name.BLOB ); }};
+ private static final Map<String, String> UNIQUE_VALUES_LOG_CLUSTERING_ORDER =
+ new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+ private final static TableDefinition uniqueValues =
+ new TableDefinition( UNIQUE_VALUES_TABLE, UNIQUE_VALUES_PARTITION_KEYS, UNIQUE_VALUES_COLUMN_KEYS,
+ UNIQUE_VALUES_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_CLUSTERING_ORDER);
+
+ private final static TableDefinition uniqueValuesLog =
+ new TableDefinition( UNIQUE_VALUES_LOG_TABLE, UNIQUE_VALUES_LOG_PARTITION_KEYS, UNIQUE_VALUES_LOG_COLUMN_KEYS,
+ UNIQUE_VALUES_LOG_COLUMNS, TableDefinition.CacheOption.KEYS, UNIQUE_VALUES_LOG_CLUSTERING_ORDER);
+
private static final ScopedRowKeySerializer<TypeField> ROW_KEY_SER = new ScopedRowKeySerializer<>( UniqueTypeFieldRowKeySerializer.get() );
@@ -80,8 +115,10 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
*/
@Inject
public UniqueValueSerializationStrategyV2Impl( final Keyspace keyspace, final CassandraFig cassandraFig,
- final SerializationFig serializationFig ) {
- super( keyspace, cassandraFig, serializationFig );
+ final SerializationFig serializationFig,
+ final Session session,
+ final CassandraConfig cassandraConfig) {
+ super( keyspace, cassandraFig, serializationFig, session, cassandraConfig );
}
@@ -104,7 +141,9 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
@Override
public Collection<TableDefinition> getTables() {
- return Collections.emptyList();
+
+ return Arrays.asList( uniqueValues, uniqueValuesLog );
+
}
@Override
@@ -114,6 +153,12 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
@Override
+ protected TableDefinition getUniqueValuesTable(){
+ return uniqueValues;
+ }
+
+
+ @Override
protected MultiTenantColumnFamily<ScopedRowKey<Id>, UniqueFieldEntry>
getEntityUniqueLogCF() {
return CF_ENTITY_UNIQUE_VALUE_LOG;
@@ -121,6 +166,13 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
@Override
+ protected TableDefinition getEntityUniqueLogTable(){
+ return uniqueValuesLog;
+ }
+
+
+
+ @Override
protected TypeField createUniqueValueKey( final Id applicationId, final String type, final Field field) {
return new TypeField(type,field);
}
@@ -131,6 +183,238 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
return rowKey.getKey().getField();
}
+ @Override
+ protected List<Object> deserializePartitionKey(ByteBuffer bb){
+
+
+ /**
+ * List<Object> keys = new ArrayList<>(6);
+ keys.add(0, appUUID); // UUID
+ keys.add(1, applicationType); // String
+ keys.add(2, entityType); // String
+ keys.add(3, fieldType); // String
+ keys.add(4, fieldName); // String
+ keys.add(5, fieldValueString); // String
+
+ */
+
+ List<Object> stuff = new ArrayList<>();
+ while(bb.hasRemaining()){
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+ if(stuff.size() == 0){
+ stuff.add(DataType.uuid().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }else{
+ stuff.add(DataType.text().deserialize(data.slice(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ }
+
+ return stuff;
+
+ }
+
+ @Override
+ protected Object serializeUniqueValueLogColumn(UniqueFieldEntry fieldEntry){
+
+ /**
+ * final UUID version = value.getVersion();
+ final Field<?> field = value.getField();
+
+ final FieldTypeName fieldType = field.getTypeName();
+ final String fieldValue = field.getValue().toString().toLowerCase();
+
+
+ DynamicComposite composite = new DynamicComposite( );
+
+ //we want to sort ascending to descending by version
+ composite.addComponent( version, UUID_SERIALIZER, ColumnTypes.UUID_TYPE_REVERSED);
+ composite.addComponent( field.getName(), STRING_SERIALIZER );
+ composite.addComponent( fieldValue, STRING_SERIALIZER );
+ composite.addComponent( fieldType.name() , STRING_SERIALIZER);
+ */
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldEntry.getField().getValue().toString().toLowerCase();
+
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(fieldEntry.getVersion());
+ keys.add(fieldEntry.getField().getName());
+ keys.add(fieldValueString);
+ keys.add(fieldEntry.getField().getTypeName().name());
+
+ String comparator = UUID_TYPE_REVERSED;
+
+ int size = 16+fieldEntry.getField().getName().length()+fieldEntry.getField().getValue().toString().length()+
+ fieldEntry.getField().getTypeName().name().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*65;
+
+ // uuid type comparator is longest, ensure we allocate buffer using the max size to avoid overflow
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+
+ for (Object key : keys) {
+
+ if(key.equals(fieldEntry.getVersion())) {
+ int p = comparator.indexOf("(reversed=true)");
+ boolean desc = false;
+ if (p >= 0) {
+ comparator = comparator.substring(0, p);
+ desc = true;
+ }
+
+ byte a = (byte) 85; // this is the byte value for UUIDType in astyanax used in legacy data
+ if (desc) {
+ a = (byte) Character.toUpperCase((char) a);
+ }
+
+ stuff.putShort((short) ('\u8000' | a));
+ }else{
+ comparator = "UTF8Type"; // only strings are being serialized other than UUIDs here
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ @Override
+ protected ByteBuffer getPartitionKey(Id applicationId, String entityType, String fieldType, String fieldName, Object fieldValue ){
+
+ return serializeKey(applicationId.getUuid(), applicationId.getType(),
+ entityType, fieldType, fieldName, fieldValue);
+
+ }
+
+ @Override
+ protected ByteBuffer getLogPartitionKey(final Id applicationId, final Id uniqueValueId){
+
+ return serializeLogKey(applicationId.getUuid(), applicationId.getType(),
+ uniqueValueId.getUuid(), uniqueValueId.getType());
+
+ }
+
+ @Override
+ protected ByteBuffer serializeUniqueValueColumn(EntityVersion entityVersion){
+
+ /**
+ * final Id entityId = ev.getEntityId();
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ CompositeBuilder builder = Composites.newDynamicCompositeBuilder();
+
+ builder.addUUID( entityVersion );
+ builder.addUUID( entityUuid );
+ builder.addString(entityType );
+ */
+
+ String comparator = "UTF8Type";
+
+ List<Object> keys = new ArrayList<>(3);
+ keys.add(entityVersion.getEntityVersion());
+ keys.add(entityVersion.getEntityId().getUuid());
+ keys.add(entityVersion.getEntityId().getType());
+
+ // UUIDs are 16 bytes
+ int size = 16+16+entityVersion.getEntityId().getType().length();
+
+ // we always need to add length for the 2 byte comparator short, 2 byte length short and 1 byte equality
+ size += keys.size()*5;
+
+ // we always add comparator to the buffer as well
+ size += keys.size()*comparator.length();
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ if(key instanceof UUID){
+ comparator = "UUIDType";
+ }else{
+ comparator = "UTF8Type"; // if it's not a UUID, the only other thing we're serializing is text
+ }
+
+ stuff.putShort((short)comparator.length());
+ stuff.put(DataType.serializeValue(comparator, ProtocolVersion.NEWEST_SUPPORTED));
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ // put a short that indicates how big the buffer is for this item
+ stuff.putShort((short) kb.remaining());
+
+ // put the actual item
+ stuff.put(kb.slice());
+
+ // put an equality byte ( again not used by part of legacy thrift Astyanax schema)
+ stuff.put((byte) 0);
+
+
+ }
+
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ @Override
+ protected List<Object> deserializeUniqueValueColumn(ByteBuffer bb){
+
+ List<Object> stuff = new ArrayList<>();
+ int count = 0;
+ while(bb.hasRemaining()){
+
+ // custom columns have a short at beginning for comparator (which we don't use here )
+ ByteBuffer comparator = CQLUtils.getWithShortLength(bb);
+
+ ByteBuffer data = CQLUtils.getWithShortLength(bb);
+
+
+ // first two composites are UUIDs, rest are strings
+ if(count == 0) {
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else if(count ==1){
+ stuff.add(new UUID(data.getLong(), data.getLong()));
+ }else{
+ stuff.add(DataType.text().deserialize(data.duplicate(), ProtocolVersion.NEWEST_SUPPORTED));
+ }
+
+ byte equality = bb.get(); // we don't use this but take the equality byte off the buffer
+
+ count++;
+ }
+
+ return stuff;
+
+ }
+
@Override
protected Id createEntityUniqueLogKey( final Id applicationId, final Id uniqueValueId ) {
@@ -142,4 +426,87 @@ public class UniqueValueSerializationStrategyV2Impl extends UniqueValueSerializ
public int getImplementationVersion() {
return CollectionDataVersions.LOG_REMOVAL.getVersion();
}
+
+
+
+ // row key = app UUID + app type + app UUID + app type + field type + field name + field value
+ private ByteBuffer serializeKey(UUID appUUID,
+ String applicationType,
+ String entityType,
+ String fieldType,
+ String fieldName,
+ Object fieldValue ){
+
+ // values are serialized as strings, not sure why, and always lower cased
+ String fieldValueString = fieldValue.toString().toLowerCase();
+
+ List<Object> keys = new ArrayList<>(6);
+ keys.add(0, appUUID);
+ keys.add(1, applicationType);
+ keys.add(2, entityType);
+ keys.add(3, fieldType);
+ keys.add(4, fieldName);
+ keys.add(5, fieldValueString);
+
+
+ // UUIDs are 16 bytes, allocate the buffer accordingly
+ int size = 16 + applicationType.length() + entityType.length() + fieldType.length() + fieldName.length()+fieldValueString.length();
+
+
+ // we always need to add length for the 2 byte short and 1 byte equality
+ size += keys.size()*3;
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ stuff.putShort((short) kb.remaining());
+ stuff.put(kb.slice());
+ stuff.put((byte) 0);
+
+
+ }
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
+ private ByteBuffer serializeLogKey(UUID appUUID, String applicationType, UUID entityId, String entityType){
+
+ List<Object> keys = new ArrayList<>(4);
+ keys.add(appUUID);
+ keys.add(applicationType);
+ keys.add(entityId);
+ keys.add(entityType);
+
+ int size = 16+applicationType.length()+16+entityType.length();
+
+ // we always need to add length for the 2 byte short and 1 byte equality
+ size += keys.size()*3;
+
+ ByteBuffer stuff = ByteBuffer.allocate(size);
+
+ for (Object key : keys) {
+
+ ByteBuffer kb = DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED);
+ if (kb == null) {
+ kb = ByteBuffer.allocate(0);
+ }
+
+ stuff.putShort((short) kb.remaining());
+ stuff.put(kb.slice());
+ stuff.put((byte) 0);
+
+
+ }
+ stuff.flip();
+ return stuff.duplicate();
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index a110ed7..8d52d8b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.cql.BatchStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +73,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
private static final Logger logger = LoggerFactory.getLogger( MvccEntityDataMigrationImpl.class );
private final Keyspace keyspace;
+ private final Session session;
private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
@@ -80,12 +83,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
@Inject
public MvccEntityDataMigrationImpl( final Keyspace keyspace,
+ final Session session,
final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final MigrationDataProvider<EntityIdScope> migrationDataProvider ) {
this.keyspace = keyspace;
+ this.session = session;
this.allVersions = allVersions;
this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
@@ -163,8 +168,9 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
final List<Id> toSaveIds = new ArrayList<>( entities.size() );
+ final com.datastax.driver.core.BatchStatement uniqueBatch = new com.datastax.driver.core.BatchStatement();
- for ( EntityToSaveMessage message : entities ) {
+ for ( EntityToSaveMessage message : entities ) {
try {
final MutationBatch entityRewrite = migration.to.write(message.scope, message.entity);
@@ -197,17 +203,14 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
// time with
// no TTL so that cleanup can clean up
// older values
+
+
for (final Field field : EntityUtils.getUniqueFields(message.entity.getEntity().get())) {
final UniqueValue written = new UniqueValueImpl(field, entityId, version);
- final MutationBatch mb = uniqueValueSerializationStrategy.write(message.scope, written);
-
+ uniqueBatch.add(uniqueValueSerializationStrategy.writeCQL(message.scope, written, -1));
- // merge into our
- // existing mutation
- // batch
- totalBatch.mergeShallow(mb);
}
@@ -232,7 +235,7 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
}
- executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong );
+ executeBatch( migration.to.getImplementationVersion(), totalBatch, observer, atomicLong, uniqueBatch );
//now run our cleanup task
@@ -252,10 +255,13 @@ public class MvccEntityDataMigrationImpl implements DataMigration{
}
- protected void executeBatch( final int targetVersion, final MutationBatch batch, final ProgressObserver po,
- final AtomicLong count ) {
+ protected void executeBatch(final int targetVersion, final MutationBatch batch, final ProgressObserver po,
+ final AtomicLong count, com.datastax.driver.core.BatchStatement uniqueBatch) {
try {
+
batch.execute();
+ session.execute(uniqueBatch);
+
po.update( targetVersion, "Finished copying " + count + " entities to the new format" );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index ad6eac6..b18b095 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -1,7 +1,13 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -32,6 +38,9 @@ import static org.mockito.Mockito.when;
/** @author tnine */
public class MarkCommitTest extends AbstractMvccEntityStageTest {
+ @Inject
+
+
/** Standard flow */
@Test
public void testStartStage() throws Exception {
@@ -39,6 +48,8 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
final ApplicationScope context = mock( ApplicationScope.class );
+ final Session session = mock(Session.class);
+
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -71,7 +82,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
//run the stage
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+ WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
//verify the observable is correct
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 58642d3..60281d4 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -18,7 +18,13 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -53,6 +59,8 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
final ApplicationScope context = mock( ApplicationScope.class );
+ final Session session = mock(Session.class);
+
//mock returning a mock mutation when we do a log entry write
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
@@ -84,7 +92,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
//run the stage
- WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+ WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session );
Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
@@ -116,6 +124,9 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
/**
* Write up mock mutations so we don't npe on the our operations, but rather on the input
*/
+
+ final Session session = mock(Session.class);
+
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
final MutationBatch logMutation = mock( MutationBatch.class );
@@ -131,7 +142,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
.thenReturn( entityMutation );
- new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ).call( event );
+ new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, session ).call( event );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 09876fb..3ddc14d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,6 +18,7 @@
package org.apache.usergrid.persistence.collection.mvcc.stage.write;
+import com.datastax.driver.core.Session;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -59,7 +60,8 @@ public class WriteUniqueVerifyTest {
@Rule
public MigrationManagerRule migrationManagerRule;
-
+ @Inject
+ private Session session;
@Inject
private SerializationFig fig;
@@ -82,7 +84,7 @@ public class WriteUniqueVerifyTest {
final MvccEntity mvccEntity = fromEntity( entity );
// run the stage
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig );
+ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, session );
newStage.call(
new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0c609878/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index fcf22cf..3ffdb65 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Session;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -64,6 +66,8 @@ public abstract class UniqueValueSerializationStrategyImplTest {
@Rule
public MigrationManagerRule migrationManagerRule;
+ @Inject
+ private Session session;
private UniqueValueSerializationStrategy strategy;
@@ -91,7 +95,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL(scope, stored, -1);
+ session.execute(batch);
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
@@ -127,7 +133,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored, 5 ).execute();
+ //strategy.write( scope, stored, 5 ).execute();
+ BatchStatement batch = strategy.writeCQL(scope, stored, 5);
+ session.execute(batch);
Thread.sleep( 1000 );
@@ -179,7 +187,10 @@ public abstract class UniqueValueSerializationStrategyImplTest {
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
+
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL( scope, stored, -1);
+ session.execute(batch);
strategy.delete( scope, stored ).execute();
@@ -207,8 +218,9 @@ public abstract class UniqueValueSerializationStrategyImplTest {
Id entityId = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
UUID version = UUIDGenerator.newTimeUUID();
UniqueValue stored = new UniqueValueImpl( field, entityId, version );
- strategy.write( scope, stored ).execute();
-
+ //strategy.write( scope, stored ).execute();
+ BatchStatement batch = strategy.writeCQL( scope, stored, -1);
+ session.execute(batch);
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Collections.<Field>singleton( field ) );
@@ -278,9 +290,13 @@ public abstract class UniqueValueSerializationStrategyImplTest {
UniqueValue version1Field1Value = new UniqueValueImpl( version1Field1, entityId, version1 );
UniqueValue version1Field2Value = new UniqueValueImpl( version1Field2, entityId, version1 );
- final MutationBatch batch = strategy.write( scope, version1Field1Value );
- batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
+ //final MutationBatch batch = strategy.write( scope, version1Field1Value );
+ //batch.mergeShallow( strategy.write( scope, version1Field2Value ) );
+ final BatchStatement batch = new BatchStatement();
+
+ batch.add(strategy.writeCQL( scope, version1Field1Value, -1));
+ batch.add(strategy.writeCQL( scope, version1Field2Value, -1));
//write V2 of everything
final UUID version2 = UUIDGenerator.newTimeUUID();
@@ -292,10 +308,15 @@ public abstract class UniqueValueSerializationStrategyImplTest {
UniqueValue version2Field1Value = new UniqueValueImpl( version2Field1, entityId, version2 );
UniqueValue version2Field2Value = new UniqueValueImpl( version2Field2, entityId, version2 );
- batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
- batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
+ //batch.mergeShallow( strategy.write( scope, version2Field1Value ) );
+ //batch.mergeShallow( strategy.write( scope, version2Field2Value ) );
+
+ batch.add(strategy.writeCQL( scope, version2Field1Value, -1));
+ batch.add(strategy.writeCQL( scope, version2Field2Value, -1));
+
+ session.execute(batch);
- batch.execute();
+ //batch.execute();
UniqueValueSet fields = strategy.load( scope, entityId.getType(), Arrays.<Field>asList( version1Field1, version1Field2 ) );