You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/20 01:37:50 UTC
[2/3] incubator-usergrid git commit: Removed duplicate MigrationRule
Removed duplicate MigrationRule
Partially refactored entity serialization.
Finished creating tests for refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2bd1c950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2bd1c950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2bd1c950
Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 2bd1c950f6fab093d458e6c3a55fb7fb8ff28e79
Parents: 39b1576
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 19 17:01:04 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 19 17:01:04 2014 -0700
----------------------------------------------------------------------
.../corepersistence/migration/Versions.java | 3 +
.../impl/EntityCollectionManagerImpl.java | 241 +++++++++----------
.../mvcc/MvccEntitySerializationStrategy.java | 3 +-
.../mvcc/MvccLogEntrySerializationStrategy.java | 3 +-
.../mvcc/stage/delete/MarkCommit.java | 3 +-
.../mvcc/stage/write/WriteCommit.java | 3 +-
.../UniqueValueSerializationStrategy.java | 3 +-
.../MvccEntitySerializationStrategyImpl.java | 172 ++-----------
...vccEntitySerializationStrategyProxyImpl.java | 162 +++++++++++++
.../MvccEntitySerializationStrategyV1Impl.java | 193 +++++++++++++++
.../MvccEntitySerializationStrategyV2Impl.java | 194 +++++++++++++++
.../MvccLogEntrySerializationStrategyImpl.java | 2 +-
.../serialization/impl/SerializationModule.java | 21 +-
.../UniqueValueSerializationStrategyImpl.java | 2 +-
.../EntityCollectionManagerFactoryTest.java | 2 +-
.../collection/EntityCollectionManagerIT.java | 74 ++++--
.../EntityCollectionManagerStressTest.java | 2 +-
.../EntityCollectionManagerSyncIT.java | 2 +-
.../collection/guice/MigrationManagerRule.java | 38 ---
.../collection/guice/TestCollectionModule.java | 14 ++
...niqueValueSerializationStrategyImplTest.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerifyIT.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +-
...MvccEntitySerializationStrategyImplTest.java | 56 ++---
...cEntitySerializationStrategyProxyV1Test.java | 77 ++++++
...cEntitySerializationStrategyProxyV2Test.java | 75 ++++++
...ccEntitySerializationStrategyV1ImplTest.java | 46 ++++
...ccEntitySerializationStrategyV2ImplTest.java | 47 ++++
.../impl/MvccLESSTransientTest.java | 2 +-
...ccLogEntrySerializationStrategyImplTest.java | 2 +-
.../src/test/resources/log4j.properties | 21 +-
.../core/guice/MaxMigrationModule.java | 39 +++
.../core/guice/MaxMigrationVersion.java | 40 +++
.../core/guice/MigrationManagerRule.java | 19 +-
.../persistence/graph/GraphManagerIT.java | 2 +-
.../persistence/graph/GraphManagerLoadTest.java | 2 +-
.../graph/GraphManagerShardingIT.java | 2 +-
.../graph/GraphManagerStressTest.java | 2 +-
.../usergrid/persistence/graph/SimpleTest.java | 2 +-
.../graph/guice/TestGraphModule.java | 11 +
.../graph/impl/EdgeDeleteListenerTest.java | 2 +-
.../graph/impl/NodeDeleteListenerTest.java | 2 +-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 2 +-
.../graph/impl/stage/EdgeMetaRepairTest.java | 2 +-
.../EdgeMetadataSerializationTest.java | 2 +-
.../EdgeSerializationChopTest.java | 2 +-
.../serialization/EdgeSerializationTest.java | 2 +-
.../serialization/NodeSerializationTest.java | 2 +-
.../impl/shard/EdgeShardSerializationTest.java | 2 +-
.../NodeShardCounterSerializationTest.java | 2 +-
.../persistence/map/MapManagerTest.java | 2 +-
51 files changed, 1205 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index b4fe095..99067b7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -22,6 +22,7 @@
package org.apache.usergrid.corepersistence.migration;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
@@ -39,4 +40,6 @@ public class Versions {
* Version 2. Edge meta changes
*/
public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
+
+ public static final int VERSION_3 = MvccEntitySerializationStrategyProxyImpl.MIGRATION_VERSION;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 919e83b..d54c5f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -23,23 +23,18 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
-import org.apache.usergrid.persistence.model.field.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.guice.Write;
import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
@@ -49,9 +44,16 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.common.base.Preconditions;
@@ -59,11 +61,10 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.core.util.Health;
import rx.Observable;
import rx.Subscriber;
@@ -73,13 +74,12 @@ import rx.schedulers.Schedulers;
/**
- * Simple implementation. Should perform writes, delete and load.
- * <p/>
- * TODO: maybe refactor the stage operations into their own classes for clarity and organization?
+ * Simple implementation. Should perform writes, delete and load. <p/> TODO: maybe refactor the stage operations into
+ * their own classes for clarity and organization?
*/
public class EntityCollectionManagerImpl implements EntityCollectionManager {
- private static final Logger logger = LoggerFactory.getLogger(EntityCollectionManagerImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger( EntityCollectionManagerImpl.class );
private final CollectionScope collectionScope;
@@ -106,24 +106,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Inject
- public EntityCollectionManagerImpl(
- @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
- final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit, final RollbackAction rollback,
- final MarkStart markStart, final MarkCommit markCommit,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace,
- final SerializationFig config,
- @Assisted final CollectionScope collectionScope
- ) {
+ public EntityCollectionManagerImpl( @Write final WriteStart writeStart, @WriteUpdate final WriteStart writeUpdate,
+ final WriteUniqueVerify writeVerifyUnique,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit, final RollbackAction rollback,
+ final MarkStart markStart, final MarkCommit markCommit,
+ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+ final Keyspace keyspace, final SerializationFig config,
+ @Assisted final CollectionScope collectionScope ) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
- MvccValidationUtils.validateCollectionScope(collectionScope);
+ MvccValidationUtils.validateCollectionScope( collectionScope );
this.writeStart = writeStart;
this.writeUpdate = writeUpdate;
@@ -145,20 +141,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
- public Observable<Entity> write(final Entity entity) {
+ public Observable<Entity> write( final Entity entity ) {
//do our input validation
- Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
final Id entityId = entity.getId();
- ValidationUtils.verifyIdentity(entityId);
+ ValidationUtils.verifyIdentity( entityId );
// create our observable and start the write
- final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+ final CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
- Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeStart);
+ Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
// execute all validation stages concurrently. Needs refactored when this is done.
// https://github.com/Netflix/RxJava/issues/627
@@ -166,115 +162,119 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// writeVerifyUnique, writeOptimisticVerify );
// return the commit result.
- return observable.map(writeCommit).doOnError(rollback);
+ return observable.map( writeCommit ).doOnError( rollback );
}
@Override
- public Observable<Void> delete(final Id entityId) {
-
- Preconditions.checkNotNull(entityId, "Entity id is required in this stage");
- Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage");
- Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage");
-
- return Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart)
- .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() {
- @Override
- public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
- return null;
- }
- });
+ public Observable<Void> delete( final Id entityId ) {
+
+ Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+ Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
+ Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
+
+ return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart )
+ .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() {
+ @Override
+ public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ return null;
+ }
+ } );
}
@Override
- public Observable<Entity> load(final Id entityId) {
+ public Observable<Entity> load( final Id entityId ) {
- Preconditions.checkNotNull(entityId, "Entity id required in the load stage");
- Preconditions.checkNotNull(entityId.getUuid(), "Entity id uuid required in load stage");
- Preconditions.checkNotNull(entityId.getType(), "Entity id type required in load stage");
+ Preconditions.checkNotNull( entityId, "Entity id required in the load stage" );
+ Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
+ Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- return load(Collections.singleton(entityId)).map(new Func1<EntitySet, Entity>() {
+ return load( Collections.singleton( entityId ) ).map( new Func1<EntitySet, Entity>() {
@Override
- public Entity call(final EntitySet entitySet) {
- final MvccEntity entity = entitySet.getEntity(entityId);
+ public Entity call( final EntitySet entitySet ) {
+ final MvccEntity entity = entitySet.getEntity( entityId );
- if (entity == null) {
+ if ( entity == null ) {
return null;
}
return entity.getEntity().orNull();
}
- });
+ } );
}
@Override
- public Observable<EntitySet> load(final Collection<Id> entityIds) {
+ public Observable<EntitySet> load( final Collection<Id> entityIds ) {
- Preconditions.checkNotNull(entityIds, "entityIds cannot be null");
+ Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
- return Observable.create(new Observable.OnSubscribe<EntitySet>() {
+ return Observable.create( new Observable.OnSubscribe<EntitySet>() {
@Override
- public void call(final Subscriber<? super EntitySet> subscriber) {
+ public void call( final Subscriber<? super EntitySet> subscriber ) {
try {
- final EntitySet results = entitySerializationStrategy.load(
- collectionScope, entityIds, UUIDGenerator.newTimeUUID());
+ final EntitySet results =
+ entitySerializationStrategy.load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
- subscriber.onNext(results);
+ subscriber.onNext( results );
subscriber.onCompleted();
- } catch (Exception e) {
- subscriber.onError(e);
+ }
+ catch ( Exception e ) {
+ subscriber.onError( e );
}
}
- });
+ } );
}
+
@Override
- public Observable<Id> getIdField(final Field field) {
- final List<Field> fields = Collections.singletonList(field);
- return rx.Observable.from(fields).map(new Func1<Field, Id>() {
+ public Observable<Id> getIdField( final Field field ) {
+ final List<Field> fields = Collections.singletonList( field );
+ return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
@Override
- public Id call(Field field) {
+ public Id call( Field field ) {
try {
- UniqueValueSet set = uniqueValueSerializationStrategy.load(collectionScope, fields);
- UniqueValue value = set.getValue(field.getName());
+ UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields );
+ UniqueValue value = set.getValue( field.getName() );
Id id = value == null ? null : value.getEntityId();
return id;
- } catch (ConnectionException e) {
- logger.error("Failed to getIdField", e);
- throw new RuntimeException(e);
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
}
}
- });
+ } );
}
+
@Override
- public Observable<Entity> update(final Entity entity) {
+ public Observable<Entity> update( final Entity entity ) {
- logger.debug("Starting update process");
+ logger.debug( "Starting update process" );
//do our input validation
- Preconditions.checkNotNull(entity, "Entity is required in the new stage of the mvcc write");
+ Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
final Id entityId = entity.getId();
- ValidationUtils.verifyIdentity(entityId);
+ ValidationUtils.verifyIdentity( entityId );
// create our observable and start the write
- CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+ CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
- Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate);
+ Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
- return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
+ return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
@Override
- public void call(final Entity entity) {
- logger.debug("sending entity to the queue");
+ public void call( final Entity entity ) {
+ logger.debug( "sending entity to the queue" );
//we an update, signal the fix
@@ -284,57 +284,56 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
- }).doOnError(rollback);
+ } ).doOnError( rollback );
}
// fire the stages
- public Observable<CollectionIoEvent<MvccEntity>> stageRunner(CollectionIoEvent<Entity> writeData,
- WriteStart writeState) {
+ public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
+ WriteStart writeState ) {
- return Observable.from(writeData).map(writeState).doOnNext(
- new Action1<CollectionIoEvent<MvccEntity>>() {
+ return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
- @Override
- public void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
+ @Override
+ public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
- Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
- .doOnNext(writeVerifyUnique);
+ Observable<CollectionIoEvent<MvccEntity>> unique =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+ .doOnNext( writeVerifyUnique );
- // optimistic verification
- Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.from(mvccEntityCollectionIoEvent).subscribeOn(Schedulers.io())
- .doOnNext(writeOptimisticVerify);
+ // optimistic verification
+ Observable<CollectionIoEvent<MvccEntity>> optimistic =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
+ .doOnNext( writeOptimisticVerify );
- //wait for both to finish
- Observable.merge(unique, optimistic).toBlocking().last();
- }
- });
+ //wait for both to finish
+ Observable.merge( unique, optimistic ).toBlocking().last();
+ }
+ } );
}
@Override
- public Observable<VersionSet> getLatestVersion(final Collection<Id> entityIds) {
+ public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
- return Observable.create(new Observable.OnSubscribe<VersionSet>() {
+ return Observable.create( new Observable.OnSubscribe<VersionSet>() {
@Override
- public void call(final Subscriber<? super VersionSet> subscriber) {
+ public void call( final Subscriber<? super VersionSet> subscriber ) {
try {
final VersionSet logEntries = mvccLogEntrySerializationStrategy
- .load(collectionScope, entityIds, UUIDGenerator.newTimeUUID());
+ .load( collectionScope, entityIds, UUIDGenerator.newTimeUUID() );
- subscriber.onNext(logEntries);
+ subscriber.onNext( logEntries );
subscriber.onCompleted();
-
- } catch (Exception e) {
- subscriber.onError(e);
+ }
+ catch ( Exception e ) {
+ subscriber.onError( e );
}
}
- });
+ } );
}
@@ -342,25 +341,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
public Health getHealth() {
try {
- ColumnFamily<String, String> CF_SYSTEM_LOCAL = new ColumnFamily<String, String>(
- "system.local",
- StringSerializer.get(),
- StringSerializer.get(),
- StringSerializer.get());
+ ColumnFamily<String, String> CF_SYSTEM_LOCAL =
+ new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+ StringSerializer.get() );
- OperationResult<CqlResult<String, String>> result = keyspace.prepareQuery(CF_SYSTEM_LOCAL)
- .withCql("SELECT now() FROM system.local;")
- .execute();
+ OperationResult<CqlResult<String, String>> result =
+ keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
if ( result.getResult().getRows().size() == 1 ) {
return Health.GREEN;
}
-
- } catch ( ConnectionException ex ) {
- logger.error("Error connecting to Cassandra", ex);
+ }
+ catch ( ConnectionException ex ) {
+ logger.error( "Error connecting to Cassandra", ex );
}
return Health.RED;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
index b9277eb..93288af 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.MutationBatch;
@@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch;
/**
* The interface that allows us to serialize an entity to disk
*/
-public interface MvccEntitySerializationStrategy {
+public interface MvccEntitySerializationStrategy extends Migration {
/**
* Serialize the entity to the data store with the given collection context
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
index 927a60c..4baef84 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccLogEntrySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.MutationBatch;
@@ -34,7 +35,7 @@ import com.netflix.astyanax.MutationBatch;
/**
* The interface that allows us to serialize a log entry to disk
*/
-public interface MvccLogEntrySerializationStrategy {
+public interface MvccLogEntrySerializationStrategy extends Migration {
/**
* Serialize the entity to the data store with the given collection context
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 9e2d52c..5bcb9f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -72,7 +73,7 @@ public class MarkCommit implements Action1<CollectionIoEvent<MvccEntity>> {
@Inject
public MarkCommit( final MvccLogEntrySerializationStrategy logStrat,
- final MvccEntitySerializationStrategy entityStrat,
+ @ProxyImpl final MvccEntitySerializationStrategy entityStrat,
final UniqueValueSerializationStrategy uniqueValueStrat, final SerializationFig serializationFig,
final Keyspace keyspace ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 6a17197..d3c8193 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -37,6 +37,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -70,7 +71,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
@Inject
public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
- final MvccEntitySerializationStrategy entryStrat,
+ @ProxyImpl final MvccEntitySerializationStrategy entryStrat,
final UniqueValueSerializationStrategy uniqueValueStrat) {
Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 4ceb407..030d9d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@ -23,13 +23,14 @@ import java.util.Collection;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.model.field.Field;
/**
* Reads and writes to UniqueValues column family.
*/
-public interface UniqueValueSerializationStrategy {
+public interface UniqueValueSerializationStrategy extends Migration {
/**
* Write the specified UniqueValue to Cassandra with optional timeToLive in milliseconds.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index bbaeb4a..1ec027f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -18,9 +18,7 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -54,21 +52,15 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.model.Composites;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.AbstractSerializer;
@@ -80,18 +72,13 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
/**
* @author tnine
*/
-@Singleton
-public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy, Migration {
+public abstract class MvccEntitySerializationStrategyImpl implements MvccEntitySerializationStrategy {
private static final Logger log = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class );
- private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
- private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
- private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
private static final CollectionScopedRowKeySerializer<Id> ROW_KEY_SER =
@@ -104,6 +91,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
protected final Keyspace keyspace;
protected final SerializationFig serializationFig;
protected final EntityRepair repair;
+ private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
@Inject
@@ -111,6 +99,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
this.keyspace = keyspace;
this.serializationFig = serializationFig;
this.repair = new EntityRepairImpl( this, serializationFig );
+ this.entityJsonSerializer = getEntitySerializer();
}
@@ -126,7 +115,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
try {
- colMutation.putColumn( colName, ENTITY_JSON_SER
+ colMutation.putColumn( colName, entityJsonSerializer
.toByteBuffer( new EntityWrapper( entity.getStatus(), entity.getEntity() ) ) );
}
catch ( Exception e ) {
@@ -207,7 +196,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
final Column<UUID> column = columns.getColumnByIndex( 0 );
- final MvccEntity parsedEntity = new MvccColumnParser( entityId ).parseColumn( column );
+ final MvccEntity parsedEntity = new MvccColumnParser( entityId, entityJsonSerializer ).parseColumn( column );
//we *might* need to repair, it's not clear so check before loading into result sets
final MvccEntity maybeRepaired = repair.maybeRepair( collectionScope, parsedEntity );
@@ -245,7 +234,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey )
.withColumnRange( version, null, false, fetchSize );
- return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false );
+ return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false );
}
@@ -275,7 +264,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
keyspace.prepareQuery( CF_ENTITY_DATA ).getKey( rowKey )
.withColumnRange( null, version, true, fetchSize );
- return new ColumnNameIterator( query, new MvccColumnParser( entityId ), false );
+ return new ColumnNameIterator( query, new MvccColumnParser( entityId, entityJsonSerializer ), false );
}
@@ -290,7 +279,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
return doWrite( collectionScope, entityId, new RowOp() {
@Override
public void doOp( final ColumnListMutation<UUID> colMutation ) {
- colMutation.putColumn( version, ENTITY_JSON_SER
+ colMutation.putColumn( version, entityJsonSerializer
.toByteBuffer( new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() ) ) );
}
} );
@@ -367,12 +356,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
/**
* Simple bean wrapper for state and entity
*/
- private static class EntityWrapper {
- private final MvccEntity.Status status;
- private final Optional<Entity> entity;
+ protected static class EntityWrapper {
+ protected final MvccEntity.Status status;
+ protected final Optional<Entity> entity;
- private EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) {
+ protected EntityWrapper( final MvccEntity.Status status, final Optional<Entity> entity ) {
this.status = status;
this.entity = entity;
}
@@ -385,10 +374,12 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
private static final class MvccColumnParser implements ColumnParser<UUID, MvccEntity> {
private final Id id;
+ private final AbstractSerializer<EntityWrapper> entityJsonSerializer;
- private MvccColumnParser( Id id ) {
+ private MvccColumnParser( Id id, final AbstractSerializer<EntityWrapper> entityJsonSerializer ) {
this.id = id;
+ this.entityJsonSerializer = entityJsonSerializer;
}
@@ -399,7 +390,7 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
final UUID version = column.getName();
try {
- deSerialized = column.getValue( ENTITY_JSON_SER );
+ deSerialized = column.getValue( entityJsonSerializer );
}
catch ( DataCorruptionException e ) {
log.error(
@@ -421,128 +412,9 @@ public class MvccEntitySerializationStrategyImpl implements MvccEntitySerializat
}
- public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
-
-
- public static final SmileFactory f = new SmileFactory();
-
- public static ObjectMapper mapper;
-
- private static byte[] STATE_COMPLETE = new byte[] { 0 };
- private static byte[] STATE_DELETED = new byte[] { 1 };
- private static byte[] STATE_PARTIAL = new byte[] { 2 };
-
- private static byte[] VERSION = new byte[] { 0 };
-
-
-
- public EntitySerializer() {
- try {
- mapper = new ObjectMapper( f );
- // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
- // causes slowness
- mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Error setting up mapper", e );
- }
- }
-
-
- @Override
- public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
- if ( wrapper == null ) {
- return null;
- }
-
- CompositeBuilder builder = Composites.newCompositeBuilder();
-
- builder.addBytes( VERSION );
-
- //mark this version as empty
- if ( !wrapper.entity.isPresent() ) {
- //we're empty
- builder.addBytes( STATE_DELETED );
-
- return builder.build();
- }
-
- //we have an entity
-
- if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
- builder.addBytes( STATE_COMPLETE );
- }
-
- else {
- builder.addBytes( STATE_PARTIAL );
- }
-
- try {
- final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() ) ;
- builder.addBytes( entityBytes );
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Unable to serialize entity", e );
- }
-
- return builder.build();
- }
-
-
- @Override
- public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
-
- /**
- * We intentionally turn data corruption exceptions when we're unable to de-serialize
- * the data in cassandra. If this occurs, we'll never be able to de-serialize it
- * and it should be considered lost. This is an error that is occuring due to a bug
- * in serializing the entity. This is a lazy recognition + repair signal for deployment with
- * existing systems.
- */
- CompositeParser parser;
- try {
- parser = Composites.newCompositeParser( byteBuffer );
- }
- catch ( Exception e ) {
- throw new DataCorruptionException("Unable to de-serialze entity", e);
- }
-
- byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
-
- if ( !Arrays.equals( VERSION, version ) ) {
- throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
- }
-
- byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
-
- // it's been deleted, remove it
-
- if ( Arrays.equals( STATE_DELETED, state ) ) {
- return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
- }
-
- Entity storedEntity;
-
- ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
- byte[] array = jsonBytes.array();
- int start = jsonBytes.arrayOffset();
- int length = jsonBytes.remaining();
-
- try {
- storedEntity = mapper.readValue( array, start, length, Entity.class );
- }
- catch ( Exception e ) {
- throw new DataCorruptionException( "Unable to read entity data", e );
- }
-
- final Optional<Entity> entity = Optional.of( storedEntity );
-
- if ( Arrays.equals( STATE_COMPLETE, state ) ) {
- return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
- }
-
- // it's partial by default
- return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
- }
- }
+ /**
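+ * Return the entity serializer for this instance. Called once from the base-class constructor, so implementations must not rely on subclass instance fields.
+ * @return the serializer used to write and parse entity column values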
+ */
+ protected abstract AbstractSerializer<EntityWrapper> getEntitySerializer();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
new file mode 100644
index 0000000..a9e01b1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Version 3 implementation of entity serialization. This will proxy writes and reads so that during
+ * migration data goes to both sources and is read from the old source. After the upgrade completes,
+ * data is written to and read from the new source only.
+ */
+@Singleton
+public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy {
+
+
+ public static final int MIGRATION_VERSION = 3;
+
+ private final DataMigrationManager dataMigrationManager;
+ private final Keyspace keyspace;
+ private final MvccEntitySerializationStrategy previous;
+ private final MvccEntitySerializationStrategy current;
+
+
+ @Inject
+ public MvccEntitySerializationStrategyProxyImpl( final DataMigrationManager dataMigrationManager,
+ final Keyspace keyspace,
+ @PreviousImpl final MvccEntitySerializationStrategy previous,
+ @CurrentImpl final MvccEntitySerializationStrategy current ) {
+ this.dataMigrationManager = dataMigrationManager;
+ this.keyspace = keyspace;
+ this.previous = previous;
+ this.current = current;
+ }
+
+
+ @Override
+ public MutationBatch write( final CollectionScope context, final MvccEntity entity ) {
+ if ( isOldVersion() ) {
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.write( context, entity ) );
+ aggregateBatch.mergeShallow( current.write( context, entity ) );
+
+ return aggregateBatch;
+ }
+
+ return current.write( context, entity );
+ }
+
+
+ @Override
+ public EntitySet load( final CollectionScope scope, final Collection<Id> entityIds, final UUID maxVersion ) {
+ if ( isOldVersion() ) {
+ return previous.load( scope, entityIds, maxVersion );
+ }
+
+ return current.load( scope, entityIds, maxVersion );
+ }
+
+
+ @Override
+ public Iterator<MvccEntity> load( final CollectionScope context, final Id entityId, final UUID version,
+ final int fetchSize ) {
+ if ( isOldVersion() ) {
+ return previous.load( context, entityId, version, fetchSize );
+ }
+
+ return current.load( context, entityId, version, fetchSize );
+ }
+
+
+ @Override
+ public Iterator<MvccEntity> loadHistory( final CollectionScope context, final Id entityId, final UUID version,
+ final int fetchSize ) {
+ if ( isOldVersion() ) {
+ return previous.loadHistory( context, entityId, version, fetchSize );
+ }
+
+ return current.loadHistory( context, entityId, version, fetchSize );
+ }
+
+
+ @Override
+ public MutationBatch mark( final CollectionScope context, final Id entityId, final UUID version ) {
+ if ( isOldVersion() ) {
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.mark( context, entityId, version ) );
+ aggregateBatch.mergeShallow( current.mark( context, entityId, version ) );
+
+ return aggregateBatch;
+ }
+
+ return current.mark( context, entityId, version );
+ }
+
+
+ @Override
+ public MutationBatch delete( final CollectionScope context, final Id entityId, final UUID version ) {
+ if ( isOldVersion() ) {
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.delete( context, entityId, version ) );
+ aggregateBatch.mergeShallow( current.delete( context, entityId, version ) );
+
+ return aggregateBatch;
+ }
+
+ return current.delete( context, entityId, version );
+ }
+
+
+ /**
+ * Return true while the current data migration version is below MIGRATION_VERSION
+ */
+ private boolean isOldVersion() {
+ return dataMigrationManager.getCurrentVersion() < MIGRATION_VERSION;
+ }
+
+
+ @Override
+ public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
new file mode 100644
index 0000000..49b3486
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV1Impl.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.netflix.astyanax.serializers.BytesArraySerializer;
+
+
+/**
+ * Version 1 implementation of entity serialization
+ */
+@Singleton
+public class MvccEntitySerializationStrategyV1Impl extends MvccEntitySerializationStrategyImpl {
+
+ private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
+
+
+ @Inject
+ public MvccEntitySerializationStrategyV1Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+ super( keyspace, serializationFig );
+ }
+
+
+ @Override
+ protected AbstractSerializer<MvccEntitySerializationStrategyImpl.EntityWrapper> getEntitySerializer() {
+ return ENTITY_JSON_SER;
+ }
+
+
+ public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
+
+
+ private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
+
+ private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+
+
+ public static final SmileFactory f = new SmileFactory();
+
+ public static ObjectMapper mapper;
+
+ private static byte[] STATE_COMPLETE = new byte[] { 0 };
+ private static byte[] STATE_DELETED = new byte[] { 1 };
+ private static byte[] STATE_PARTIAL = new byte[] { 2 };
+
+ private static byte[] VERSION = new byte[] { 0 };
+
+
+ public EntitySerializer() {
+ try {
+ mapper = new ObjectMapper( f );
+ // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
+ // causes slowness
+ mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error setting up mapper", e );
+ }
+ }
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
+ if ( wrapper == null ) {
+ return null;
+ }
+
+ CompositeBuilder builder = Composites.newCompositeBuilder();
+
+ builder.addBytes( VERSION );
+
+ //mark this version as empty
+ if ( !wrapper.entity.isPresent() ) {
+ //we're empty
+ builder.addBytes( STATE_DELETED );
+
+ return builder.build();
+ }
+
+ //we have an entity
+
+ if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
+ builder.addBytes( STATE_COMPLETE );
+ }
+
+ else {
+ builder.addBytes( STATE_PARTIAL );
+ }
+
+ try {
+ final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() );
+ builder.addBytes( entityBytes );
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to serialize entity", e );
+ }
+
+ return builder.build();
+ }
+
+
+ @Override
+ public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
+
+ /**
+ * We intentionally throw data corruption exceptions when we're unable to de-serialize
+ * the data in cassandra. If this occurs, we'll never be able to de-serialize it
+ * and it should be considered lost. This is an error that is occurring due to a bug
+ * in serializing the entity. This is a lazy recognition + repair signal for deployment with
+ * existing systems.
+ */
+ CompositeParser parser;
+ try {
+ parser = Composites.newCompositeParser( byteBuffer );
+ }
+ catch ( Exception e ) {
+ throw new DataCorruptionException( "Unable to de-serialze entity", e );
+ }
+
+ byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
+
+ if ( !Arrays.equals( VERSION, version ) ) {
+ throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
+ }
+
+ byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
+
+ // it's been deleted, remove it
+
+ if ( Arrays.equals( STATE_DELETED, state ) ) {
+ return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+ }
+
+ Entity storedEntity;
+
+ ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
+ byte[] array = jsonBytes.array();
+ int start = jsonBytes.arrayOffset();
+ int length = jsonBytes.remaining();
+
+ try {
+ storedEntity = mapper.readValue( array, start, length, Entity.class );
+ }
+ catch ( Exception e ) {
+ throw new DataCorruptionException( "Unable to read entity data", e );
+ }
+
+ final Optional<Entity> entity = Optional.of( storedEntity );
+
+ if ( Arrays.equals( STATE_COMPLETE, state ) ) {
+ return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
+ }
+
+ // it's partial by default
+ return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
new file mode 100644
index 0000000..f80960d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyV2Impl.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Composites;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.netflix.astyanax.serializers.BytesArraySerializer;
+
+
+/**
+ * Version 2 implementation of entity serialization
+ */
+@Singleton
+public class MvccEntitySerializationStrategyV2Impl extends MvccEntitySerializationStrategyImpl {
+
+ private static final EntitySerializer ENTITY_JSON_SER = new EntitySerializer();
+
+
+ @Inject
+ public MvccEntitySerializationStrategyV2Impl( final Keyspace keyspace, final SerializationFig serializationFig ) {
+ super( keyspace, serializationFig );
+ }
+
+
+ @Override
+ protected AbstractSerializer<EntityWrapper> getEntitySerializer() {
+ return ENTITY_JSON_SER;
+ }
+
+
+ public static class EntitySerializer extends AbstractSerializer<EntityWrapper> {
+
+
+ private static final ByteBufferSerializer BUFFER_SERIALIZER = ByteBufferSerializer.get();
+
+ private static final BytesArraySerializer BYTES_ARRAY_SERIALIZER = BytesArraySerializer.get();
+
+
+ public static final SmileFactory f = new SmileFactory();
+
+ public static ObjectMapper mapper;
+
+ private static byte[] STATE_COMPLETE = new byte[] { 0 };
+ private static byte[] STATE_DELETED = new byte[] { 1 };
+ private static byte[] STATE_PARTIAL = new byte[] { 2 };
+
+ private static byte[] VERSION = new byte[] { 0 };
+
+
+ public EntitySerializer() {
+ try {
+ mapper = new ObjectMapper( f );
+ // mapper.enable(SerializationFeature.INDENT_OUTPUT); don't indent output,
+ // causes slowness
+ mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error setting up mapper", e );
+ }
+ }
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final EntityWrapper wrapper ) {
+ if ( wrapper == null ) {
+ return null;
+ }
+
+ CompositeBuilder builder = Composites.newCompositeBuilder();
+
+ builder.addBytes( VERSION );
+
+ //mark this version as empty
+ if ( !wrapper.entity.isPresent() ) {
+ //we're empty
+ builder.addBytes( STATE_DELETED );
+
+ return builder.build();
+ }
+
+ //we have an entity
+
+ if ( wrapper.status == MvccEntity.Status.COMPLETE ) {
+ builder.addBytes( STATE_COMPLETE );
+ }
+
+ else {
+ builder.addBytes( STATE_PARTIAL );
+ }
+
+ try {
+ final byte[] entityBytes = mapper.writeValueAsBytes( wrapper.entity.get() );
+ builder.addBytes( entityBytes );
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to serialize entity", e );
+ }
+
+ return builder.build();
+ }
+
+
+ @Override
+ public EntityWrapper fromByteBuffer( final ByteBuffer byteBuffer ) {
+
+ /**
+ * We intentionally throw data corruption exceptions when we're unable to de-serialize
+ * the data in cassandra. If this occurs, we'll never be able to de-serialize it
+ * and it should be considered lost. This is an error that is occurring due to a bug
+ * in serializing the entity. This is a lazy recognition + repair signal for deployment with
+ * existing systems.
+ */
+ CompositeParser parser;
+ try {
+ parser = Composites.newCompositeParser( byteBuffer );
+ }
+ catch ( Exception e ) {
+ throw new DataCorruptionException( "Unable to de-serialze entity", e );
+ }
+
+ byte[] version = parser.read( BYTES_ARRAY_SERIALIZER );
+
+ if ( !Arrays.equals( VERSION, version ) ) {
+ throw new UnsupportedOperationException( "A version of type " + version + " is unsupported" );
+ }
+
+ byte[] state = parser.read( BYTES_ARRAY_SERIALIZER );
+
+ // it's been deleted, remove it
+
+ if ( Arrays.equals( STATE_DELETED, state ) ) {
+ return new EntityWrapper( MvccEntity.Status.COMPLETE, Optional.<Entity>absent() );
+ }
+
+ Entity storedEntity;
+
+ ByteBuffer jsonBytes = parser.read( BUFFER_SERIALIZER );
+ byte[] array = jsonBytes.array();
+ int start = jsonBytes.arrayOffset();
+ int length = jsonBytes.remaining();
+
+ try {
+ storedEntity = mapper.readValue( array, start, length, Entity.class );
+ }
+ catch ( Exception e ) {
+ throw new DataCorruptionException( "Unable to read entity data", e );
+ }
+
+ final Optional<Entity> entity = Optional.of( storedEntity );
+
+ if ( Arrays.equals( STATE_COMPLETE, state ) ) {
+ return new EntityWrapper( MvccEntity.Status.COMPLETE, entity );
+ }
+
+ // it's partial by default
+ return new EntityWrapper( MvccEntity.Status.PARTIAL, entity );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 7368e9a..831091d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -71,7 +71,7 @@ import com.netflix.astyanax.serializers.UUIDSerializer;
* @author tnine
*/
@Singleton
-public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy, Migration {
+public class MvccLogEntrySerializationStrategyImpl implements MvccLogEntrySerializationStrategy {
private static final Logger LOG = LoggerFactory.getLogger( MvccLogEntrySerializationStrategyImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index 8a4a9ac..eb9c374 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -21,9 +21,13 @@ package org.apache.usergrid.persistence.collection.serialization.impl;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import com.google.inject.AbstractModule;
+import com.google.inject.Key;
import com.google.inject.multibindings.Multibinder;
@@ -37,14 +41,23 @@ public class SerializationModule extends AbstractModule {
// bind the serialization strategies
- bind( MvccEntitySerializationStrategy.class ).to( MvccEntitySerializationStrategyImpl.class );
+
+ //We've migrated this one, so we need to set up the previous, current, and proxy
+ bind( MvccEntitySerializationStrategy.class ).annotatedWith( PreviousImpl.class )
+ .to( MvccEntitySerializationStrategyV1Impl.class );
+ bind( MvccEntitySerializationStrategy.class ).annotatedWith( CurrentImpl.class )
+ .to( MvccEntitySerializationStrategyV2Impl.class );
+ bind( MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
+ .to( MvccEntitySerializationStrategyProxyImpl.class );
+
bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
//do multibindings for migrations
Multibinder<Migration> uriBinder = Multibinder.newSetBinder( binder(), Migration.class );
- uriBinder.addBinding().to( MvccEntitySerializationStrategyImpl.class );
- uriBinder.addBinding().to( MvccLogEntrySerializationStrategyImpl.class );
- uriBinder.addBinding().to( UniqueValueSerializationStrategyImpl.class );
+ uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, PreviousImpl.class ) );
+ uriBinder.addBinding().to( Key.get( MvccEntitySerializationStrategy.class, CurrentImpl.class ) );
+ uriBinder.addBinding().to( Key.get( MvccLogEntrySerializationStrategy.class ) );
+ uriBinder.addBinding().to( Key.get( UniqueValueSerializationStrategy.class ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index b6b3cab..be95b08 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -57,7 +57,7 @@ import com.netflix.astyanax.util.RangeBuilder;
/**
* Reads and writes to UniqueValues column family.
*/
-public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy, Migration {
+public class UniqueValueSerializationStrategyImpl implements UniqueValueSerializationStrategy {
private static final Logger log = LoggerFactory.getLogger( UniqueValueSerializationStrategyImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
index f2deb3e..0d44e98 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactoryTest.java
@@ -6,7 +6,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.test.ITRunner;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 66f162f..2d18675 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -23,18 +23,19 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import org.apache.usergrid.persistence.core.test.UseModules;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -42,10 +43,10 @@ import org.apache.usergrid.persistence.model.field.BooleanField;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.field.IntegerField;
import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.util.Health;
import rx.Observable;
@@ -58,8 +59,8 @@ import static org.junit.Assert.fail;
/** @author tnine */
-@RunWith(ITRunner.class)
-@UseModules(TestCollectionModule.class)
+@RunWith( ITRunner.class )
+@UseModules( TestCollectionModule.class )
public class EntityCollectionManagerIT {
@Inject
private EntityCollectionManagerFactory factory;
@@ -533,7 +534,7 @@ public class EntityCollectionManagerIT {
}
- @Test(expected = IllegalArgumentException.class)
+ @Test( expected = IllegalArgumentException.class )
public void readTooLarge() {
final CollectionScope context =
@@ -630,8 +631,7 @@ public class EntityCollectionManagerIT {
final UUID v2Version = v2Created.getVersion();
- assertTrue( "Newer version in v2",
- UUIDComparator.staticCompare( v2Version, v1Version) > 0 );
+ assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -678,12 +678,9 @@ public class EntityCollectionManagerIT {
final UUID v2Version = v2Created.getVersion();
-
-
assertEquals( "Same entityId", v1Created.getId(), v2Created.getId() );
- assertTrue( "Newer version in v2",
- UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
+ assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -700,11 +697,60 @@ public class EntityCollectionManagerIT {
@Test
public void healthTest() {
- CollectionScope context = new CollectionScopeImpl(
- new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+ CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
final EntityCollectionManager manager = factory.createCollectionManager( context );
assertEquals( Health.GREEN, manager.getHealth() );
}
+
+
+ /**
+ * Writes and then loads an entity whose string field names and values total at least 2 * 65535 characters, asserting that both the written and the loaded entity equal the original.
+ */
+ @Test
+ public void largeEntityWriteRead() {
+ final int setSize = 65535 * 2;
+
+ int currentLength = 0;
+
+ final Entity entity = new Entity( new SimpleId( "test" ) );
+
+ //build one large string value by concatenating 100 UUIDGenerator.newTimeUUID() strings
+ StringBuilder builder = new StringBuilder();
+
+ for ( int i = 0; i < 100; i++ ) {
+ builder.append( UUIDGenerator.newTimeUUID().toString() );
+ }
+
+ final String value = builder.toString();
+
+
+ //keep adding string fields until the accumulated key + value character count reaches setSize
+ for ( int i = 0; currentLength < setSize; i++ ) {
+ final String key = "newStringField" + i;
+
+ entity.setField( new StringField( key, value ) );
+
+ currentLength += key.length() + value.length();
+ }
+
+
+ //now we have one massive entity; save it and retrieve it.
+ CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+ final EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ final Entity saved = manager.write( entity ).toBlocking().last();
+
+
+ assertEquals( entity, saved );
+
+ //now load it back by id and compare it with the original entity
+ final Entity loaded = manager.load( entity.getId() ).toBlocking().last();
+
+ assertEquals( entity, loaded );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
index d4f4d81..11ad389 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerStressTest.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.time.StopWatch;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.test.ITRunner;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
index ec35ed2..d4881de 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerSyncIT.java
@@ -6,7 +6,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.test.ITRunner;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2bd1c950/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
deleted file mode 100644
index 6b02b63..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/MigrationManagerRule.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-/**
- */
-@Singleton
-public class MigrationManagerRule extends ExternalResource {
- private static final Logger LOG = LoggerFactory.getLogger( MigrationManagerRule.class );
-
- private MigrationManager migrationManager;
-
-
- @Inject
- public void setMigrationManager( MigrationManager migrationManager ) {
- this.migrationManager = migrationManager;
- }
-
-
- @Override
- protected void before() throws MigrationException {
- LOG.info( "Starting migration" );
-
- migrationManager.migrate();
-
- LOG.info( "Migration complete" );
- }
-}