You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:41:00 UTC
[5/5] incubator-usergrid git commit: WIP overwrite
WIP overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/70e0e75a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/70e0e75a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/70e0e75a
Branch: refs/heads/USERGRID-614
Commit: 70e0e75aae435df0a5045008625e2dc3d7ead37e
Parents: 45aed6c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 7 10:40:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 7 10:40:52 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 2 +-
.../corepersistence/CpEntityManagerFactory.java | 2 +-
.../corepersistence/CpRelationManager.java | 4 +-
.../migration/AppInfoMigrationPlugin.java | 3 +-
.../collection/EntityCollectionManager.java | 10 +-
.../cache/CachedEntityCollectionManager.java | 10 +-
.../EntityCollectionManagerFactoryImpl.java | 20 +-
.../impl/EntityCollectionManagerImpl.java | 74 ++++---
.../mvcc/stage/delete/VersionCompact.java | 14 +-
.../serialization/SerializationFig.java | 30 +--
.../serialization/impl/LogEntryObservable.java | 54 -----
.../impl/MinMaxLogEntryIterator.java | 27 ++-
.../MvccLogEntrySerializationStrategyImpl.java | 28 +--
.../collection/EntityCollectionManagerIT.java | 215 ++++++++++++-------
.../impl/MinMaxLogEntryIteratorTest.java | 7 +-
...ccLogEntrySerializationStrategyImplTest.java | 59 ++++-
.../collection/util/LogEntryMock.java | 103 ++++-----
.../src/test/resources/log4j.properties | 1 +
.../usergrid/persistence/graph/GraphFig.java | 26 +--
19 files changed, 377 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 460fc11..8bcc73a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager {
//delete it asynchronously
indexService.queueEntityDelete( applicationScope, entityId );
- return ecm.delete( entityId );
+ return ecm.mark( entityId );
}
else {
return Observable.empty();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6c375ef..e7ad682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
- final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE);
+ final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE );
final Observable deleteAppFromIndex = aei.deleteApplication();
return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6adeefc..df3fa82 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
//run our delete
final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
- gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
+ gm.markEdge( collectionToItemEdge ).toBlocking().last();
/**
@@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager {
//delete all the edges
final Edge lastEdge =
- gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking()
+ gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
.lastOrDefault( null );
if ( lastEdge != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
index 30955af..97b87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.functions.Func1;
import java.util.HashMap;
import java.util.Map;
@@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID );
final EntityCollectionManager systemCollectionManager =
entityCollectionManagerFactory.createCollectionManager( systemAppScope );
- systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last();
+ systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 8c27825..9de8f41 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -46,11 +46,12 @@ public interface EntityCollectionManager {
/**
- * @param entityId MarkCommit the entity as deleted
+ * @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will
+ * also remove all unique properties for this entity
*
* @return The observable of the id after the operation has completed
*/
- Observable<Id> delete( Id entityId );
+ Observable<Id> mark( Id entityId );
/**
* @param entityId The entity id to load.
@@ -104,11 +105,12 @@ public interface EntityCollectionManager {
Observable<MvccLogEntry> getVersions(final Id entityId);
/**
- * Remove these versions. Must be atomic so that read log entries are removed
+ * Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data
+ * and log entry will be deleted
* @param entries
* @return Any observable of all successfully compacted log entries
*/
- Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries);
+ Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries );
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index 9412516..7a04b8d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -85,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
@Override
- public Observable<Id> delete( final Id entityId ) {
- return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() {
+ public Observable<Id> mark( final Id entityId ) {
+ return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id id ) {
entityCache.invalidate( id );
@@ -129,13 +129,13 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
@Override
public Observable<MvccLogEntry> getVersions( final Id entityId ) {
- return null;
+ return targetEntityCollectionManager.getVersions( entityId );
}
@Override
- public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
- return targetEntityCollectionManager.compact( entries );
+ public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+ return targetEntityCollectionManager.delete( entries );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index c4422f7..50a4bfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -29,6 +29,8 @@ import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionMa
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -65,6 +68,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final RollbackAction rollback;
private final MarkStart markStart;
private final MarkCommit markCommit;
+ private final UniqueCleanup uniqueCleanup;
+ private final VersionCompact versionCompact;
+ private final SerializationFig serializationFig;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
@@ -80,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
//create the target EM that will perform logic
final EntityCollectionManager target = new EntityCollectionManagerImpl(
writeStart, writeVerifyUnique,
- writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
+ writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact,
entitySerializationStrategy, uniqueValueSerializationStrategy,
- mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory,
- rxTaskScheduler );
+ mvccLogEntrySerializationStrategy, keyspace,
+ metricsFactory, serializationFig,
+ rxTaskScheduler, scope );
final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target );
@@ -98,7 +105,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback,
final MarkStart markStart, final MarkCommit markCommit,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+ final SerializationFig serializationFig, final
+ MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final Keyspace keyspace, final EntityCacheFig entityCacheFig,
@@ -111,6 +120,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.rollback = rollback;
this.markStart = markStart;
this.markCommit = markCommit;
+ this.uniqueCleanup = uniqueCleanup;
+ this.versionCompact = versionCompact;
+ this.serializationFig = serializationFig;
this.entitySerializationStrategy = entitySerializationStrategy;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 83c2035..7a32d72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -37,6 +38,8 @@ import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -44,12 +47,15 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator;
import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
@@ -91,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
private final RollbackAction rollback;
+ private final UniqueCleanup uniqueCleanup;
+ private final VersionCompact versionCompact;
//delete stages
@@ -101,8 +109,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ private final SerializationFig serializationFig;
- private final RxTaskScheduler rxTaskScheduler;
private final Keyspace keyspace;
private final Timer writeTimer;
@@ -114,7 +122,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final Timer getLatestTimer;
private final ApplicationScope applicationScope;
-
+ private final RxTaskScheduler rxTaskScheduler;
@Inject
@@ -122,15 +130,18 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback,
final MarkStart markStart, final MarkCommit markCommit,
+ final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace, @Assisted final ApplicationScope applicationScope,
- final MetricsFactory metricsFactory,
-
- final RxTaskScheduler rxTaskScheduler ) {
+ final Keyspace keyspace, final MetricsFactory metricsFactory,
+ final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
+ @Assisted final ApplicationScope applicationScope ) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ this.uniqueCleanup = uniqueCleanup;
+ this.versionCompact = versionCompact;
+ this.serializationFig = serializationFig;
this.rxTaskScheduler = rxTaskScheduler;
ValidationUtils.validateApplicationScope( applicationScope );
@@ -184,14 +195,15 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
- public Observable<Id> delete( final Id entityId ) {
+ public Observable<Id> mark( final Id entityId ) {
Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
- Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart )
- .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() );
+ Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
+ .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+ entityEvent -> entityEvent.getEvent().getId() );
return ObservableTimer.time( o, deleteTimer );
@@ -205,7 +217,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- final Observable<Entity> entityObservable= load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+ final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
final MvccEntity entity = entitySet.getEntity( entityId );
if ( entity == null || !entity.getEntity().isPresent() ) {
@@ -225,7 +237,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
- final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
+ final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
@Override
public void call( final Subscriber<? super EntitySet> subscriber ) {
@@ -249,21 +261,32 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
public Observable<MvccLogEntry> getVersions( final Id entityId ) {
-// mvccLogEntrySerializationStrategy.load( )
- return null;
+ ValidationUtils.verifyIdentity( entityId );
+
+ return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+ @Override
+ protected Iterator<MvccLogEntry> getIterator() {
+ return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId,
+ serializationFig.getBufferSize() );
+ }
+ } );
}
@Override
- public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
- return null;
+ public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+ Preconditions.checkNotNull( entries, "entries must not be null" );
+
+
+ return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) )
+ .compose( versionCompact ).map( event -> event.getEvent() );
}
@Override
public Observable<Id> getIdField( final String type, final Field field ) {
final List<Field> fields = Collections.singletonList( field );
- final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
+ final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
try {
final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
final UniqueValue value = set.getValue( field1.getName() );
@@ -315,8 +338,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
//Load a entity for each entityId we retrieved.
- final EntitySet entitySet =
- entitySerializationStrategy.load( applicationScope, entityIds, startTime );
+ final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
//now loop through and ensure the entities are there.
final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
@@ -372,11 +394,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
.doOnNext( writeOptimisticVerify );
- final Observable<CollectionIoEvent<MvccEntity>> zip = Observable.zip( uniqueObservable, optimisticObservable,
- ( unique, optimistic ) -> optimistic );
+ final Observable<CollectionIoEvent<MvccEntity>> zip =
+ Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
return zip;
-
} );
}
@@ -384,8 +405,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
- final Timer.Context timer = getLatestTimer.time();
- return Observable.create( new Observable.OnSubscribe<VersionSet>() {
+
+ final Observable<VersionSet> observable = Observable.create( new Observable.OnSubscribe<VersionSet>() {
@Override
public void call( final Subscriber<? super VersionSet> subscriber ) {
@@ -400,12 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } ).doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
} );
+
+ return ObservableTimer.time( observable, getLatestTimer );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
index 0945827..424ec86 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -87,13 +87,6 @@ public class VersionCompact
final Id entityId = mvccLogEntry.getEntityId();
final UUID version = mvccLogEntry.getVersion();
- //delete from our log
- mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
-
- //merge our entity delete in
- mutationBatch
- .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
-
if ( logger.isDebugEnabled() ) {
logger.debug(
"Deleting log entry and version data for entity id {} and version {} in app scope {}",
@@ -101,6 +94,13 @@ public class VersionCompact
}
+ //delete from our log
+ mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+ //merge our entity delete in
+ mutationBatch
+ .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
} ) )
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 381a24e..6591781 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -24,42 +24,18 @@ public interface SerializationFig extends GuicyFig {
@Default("5")
int getTimeout();
- /**
- * Number of history items to return for delete.
- *
- * @return Timeout in seconds.
- */
- @Key("collection.delete.history.size")
- @Default("100")
- int getHistorySize();
/**
* Number of items to buffer.
*
- * @return Timeout in seconds.
+ * @return Number of items to buffer in memory
*/
- @Key("collection.buffer.size")
- @Default("10")
+ @Key("buffer.size")
+ @Default("100")
int getBufferSize();
/**
- * The size of threads to have in the task pool
- */
- @Key( "collection.task.pool.threadsize" )
- @Default( "20" )
- int getTaskPoolThreadSize();
-
-
-
- /**
- * The size of threads to have in the task pool
- */
- @Key( "collection.task.pool.queuesize" )
- @Default( "20" )
- int getTaskPoolQueueSize();
-
- /**
* The maximum amount of entities we can load in a single request
* TODO, change this and move it into a common setting that both query and collection share
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
deleted file mode 100644
index 072e0ea..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryObservable.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-
-import rx.Observable;
-import rx.Subscriber;
-
-
-/**
- * An observable that emits log entries from MIN to MAX
- */
-public class LogEntryObservable implements Observable.OnSubscribe<MvccLogEntry>{
-
- private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
-
-
- public LogEntryObservable( final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
- this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
- }
-
-
- @Override
- public void call( final Subscriber<? super MvccLogEntry> subscriber ) {
-
- subscriber.onStart();
-
- while(!subscriber.isUnsubscribed()){
-
-
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
index a8e15a7..eae8c06 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIterator.java
@@ -35,20 +35,16 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
* @param logEntrySerializationStrategy The serialization strategy to get the log entries
* @param scope The scope of the entity
* @param entityId The id of the entity
- * @param maxVersion The max version of the entity. Iterator will iterate from min to min starting with the version
- * < max
* @param pageSize The fetch size to get when querying the serialization strategy
*/
public MinMaxLogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final ApplicationScope scope, final Id entityId, final UUID maxVersion,
- final int pageSize ) {
+ final ApplicationScope scope, final Id entityId, final int pageSize ) {
Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.scope = scope;
this.entityId = entityId;
- this.nextStart = maxVersion;
this.pageSize = pageSize;
}
@@ -89,19 +85,27 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
*/
public void advance() throws ConnectionException {
- final int requestedSize = pageSize + 1;
+ final int requestedSize;
+
+ if ( nextStart != null ) {
+ requestedSize = pageSize + 1;
+ }
+ else {
+ requestedSize = pageSize;
+ }
//loop through even entry that's < this one and remove it
- List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+ List<MvccLogEntry> results = logEntrySerializationStrategy.loadReversed( scope, entityId, nextStart, requestedSize );
//we always remove the first version if it's equal since it's returned
- if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ if ( nextStart != null && results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
results.remove( 0 );
}
- //we have results, set our next start
- if ( results.size() == pageSize ) {
+
+ //we have results, set our next start. If we miss our start version (due to deletion) and we request a +1, we want to ensure we set our next, hence the >=
+ if ( results.size() >= pageSize ) {
nextStart = results.get( results.size() - 1 ).getVersion();
}
//nothing left to do
@@ -109,6 +113,9 @@ public class MinMaxLogEntryIterator implements Iterator<MvccLogEntry> {
nextStart = null;
}
+
+
+
elementItr = results.iterator();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
index 73804e4..0c0d961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImpl.java
@@ -89,19 +89,16 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
final UUID colName = entry.getVersion();
final StageStatus stageStatus = new StageStatus( stage, entry.getState() );
- return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), new RowOp() {
- @Override
- public void doOp( final ColumnListMutation<UUID> colMutation ) {
-
- //Write the stage with a timeout, it's set as transient
- if ( stage.isTransient() ) {
- colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
- return;
- }
-
- //otherwise it's persistent, write it with no expiration
- colMutation.putColumn( colName, stageStatus, SER, null );
+ return doWrite( collectionScope, entry.getEntityId(), entry.getVersion(), colMutation -> {
+
+ //Write the stage with a timeout, it's set as transient
+ if ( stage.isTransient() ) {
+ colMutation.putColumn( colName, stageStatus, SER, fig.getTimeout() );
+ return;
}
+
+ //otherwise it's persistent, write it with no expiration
+ colMutation.putColumn( colName, stageStatus, SER, null );
} );
}
@@ -253,12 +250,7 @@ public abstract class MvccLogEntrySerializationStrategyImpl<K> implements MvccLo
Preconditions.checkNotNull( entityId, "entityId is required" );
Preconditions.checkNotNull( version, "version context is required" );
- return doWrite( context, entityId, version, new RowOp() {
- @Override
- public void doOp( final ColumnListMutation<UUID> colMutation ) {
- colMutation.deleteColumn( version );
- }
- } );
+ return doWrite( context, entityId, version, colMutation -> colMutation.deleteColumn( version ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index baceeb4..36faf62 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -91,8 +91,7 @@ public class EntityCollectionManagerIT {
public void write() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -113,8 +112,7 @@ public class EntityCollectionManagerIT {
public void writeWithUniqueValues() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -146,8 +144,7 @@ public class EntityCollectionManagerIT {
public void writeAndLoad() {
- ApplicationScope context =
- new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
@@ -174,7 +171,7 @@ public class EntityCollectionManagerIT {
public void writeLoadDelete() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -185,30 +182,27 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
-
- UUID version = createReturned.getVersion();
-
Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- manager.delete( createReturned.getId() ).toBlocking().last();
+ manager.mark( createReturned.getId() ).toBlocking().last();
loadObservable = manager.load( createReturned.getId() );
//load may return null, use last or default
loadReturned = loadObservable.toBlocking().lastOrDefault( null );
- assertNull("Entity was deleted", loadReturned);
+ assertNull( "Entity was deleted", loadReturned );
}
@Test
public void writeLoadUpdateLoad() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
newEntity.setField( new IntegerField( "counter", 1 ) );
@@ -217,36 +211,36 @@ public class EntityCollectionManagerIT {
Observable<Entity> observable = manager.write( newEntity );
- Entity createReturned = observable.toBlocking().lastOrDefault(null);
+ Entity createReturned = observable.toBlocking().lastOrDefault( null );
assertNotNull( "Id was assigned", createReturned.getId() );
- Observable<Entity> loadObservable = manager.load(createReturned.getId());
+ Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+ assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
//update the field to 2
createReturned.setField( new IntegerField( "counter", 2 ) );
//wait for the write to complete
- manager.write( createReturned ).toBlocking().lastOrDefault(null);
+ manager.write( createReturned ).toBlocking().lastOrDefault( null );
loadObservable = manager.load( createReturned.getId() );
- loadReturned = loadObservable.toBlocking().lastOrDefault(null);
+ loadReturned = loadObservable.toBlocking().lastOrDefault( null );
assertEquals( "Same value", createReturned, loadReturned );
- assertEquals("Field value correct", createReturned.getField("counter"), loadReturned.getField("counter"));
+ assertEquals( "Field value correct", createReturned.getField( "counter" ), loadReturned.getField( "counter" ) );
}
@@ -254,30 +248,30 @@ public class EntityCollectionManagerIT {
public void writeAndLoadScopeClosure() {
- ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
- EntityCollectionManager manager = factory.createCollectionManager(collectionScope1);
+ EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write(newEntity);
+ Observable<Entity> observable = manager.write( newEntity );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
- assertNotNull("Id was assigned", createReturned.getId());
- assertNotNull("Version was assigned", createReturned.getVersion());
+ assertNotNull( "Id was assigned", createReturned.getId() );
+ assertNotNull( "Version was assigned", createReturned.getVersion() );
Observable<Entity> loadObservable = manager.load( createReturned.getId() );
Entity loadReturned = loadObservable.toBlocking().lastOrDefault( null );
- assertEquals("Same value", createReturned, loadReturned);
+ assertEquals( "Same value", createReturned, loadReturned );
- ApplicationScope collectionScope2 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope2 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
//now make sure we can't load it from another scope, using the same org
@@ -286,9 +280,7 @@ public class EntityCollectionManagerIT {
Entity loaded = manager2.load( createReturned.getId() ).toBlocking().lastOrDefault( null );
- assertNull("CollectionScope works correctly", loaded);
-
-
+ assertNull( "CollectionScope works correctly", loaded );
}
@@ -296,34 +288,32 @@ public class EntityCollectionManagerIT {
public void writeAndGetField() {
- ApplicationScope collectionScope1 = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
Field field = new StringField( "testField", "unique", true );
- newEntity.setField(field);
+ newEntity.setField( field );
EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write(newEntity);
+ Observable<Entity> observable = manager.write( newEntity );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
assertNotNull( "Id was assigned", createReturned.getId() );
- assertNotNull("Version was assigned", createReturned.getVersion());
+ assertNotNull( "Version was assigned", createReturned.getVersion() );
Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
assertNotNull( id );
assertEquals( newEntity.getId(), id );
Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
- id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+ id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
assertNull( id );
}
-
-
@Test
public void updateVersioning() {
@@ -331,10 +321,10 @@ public class EntityCollectionManagerIT {
Entity origEntity = new Entity( new SimpleId( "testUpdate" ) );
origEntity.setField( new StringField( "testField", "value" ) );
- ApplicationScope context = new ApplicationScopeImpl(new SimpleId("organization"));
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
- Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault(null);
+ Entity returned = manager.write( origEntity ).toBlocking().lastOrDefault( null );
// note its version
UUID oldVersion = returned.getVersion();
@@ -345,7 +335,7 @@ public class EntityCollectionManagerIT {
// partial update entity but we don't have version number
Entity updateEntity = new Entity( origEntity.getId() );
updateEntity.setField( new StringField( "addedField", "other value" ) );
- manager.write( updateEntity ).toBlocking().lastOrDefault(null);
+ manager.write( updateEntity ).toBlocking().lastOrDefault( null );
// get entity now, it must have a new version
returned = manager.load( origEntity.getId() ).toBlocking().lastOrDefault( null );
@@ -354,14 +344,14 @@ public class EntityCollectionManagerIT {
assertNotNull( "A new version must be assigned", newVersion );
// new Version should be > old version
- assertTrue(UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
+ assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
}
@Test
public void writeMultiget() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize();
@@ -381,10 +371,10 @@ public class EntityCollectionManagerIT {
final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
- assertNotNull(entitySet);
+ assertNotNull( entitySet );
assertEquals( multigetSize, entitySet.size() );
- assertFalse(entitySet.isEmpty());
+ assertFalse( entitySet.isEmpty() );
/**
* Validate every element exists
@@ -405,7 +395,7 @@ public class EntityCollectionManagerIT {
@Test
public void writeMultigetRepair() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize();
@@ -444,7 +434,7 @@ public class EntityCollectionManagerIT {
assertEquals( "Same entity returned", expected, returned.getEntity().get() );
- assertTrue((Boolean) returned.getEntity().get().getField("updated").getValue());
+ assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
}
}
@@ -452,7 +442,7 @@ public class EntityCollectionManagerIT {
@Test( expected = IllegalArgumentException.class )
public void readTooLarge() {
- final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ final ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
final int multigetSize = serializationFig.getMaxLoadSize() + 1;
@@ -474,7 +464,7 @@ public class EntityCollectionManagerIT {
@Test
public void testGetVersion() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -495,7 +485,7 @@ public class EntityCollectionManagerIT {
VersionSet results =
- manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
+ manager.getLatestVersion( Arrays.asList( created1.getId(), created2.getId() ) ).toBlocking().last();
final MvccLogEntry version1Log = results.getMaxVersion( created1.getId() );
@@ -515,7 +505,7 @@ public class EntityCollectionManagerIT {
@Test
public void testVersionLogWrite() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -529,10 +519,10 @@ public class EntityCollectionManagerIT {
final UUID v1Version = v1Created.getVersion();
- final VersionSet resultsV1 = manager.getLatestVersion(Arrays.asList(v1Created.getId())).toBlocking().last();
+ final VersionSet resultsV1 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
- final MvccLogEntry version1Log = resultsV1.getMaxVersion(v1Created.getId());
+ final MvccLogEntry version1Log = resultsV1.getMaxVersion( v1Created.getId() );
assertEquals( v1Created.getId(), version1Log.getEntityId() );
assertEquals( v1Version, version1Log.getVersion() );
assertEquals( MvccLogEntry.State.COMPLETE, version1Log.getState() );
@@ -543,7 +533,7 @@ public class EntityCollectionManagerIT {
final UUID v2Version = v2Created.getVersion();
- assertTrue("Newer version in v2", UUIDComparator.staticCompare(v2Version, v1Version) > 0);
+ assertTrue( "Newer version in v2", UUIDComparator.staticCompare( v2Version, v1Version ) > 0 );
final VersionSet resultsV2 = manager.getLatestVersion( Arrays.asList( v1Created.getId() ) ).toBlocking().last();
@@ -560,7 +550,7 @@ public class EntityCollectionManagerIT {
@Test
public void testVersionLogUpdate() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -608,7 +598,7 @@ public class EntityCollectionManagerIT {
@Test
public void healthTest() {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -632,7 +622,7 @@ public class EntityCollectionManagerIT {
final Entity entity = EntityHelper.generateEntity( setSize );
//now we have one massive, entity, save it and retrieve it.
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
final EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -652,20 +642,21 @@ public class EntityCollectionManagerIT {
SetConfigTestBypass.setValueByPass( serializationFig, "getMaxEntitySize", currentMaxSize + "" );
}
+
@Test
public void invalidNameRepair() throws ConnectionException {
//write an entity with a unique field
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
Entity newEntity = new Entity( new SimpleId( "test" ) );
//if we add a second field we get a second entity that is the exact same. Is this expected?
- final IntegerField expectedInteger = new IntegerField( "count", 5, true );
- // final StringField expectedString = new StringField( "yes", "fred", true );
+ final IntegerField expectedInteger = new IntegerField( "count", 5, true );
+ // final StringField expectedString = new StringField( "yes", "fred", true );
newEntity.setField( expectedInteger );
- // newEntity.setField( expectedString );
+ // newEntity.setField( expectedString );
EntityCollectionManager manager = factory.createCollectionManager( context );
@@ -677,23 +668,26 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
assertNotNull( "Version was assigned", createReturned.getVersion() );
- FieldSet
- fieldResults = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+ FieldSet fieldResults =
+ manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+ .toBlocking().last();
- assertEquals(1,fieldResults.size());
+ assertEquals( 1, fieldResults.size() );
//verify the entity is correct.
- assertEquals( "Same value", createReturned, fieldResults.getEntity( expectedInteger ).getEntity().get()); //loadReturned );
+ assertEquals( "Same value", createReturned,
+ fieldResults.getEntity( expectedInteger ).getEntity().get() ); //loadReturned );
//use the entity serializationStrategy to remove the entity data.
//do a mark as one test, and a delete as another
- entitySerializationStrategy.delete( context,createReturned.getId(),createReturned.getVersion() ).execute();
+ entitySerializationStrategy.delete( context, createReturned.getId(), createReturned.getVersion() ).execute();
//try to load via the unique field, should have triggered repair
- final FieldSet
- results = manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) ).toBlocking().last();
+ final FieldSet results =
+ manager.getEntitiesFromFields( newEntity.getId().getType(), Arrays.<Field>asList( expectedInteger ) )
+ .toBlocking().last();
//verify no entity returned
@@ -701,37 +695,104 @@ public class EntityCollectionManagerIT {
//user the unique serialization to verify it's been deleted from cassandra
- UniqueValueSet uniqueValues = uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
+ UniqueValueSet uniqueValues =
+ uniqueValueSerializationStrategy.load( context, newEntity.getId().getType(), createReturned.getFields() );
assertFalse( uniqueValues.iterator().hasNext() );
-
}
@Test
public void testGetIdField() throws Exception {
- ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
EntityCollectionManager manager = factory.createCollectionManager( context );
// create an entity of type "item" with a unique_id field value = 1
Entity entity1 = new Entity( new SimpleId( "item" ) );
- entity1.setField( new StringField( "unique_id", "1", true ));
+ entity1.setField( new StringField( "unique_id", "1", true ) );
manager.write( entity1 ).toBlocking().last();
- final Observable<Id> idObs = manager.getIdField("item", new StringField("unique_id", "1"));
- Id id = idObs.toBlocking().lastOrDefault(null);
- assertEquals(entity1.getId(), id);
+ final Observable<Id> idObs = manager.getIdField( "item", new StringField( "unique_id", "1" ) );
+ Id id = idObs.toBlocking().lastOrDefault( null );
+ assertEquals( entity1.getId(), id );
// create an entity of type "deleted_item" with a unique_id field value = 1
Entity entity2 = new Entity( new SimpleId( "deleted_item" ) );
- entity2.setField( new StringField( "unique_id", "1", true ));
+ entity2.setField( new StringField( "unique_id", "1", true ) );
manager = factory.createCollectionManager( context );
manager.write( entity2 ).toBlocking().last();
- final Observable<Id> id2Obs = manager.getIdField("deleted_item", new StringField("unique_id", "1"));
- Id id2 = id2Obs.toBlocking().lastOrDefault(null);
- assertEquals(entity2.getId(), id2);
+ final Observable<Id> id2Obs = manager.getIdField( "deleted_item", new StringField( "unique_id", "1" ) );
+ Id id2 = id2Obs.toBlocking().lastOrDefault( null );
+ assertEquals( entity2.getId(), id2 );
+ }
+
+
+ @Test
+ public void writeGetVersionsDelete() {
+
+ ApplicationScope context = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ Entity entity = new Entity( new SimpleId( "test" ) );
+ entity.setField( new IntegerField( "counter", 0 ) );
+
+ EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ Entity createReturned = manager.write( entity ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( "Id was assigned", createReturned.getId() );
+
+ final int size = 200;
+
+ final Id entityId = createReturned.getId();
+
+ List<UUID> versions = new ArrayList<>( size );
+ versions.add( entity.getVersion() );
+
+ //write new versions
+ for ( int i = 1; i < size; i++ ) {
+ final Entity newEntity = new Entity( entityId );
+
+ final Entity returnedEntity = manager.write( newEntity ).toBlocking().last();
+
+ versions.add( returnedEntity.getVersion() );
+ }
+
+
+ //now get our values, and load the latest version
+
+ final Entity lastVersion = manager.load( entityId ).toBlocking().last();
+
+ //ensure the latest version is correct
+ assertEquals( versions.get( versions.size() - 1 ), lastVersion.getVersion() );
+
+
+ // now ensure all versions are correct
+ final List<MvccLogEntry> entries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+
+ assertEquals( "Same size expected", versions.size(), entries.size() );
+
+ for ( int i = 0; i < versions.size(); i++ ) {
+ assertEquals( versions.get( i ), entries.get( i ).getVersion() );
+ }
+
+
+ //now get all the log versions, and delete them all we do it in 2+ batches to ensure we clean up as expected
+ manager.getVersions( entityId ).buffer( 100 ).flatMap( bufferList -> manager.delete( bufferList ) )
+ .toBlocking().last();
+
+
+ //now load them, there shouldn't be any versions
+ final List<MvccLogEntry> postDeleteEntries = manager.getVersions( entityId ).toList().toBlocking().last();
+
+ assertEquals( "All log entries should be removed", 0, postDeleteEntries.size() );
+
+ final Entity postDeleteLastVersion = manager.load( entityId ).toBlocking().lastOrDefault( null );
+
+ //ensure the latest version is correct
+ assertNull( "Last version was deleted", postDeleteLastVersion );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
index c82e1bf..a7210a5 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MinMaxLogEntryIteratorTest.java
@@ -55,7 +55,7 @@ public class MinMaxLogEntryIteratorTest {
//now iterate we should get everything
MinMaxLogEntryIterator
- itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+ itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize );
assertFalse( itr.hasNext() );
@@ -111,12 +111,9 @@ public class MinMaxLogEntryIteratorTest {
Iterator<MvccLogEntry> expectedEntries = mockResults.getEntries().iterator();
- //this element should be skipped
- UUID start = expectedEntries.next().getVersion();
-
//now iterate we should get everything
MinMaxLogEntryIterator
- itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, start, pageSize );
+ itr = new MinMaxLogEntryIterator( logEntrySerializationStrategy, scope, entityId, pageSize );
while ( expectedEntries.hasNext() && itr.hasNext() ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 673903c..9ed254c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -223,9 +224,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
//now do a range scan from the end
- final int half = count/2;
+ final int half = count / 2;
- final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half);
+ final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half );
assertEquals( half, results.size() );
@@ -238,10 +239,9 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
//now get the next batch
- final List<MvccLogEntry> results2 =
- logEntryStrategy.loadReversed( context, id, versions[half], count );
+ final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count );
- assertEquals( half, results2.size());
+ assertEquals( half, results2.size() );
for ( int i = 0; i < half; i++ ) {
final MvccLogEntry saved = entries[half + i];
@@ -256,12 +256,59 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
logEntryStrategy.delete( context, id, versions[i] ).execute();
}
- final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+ final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
assertEquals( "All log entries were deleted", 0, results3.size() );
}
+ @Test
+ public void createAndDeleteEntries() throws ConnectionException {
+
+ final Id applicationId = new SimpleId( "application" );
+
+ ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+ final Id id = new SimpleId( "test" );
+
+
+ final int size = 10;
+
+ final List<MvccLogEntry> savedEntries = new ArrayList<>( size );
+
+ for ( int i = 0; i < size; i++ ) {
+ final UUID version = UUIDGenerator.newTimeUUID();
+ MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+ logEntryStrategy.write( context, saved ).execute();
+
+ savedEntries.add( saved );
+ }
+
+ //now test we get them all back
+
+ final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size );
+
+ assertEquals( size, results.size() );
+
+ //assert they're the same
+ for ( int i = 0; i < size; i++ ) {
+ assertEquals( savedEntries.get( i ), results.get( i ) );
+ }
+
+ //now delete them all
+
+ for ( final MvccLogEntry mvccLogEntry : savedEntries ) {
+ logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute();
+ }
+
+ //now get them back, should be empty
+ final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size );
+
+ assertEquals( 0, emptyResults.size() );
+ }
+
+
@Test( expected = NullPointerException.class )
public void writeParamsNoContext() throws ConnectionException {
logEntryStrategy.write( null, mock( MvccLogEntry.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
index 93de9d4..cd6ad3d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/*
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.UUID;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
@@ -50,7 +46,11 @@ import static org.mockito.Mockito.when;
public class LogEntryMock {
- private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+ private final TreeMap<UUID, MvccLogEntry> reversedEntries =
+ new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 );
+
+ private final TreeMap<UUID, MvccLogEntry> entries =
+ new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) );
private final Id entityId;
@@ -61,78 +61,92 @@ public class LogEntryMock {
* @param entityId The entity Id to use
* @param versions The versions to use
*/
- private LogEntryMock(final Id entityId, final List<UUID> versions ) {
+ private LogEntryMock( final Id entityId, final List<UUID> versions ) {
this.entityId = entityId;
- for ( UUID version: versions) {
- entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+ for ( UUID version : versions ) {
+ final MvccLogEntry mvccLogEntry =
+ new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE );
+ reversedEntries.put( version, mvccLogEntry );
+ entries.put( version, mvccLogEntry );
}
}
/**
* Init the mock with the given data structure
+ *
* @param logEntrySerializationStrategy The strategy to moc
- * @param scope
- * @throws ConnectionException
*/
- private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope )
+ private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final ApplicationScope scope )
- throws ConnectionException {
+ throws ConnectionException {
//wire up the mocks
- when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer<List<MvccLogEntry>>() {
-
+ when( logEntrySerializationStrategy
+ .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
- @Override
- public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+ invocation -> {
final UUID startVersion = ( UUID ) invocation.getArguments()[2];
- final int count = (Integer)invocation.getArguments()[3];
+ final int count = ( Integer ) invocation.getArguments()[3];
final List<MvccLogEntry> results = new ArrayList<>( count );
- final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+ final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator();
- for(int i = 0; i < count && itr.hasNext(); i ++){
+ for ( int i = 0; i < count && itr.hasNext(); i++ ) {
results.add( itr.next() );
}
return results;
- }
- } );
- }
+ } );
- /**
- * Get the entry at the specified index from high to low
- * @param index
- * @return
- */
- public MvccLogEntry getEntryAtIndex(final int index){
+ //mock in reverse
- final Iterator<MvccLogEntry> itr = entries.values().iterator();
+ when( logEntrySerializationStrategy
+ .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
- for(int i = 0; i < index; i ++){
- itr.next();
- }
+ invocation -> {
+ final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+ final int count = ( Integer ) invocation.getArguments()[3];
+
+
+ final List<MvccLogEntry> results = new ArrayList<>( count );
+
+ final Iterator<MvccLogEntry> itr;
+
+ if ( startVersion == null ) {
+ itr = entries.values().iterator();
+ }
+ else {
+ itr = entries.tailMap( startVersion, true ).values().iterator();
+ }
+
+ for ( int i = 0; i < count && itr.hasNext(); i++ ) {
+ results.add( itr.next() );
+ }
- return itr.next();
+
+ return results;
+ } );
}
/**
- *
* @param logEntrySerializationStrategy The mock to use
* @param scope The scope to use
* @param entityId The entityId to use
* @param versions The versions to mock
- * @throws ConnectionException
*/
- public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,final Id entityId, final List<UUID> versions )
+ public static LogEntryMock createLogEntryMock(
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,
+ final Id entityId, final List<UUID> versions )
- throws ConnectionException {
+ throws ConnectionException {
LogEntryMock mock = new LogEntryMock( entityId, versions );
mock.initMock( logEntrySerializationStrategy, scope );
@@ -141,19 +155,12 @@ public class LogEntryMock {
}
- public Collection<MvccLogEntry> getEntries() {
- return entries.values();
+ public Collection<MvccLogEntry> getReversedEntries() {
+ return reversedEntries.values();
}
- private static final class ReversedUUIDComparator implements Comparator<UUID> {
-
- public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
-
-
- @Override
- public int compare( final UUID o1, final UUID o2 ) {
- return UUIDComparator.staticCompare( o1, o2 ) * -1;
- }
+ public Collection<MvccLogEntry> getEntries() {
+ return entries.values();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/collection/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties
index acf5c39..7b55cf8 100644
--- a/stack/corepersistence/collection/src/test/resources/log4j.properties
+++ b/stack/corepersistence/collection/src/test/resources/log4j.properties
@@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid=DEBUG
#log4j.logger.org.apache.usergrid.persistence.collection=TRACE
+log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70e0e75a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 894e74a..2a153e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key;
public interface GraphFig extends GuicyFig {
- public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+ String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
- public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+ String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
/**
* The size of the shards. This is approximate, and should be set lower than what you would like your max to be
*/
- public static final String SHARD_SIZE = "usergrid.graph.shard.size";
+ String SHARD_SIZE = "usergrid.graph.shard.size";
/**
* Number of shards we can cache.
*/
- public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
+ String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
/**
* Get the cache timeout. The local cache will exist for this amount of time max (in millis).
*/
- public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+ String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
/**
* Number of worker threads to refresh the cache
*/
- public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+ String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
/**
* The size of the worker count for shard auditing
*/
- public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
+ String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
/**
* The size of the worker count for shard auditing
*/
- public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+ String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
- public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+ String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
/**
@@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig {
* Note that you should also pad this for node clock drift. A good value for this would be 2x the shard cache
* timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
*/
- public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
+ String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
- public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
+ String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
- public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+ String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
- public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+ String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";