You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:20:30 UTC
[40/50] [abbrv] incubator-usergrid git commit: Merge branch 'two-dot-o' into USERGRID-405
Merge branch 'two-dot-o' into USERGRID-405
Conflicts:
stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b5d2224
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b5d2224
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b5d2224
Branch: refs/heads/USERGRID-493
Commit: 7b5d2224512f0043336acbd9b9323e1e32edf888
Parents: 5c7a5f8 b53cb07
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 11:36:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 11:36:08 2015 -0600
----------------------------------------------------------------------
.../main/dist/init_instance/init_rest_server.sh | 2 +
.../dist/init_instance/install_elasticsearch.sh | 22 +-
.../main/groovy/configure_elasticsearch.groovy | 80 ++--
.../src/main/groovy/configure_usergrid.groovy | 12 +-
stack/awscluster/ugcluster-cf.json | 466 +++++++++++++------
stack/core/pom.xml | 10 -
.../batch/service/JobSchedulerService.java | 5 -
.../usergrid/corepersistence/CoreModule.java | 36 +-
.../corepersistence/CpEntityManager.java | 64 ++-
.../corepersistence/CpEntityManagerFactory.java | 4 -
.../corepersistence/CpRelationManager.java | 14 -
.../events/EntityDeletedHandler.java | 58 ++-
.../events/EntityVersionCreatedHandler.java | 60 ++-
.../events/EntityVersionDeletedHandler.java | 53 ++-
.../results/FilteringLoader.java | 2 +-
.../usergrid/persistence/EntityManager.java | 10 +-
.../persistence/EntityManagerFactory.java | 2 -
.../cassandra/EntityManagerFactoryImpl.java | 3 -
.../cassandra/EntityManagerImpl.java | 31 +-
.../cassandra/RelationManagerImpl.java | 58 +--
.../cassandra/index/ConnectedIndexScanner.java | 2 -
.../cassandra/index/IndexBucketScanner.java | 2 -
.../corepersistence/StaleIndexCleanupTest.java | 38 +-
.../usergrid/persistence/EntityManagerIT.java | 1 +
.../PerformanceEntityRebuildIndexTest.java | 2 +
.../collection/EntityCollectionManager.java | 20 +-
.../collection/EntityDeletedFactory.java | 34 --
.../collection/EntityVersionCleanupFactory.java | 35 --
.../collection/EntityVersionCreatedFactory.java | 31 --
.../persistence/collection/FieldSet.java | 48 ++
.../cache/CachedEntityCollectionManager.java | 11 +-
.../collection/guice/CollectionModule.java | 11 +-
.../EntityCollectionManagerFactoryImpl.java | 32 +-
.../impl/EntityCollectionManagerImpl.java | 251 ++++++++--
.../collection/impl/EntityDeletedTask.java | 9 +-
.../impl/EntityVersionCleanupTask.java | 34 +-
.../impl/EntityVersionTaskFactory.java | 65 +++
.../mvcc/stage/write/WriteUniqueVerify.java | 140 +++---
.../UniqueValueSerializationStrategy.java | 12 +
.../serialization/impl/MutableFieldSet.java | 63 +++
.../UniqueValueSerializationStrategyImpl.java | 27 +-
.../collection/util/EntityUtils.java | 4 +-
.../collection/EntityCollectionManagerIT.java | 68 +++
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 6 +-
.../core/astyanax/CassandraConfig.java | 6 +
.../core/astyanax/CassandraConfigImpl.java | 8 +-
.../persistence/core/astyanax/CassandraFig.java | 9 +-
.../persistence/core/future/BetterFuture.java | 43 +-
.../core/metrics/MetricsFactory.java | 9 +
.../core/metrics/MetricsFactoryImpl.java | 121 +++--
.../core/astyanax/ColumnNameIteratorTest.java | 7 +-
.../MultiKeyColumnNameIteratorTest.java | 7 +-
.../astyanax/MultiRowColumnIteratorTest.java | 7 +-
stack/corepersistence/graph/pom.xml | 28 +-
.../graph/impl/GraphManagerImpl.java | 288 ++++++++++--
.../usergrid/persistence/map/MapManager.java | 14 +-
.../persistence/map/impl/MapManagerImpl.java | 8 +
.../persistence/map/impl/MapSerialization.java | 9 +
.../map/impl/MapSerializationImpl.java | 130 +++++-
.../persistence/map/MapManagerTest.java | 49 +-
stack/corepersistence/queryindex/pom.xml | 17 +-
.../usergrid/persistence/index/EntityIndex.java | 26 +-
.../persistence/index/IndexBufferConsumer.java | 11 +
.../persistence/index/IndexBufferProducer.java | 1 -
.../usergrid/persistence/index/IndexFig.java | 73 ++-
.../index/IndexOperationMessage.java | 115 ++++-
.../persistence/index/guice/IndexModule.java | 7 +
.../persistence/index/guice/QueueProvider.java | 116 +++++
.../persistence/index/impl/BatchRequest.java | 41 ++
.../persistence/index/impl/BufferQueue.java | 68 +++
.../index/impl/BufferQueueInMemoryImpl.java | 108 +++++
.../index/impl/BufferQueueSQSImpl.java | 307 ++++++++++++
.../persistence/index/impl/DeIndexRequest.java | 115 +++++
.../index/impl/EsEntityIndexBatchImpl.java | 50 +-
.../index/impl/EsEntityIndexImpl.java | 176 ++-----
.../index/impl/EsIndexBufferConsumerImpl.java | 286 ++++++++----
.../index/impl/EsIndexBufferProducerImpl.java | 16 +-
.../persistence/index/impl/EsIndexCache.java | 138 +++---
.../persistence/index/impl/IndexRequest.java | 125 +++++
.../index/guice/TestIndexModule.java | 3 +-
.../index/impl/BufferQueueSQSImplTest.java | 169 +++++++
.../impl/EntityConnectionIndexImplTest.java | 5 +-
.../persistence/index/impl/EntityIndexTest.java | 20 +-
.../persistence/index/impl/EsTestUtils.java | 48 --
.../persistence/queue/QueueManager.java | 4 +-
.../usergrid/persistence/queue/QueueScope.java | 2 +-
.../persistence/queue/QueueScopeFactory.java | 34 --
.../persistence/queue/guice/QueueModule.java | 17 +-
.../queue/impl/QueueScopeFactoryImpl.java | 48 --
.../persistence/queue/impl/QueueScopeImpl.java | 27 +-
.../queue/impl/SQSQueueManagerImpl.java | 214 +++++----
.../persistence/queue/NoAWSCredsRule.java | 98 ++++
.../persistence/queue/QueueManagerTest.java | 29 +-
stack/pom.xml | 43 --
stack/rest/pom.xml | 4 -
.../org/apache/usergrid/rest/RootResource.java | 6 -
stack/services/pom.xml | 13 +-
.../cassandra/ManagementServiceImpl.java | 9 +-
.../services/AbstractCollectionService.java | 40 +-
.../services/AbstractConnectionsService.java | 39 +-
.../notifications/NotificationsService.java | 50 +-
.../services/notifications/QueueListener.java | 5 +-
.../usergrid/services/queues/QueueListener.java | 5 +-
.../notifications/NotifiersServiceIT.java | 6 +
stack/test-utils/pom.xml | 5 -
105 files changed, 3849 insertions(+), 1571 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index cf927ce,161037b..5da7067
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@@ -33,21 -30,17 +30,15 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
- import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
- import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
- import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
- import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.index.guice.IndexModule;
-import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
import org.apache.usergrid.persistence.map.guice.MapModule;
import org.apache.usergrid.persistence.queue.guice.QueueModule;
-
- import com.google.inject.AbstractModule;
- import com.google.inject.Provider;
- import com.google.inject.TypeLiteral;
- import com.google.inject.multibindings.Multibinder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.context.ApplicationContext;
/**
@@@ -76,31 -69,11 +67,31 @@@ public class CoreModule extends Abstra
bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
install( new CommonModule());
- install(new CollectionModule());
- install(new GraphModule());
- install( new IndexModule() );
-// install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
-// install(new QueueModule());
+ install( new CollectionModule() {
+ /**
+ * configure our migration data provider for all entities in the system
+ */
+ @Override
+ public void configureMigrationProvider() {
+
+ bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
+ AllEntitiesInSystemImpl.class );
+ }
+ } );
+ install( new GraphModule() {
+
+ /**
+ * Override the observable that needs to be used for migration
+ */
+ @Override
+ public void configureMigrationProvider() {
+ bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
+ AllNodesInGraphImpl.class );
+ }
+ } );
+ install(new IndexModule());
- install(new MapModule());
- install(new QueueModule());
++ // install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
++ // install(new QueueModule());
bind(ManagerCache.class).to( CpManagerCache.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 65dacee,01848cb..95ff9dc
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@@ -47,18 -53,18 +54,20 @@@ import rx.schedulers.Schedulers
public class EntityVersionDeletedHandler implements EntityVersionDeleted {
private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
- @Inject
- private SerializationFig serializationFig;
+
+
+
+ private final EntityManagerFactory emf;
@Inject
- private EntityManagerFactory emf;
+ public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+
@Override
- public void versionDeleted(
- final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId,
+ final List<MvccLogEntry> entityVersions ) {
+
// This check is for testing purposes and for a test that to be able to dynamically turn
// off and on delete previous versions so that it can test clean-up on read.
@@@ -87,19 -89,18 +92,19 @@@
scope.getName()
);
- rx.Observable.from( entityVersions )
- .buffer(serializationFig.getBufferSize())
- .map(new Func1<List<MvccLogEntry>, List<MvccLogEntry>>() {
+ Observable.from( entityVersions )
+ .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
@Override
- public List<MvccLogEntry> call(List<MvccLogEntry> entityList) {
- for (MvccLogEntry entity : entityList) {
- eibatch.deindex(indexScope, entityId, entity.getVersion());
- }
- eibatch.execute();
- return entityList;
+ public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
+ entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
}
- }).toBlocking().last();
+ } ).doOnNext( new Action1<EntityIndexBatch>() {
+ @Override
+ public void call( final EntityIndexBatch entityIndexBatch ) {
+ entityIndexBatch.execute();
+ }
+ } ).toBlocking().last();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index d5478d4,a73d7a7..68ddab5
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@@ -21,19 -21,17 +21,14 @@@ package org.apache.usergrid.persistence
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
- import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
- import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
--import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
--import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 10d85f8,23e375d..ac82181
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@@ -26,25 -25,25 +26,18 @@@ import java.util.concurrent.ExecutionEx
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+ import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
+import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
--import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
- import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
-import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
- import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
- import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
--import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
--import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
--import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@@ -74,25 -74,22 +67,24 @@@ public class EntityCollectionManagerFac
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
- private final EntityVersionCleanupFactory entityVersionCleanupFactory;
- private final EntityVersionCreatedFactory entityVersionCreatedFactory;
- private final EntityDeletedFactory entityDeletedFactory;
+ private final EntityVersionTaskFactory entityVersionTaskFactory;
private final TaskExecutor taskExecutor;
+ private final EntityCacheFig entityCacheFig;
+ private final MetricsFactory metricsFactory;
+ private EntityCacheFig entityCacheFig;
+ private SerializationFig serializationFig;
private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
CacheBuilder.newBuilder().maximumSize( 1000 )
.build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
public EntityCollectionManager load( CollectionScope scope ) {
//create the target EM that will perform logic
- final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeUpdate, writeVerifyUnique,
+ final EntityCollectionManager target = new EntityCollectionManagerImpl(
+ writeStart, writeVerifyUnique,
writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
entitySerializationStrategy, uniqueValueSerializationStrategy,
- mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionCleanupFactory,
- entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope );
- mvccLogEntrySerializationStrategy, keyspace, entityVersionTaskFactory,
- taskExecutor, scope, metricsFactory );
++ mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target );
@@@ -113,14 -111,13 +105,12 @@@
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final Keyspace keyspace,
- final EntityVersionCleanupFactory entityVersionCleanupFactory,
- final EntityVersionCreatedFactory entityVersionCreatedFactory,
- final EntityDeletedFactory entityDeletedFactory,
+ final EntityVersionTaskFactory entityVersionTaskFactory,
@CollectionTaskExecutor final TaskExecutor taskExecutor,
final EntityCacheFig entityCacheFig,
- final SerializationFig serializationFig) {
+ MetricsFactory metricsFactory) {
this.writeStart = writeStart;
- this.writeUpdate = writeUpdate;
this.writeVerifyUnique = writeVerifyUnique;
this.writeOptimisticVerify = writeOptimisticVerify;
this.writeCommit = writeCommit;
@@@ -131,20 -128,21 +121,18 @@@
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
this.keyspace = keyspace;
- this.entityVersionCleanupFactory = entityVersionCleanupFactory;
- this.entityVersionCreatedFactory = entityVersionCreatedFactory;
- this.entityDeletedFactory = entityDeletedFactory;
+ this.entityVersionTaskFactory = entityVersionTaskFactory;
this.taskExecutor = taskExecutor;
this.entityCacheFig = entityCacheFig;
- this.serializationFig = serializationFig;
+ this.metricsFactory = metricsFactory;
}
-
-
@Override
- public EntityCollectionManager createCollectionManager( CollectionScope collectionScope ) {
- Preconditions.checkNotNull( collectionScope );
- try {
- return ecmCache.get( collectionScope );
- }
- catch ( ExecutionException ee ) {
- throw new RuntimeException( ee );
+ public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
+ Preconditions.checkNotNull(collectionScope);
+ try{
+ return ecmCache.get(collectionScope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 19e8937,6ba4513..001e0db
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -35,6 -34,9 +37,10 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
++import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.collection.guice.Write;
+ import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@@ -51,8 -51,9 +57,11 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@@ -69,9 -72,14 +80,17 @@@ import com.netflix.astyanax.connectionp
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
++import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+ import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
+ import rx.Notification;
import rx.Observable;
import rx.Subscriber;
+ import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@@ -288,6 -368,140 +375,89 @@@ public class EntityCollectionManagerImp
} );
}
+
+ /**
+ * Retrieves all entities that correspond to each field given in the Collection.
+ * @param fields
+ * @return
+ */
+ @Override
+ public Observable<FieldSet> getEntitiesFromFields( final Collection<Field> fields ) {
+ return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
+ @Override
+ public FieldSet call( Collection<Field> fields ) {
+ try {
+
+ final UUID startTime = UUIDGenerator.newTimeUUID();
+
+ //Get back set of unique values that correspond to collection of fields
+ UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields );
+
+ //Short circut if we don't have any uniqueValues from the given fields.
+ if(!set.iterator().hasNext()){
+ return new MutableFieldSet( 0 );
+ }
+
+
+ //loop through each field, and construct an entity load
+ List<Id> entityIds = new ArrayList<>(fields.size());
+ List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
+
+ for(final Field expectedField: fields) {
+
+ UniqueValue value = set.getValue(expectedField.getName());
+
+ if(value ==null){
+ logger.debug( "Field does not correspond to a unique value" );
+ }
+
+ entityIds.add(value.getEntityId());
+ uniqueValues.add(value);
+ }
+
+ //Load a entity for each entityId we retrieved.
+ final EntitySet entitySet = entitySerializationStrategy.load(collectionScope, entityIds, startTime);
+
+ //now loop through and ensure the entities are there.
+ final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+
+ final MutableFieldSet response = new MutableFieldSet(fields.size());
+
+ for(final UniqueValue expectedUnique: uniqueValues) {
+ final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+
+ //bad unique value, delete this, it's inconsistent
+ if(entity == null || !entity.getEntity().isPresent()){
+ final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(collectionScope, expectedUnique);
+ deleteBatch.mergeShallow(valueDelete);
+ continue;
+ }
+
+
+ //else add it to our result set
+ response.addEntity(expectedUnique.getField(),entity);
+
+ }
+
+ //TODO: explore making this an Async process
+ //We'll repair it again if we have to
+ deleteBatch.execute();
+
+ return response;
+
+
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
+ }
+ }
+ } );
+ }
+
+
+
- @Override
- public Observable<Entity> update( final Entity entity ) {
-
- logger.debug( "Starting update process" );
-
- //do our input validation
- Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
-
- final Id entityId = entity.getId();
-
-
- ValidationUtils.verifyIdentity( entityId );
-
- // create our observable and start the write
- CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
-
-
- Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
-
-
- final Timer.Context timer = updateTimer.time();
- return observable.map( writeCommit ).doOnNext(new Action1<Entity>() {
- @Override
- public void call(final Entity entity) {
- logger.debug("sending entity to the queue");
-
- //we an update, signal the fix
- taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
-
- taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entity.getId(), entity.getVersion(), false) );
- //TODO T.N Change this to fire a task
- // Observable.from( new CollectionIoEvent<Id>(collectionScope,
- // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
-
-
- }
- }).doOnError(rollback)
- .doOnNext(new Action1<Entity>() {
- @Override
- public void call(Entity entity) {
- updateMeter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
- }
-
+
// fire the stages
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index dbe58d4,83f165d..5472645
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@@ -22,9 -22,8 +22,8 @@@ import com.google.inject.Inject
import com.google.inject.assistedinject.Assisted;
import com.netflix.astyanax.MutationBatch;
import org.apache.usergrid.persistence.collection.CollectionScope;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 7c39ef4,65506ec..27c3db8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@@ -20,9 -20,17 +20,18 @@@ package org.apache.usergrid.persistence
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
+ import com.google.inject.Inject;
+ import com.google.inject.assistedinject.Assisted;
+ import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+ import org.apache.usergrid.persistence.collection.util.EntityUtils;
+ import org.apache.usergrid.persistence.model.entity.Entity;
+ import org.apache.usergrid.persistence.model.field.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -70,6 -75,7 +79,7 @@@ public class EntityVersionCleanupTask i
private final CollectionScope scope;
private final Id entityId;
private final UUID version;
- private final int numToSkip;
++ private final boolean includeVersion;
@Inject
@@@ -89,6 -100,8 +103,8 @@@
this.scope = scope;
this.entityId = entityId;
this.version = version;
+
- numToSkip = includeVersion? 0: 1;
++ includeVersion = includeVersion;
}
@@@ -117,94 -130,79 +133,92 @@@
@Override
public Void call() throws Exception {
//TODO Refactor this logic into a class that can be invoked from anywhere
- //load every entity we have history of
- Observable<List<MvccEntity>> deleteFieldsObservable =
- Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
- @Override
- protected Iterator<MvccEntity> getIterator() {
- Iterator<MvccEntity> entities = entitySerializationStrategy.loadDescendingHistory(
- scope, entityId, version, 1000); // TODO: what fetchsize should we use here?
- return entities;
- }
- })
- //buffer them for efficiency
- .skip(numToSkip)
- .buffer(serializationFig.getBufferSize()).doOnNext(
- new Action1<List<MvccEntity>>() {
- @Override
- public void call(final List<MvccEntity> mvccEntities) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
- final MutationBatch entityBatch = keyspace.prepareMutationBatch();
- final MutationBatch logBatch = keyspace.prepareMutationBatch();
--
- for (MvccEntity mvccEntity : mvccEntities) {
- final UUID entityVersion = mvccEntity.getVersion();
--
- //iterate all unique values
-
- //if the entity is present process the fields
- if(mvccEntity.getEntity().isPresent()) {
- final Entity entity = mvccEntity.getEntity().get();
-
- //remove all unique fields from the index
- for ( final Field field : EntityUtils.getUniqueFields(entity )) {
-
- final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
- final MutationBatch deleteMutation =
- uniqueValueSerializationStrategy.delete( scope, unique );
- batch.mergeShallow( deleteMutation );
++ //iterate all unique values
+ final BlockingObservable<Long> uniqueValueCleanup =
+ Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+ @Override
+ protected Iterator<UniqueValue> getIterator() {
+ return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
+ }
+ } )
+
+ //skip current versions
+ .skipWhile( new Func1<UniqueValue, Boolean>() {
+ @Override
+ public Boolean call( final UniqueValue uniqueValue ) {
+ return version.equals( uniqueValue.getEntityVersion() );
}
+ } )
+ //buffer our buffer size, then roll them all up in a single batch mutation
+ .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
+ @Override
+ public void call( final List<UniqueValue> uniqueValues ) {
+ final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+ for ( UniqueValue value : uniqueValues ) {
+ uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
}
- final MutationBatch entityDelete = entitySerializationStrategy
- .delete(scope, entityId, mvccEntity.getVersion());
- entityBatch.mergeShallow( entityDelete );
- final MutationBatch logDelete = logEntrySerializationStrategy
- .delete(scope, entityId, version);
- logBatch.mergeShallow(logDelete);
+ try {
+ uniqueCleanupBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute batch mutation", e );
+ }
}
+ } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
- try {
- batch.execute();
- } catch (ConnectionException e1) {
- throw new RuntimeException("Unable to execute " +
- "unique value " +
- "delete", e1);
- }
- fireEvents(mvccEntities);
- try {
- entityBatch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to delete entities in cleanup", e);
+
+ //start calling the listeners for remove log entries
+ BlockingObservable<Long> versionsDeletedObservable =
+
+ Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+ @Override
+ protected Iterator<MvccLogEntry> getIterator() {
+
+ return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
+ serializationFig.getBufferSize() );
}
+ } )
+ //skip current version
+ .skipWhile( new Func1<MvccLogEntry, Boolean>() {
+ @Override
+ public Boolean call( final MvccLogEntry mvccLogEntry ) {
+ return version.equals( mvccLogEntry.getVersion() );
+ }
+ } )
+ //buffer them for efficiency
+ .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
+ @Override
+ public void call( final List<MvccLogEntry> mvccEntities ) {
+
+ fireEvents( mvccEntities );
+
+ final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
+
+
+ for ( MvccLogEntry entry : mvccEntities ) {
+ logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
+ }
- try {
- logBatch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to delete entities from the log", e);
+ try {
+ logCleanupBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute batch mutation", e );
+ }
}
+ } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
+
+ //wait for this to complete
+ final Long removedCount = uniqueValueCleanup.last();
- }
- }
- );
+ logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
- final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+ final Long versionCleanupCount = versionsDeletedObservable.last();
- logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
+ logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 5b7898e,60275d9..d7a6407
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@@ -23,9 -22,9 +23,10 @@@ import java.util.Iterator
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import com.netflix.astyanax.model.ConsistencyLevel;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
@@@ -55,27 -53,23 +56,38 @@@ public interface UniqueValueSerializati
/**
* Load UniqueValue that matches field from collection or null if that value does not exist.
*
- * @param colScope Collection scope in which to look for field name/value
+ * @param collectionScope scope in which to look for field name/value
* @param fields Field name/value to search for
+ *
* @return UniqueValueSet containing fields from the collection that exist in cassandra
+ *
* @throws ConnectionException on error connecting to Cassandra
*/
- public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
+ public UniqueValueSet load( CollectionScope collectionScope, Collection<Field> fields ) throws ConnectionException;
+ /**
- * Load UniqueValue that matches field from collection or null if that value does not exist.
- *
- * @param colScope Collection scope in which to look for field name/value
- * @param consistencyLevel Consistency level of query
- * @param fields Field name/value to search for
- * @return UniqueValueSet containing fields from the collection that exist in cassandra
- * @throws ConnectionException on error connecting to Cassandra
- */
++ * Load UniqueValue that matches field from collection or null if that value does not exist.
++ *
++ * @param colScope Collection scope in which to look for field name/value
++ * @param consistencyLevel Consistency level of query
++ * @param fields Field name/value to search for
++ * @return UniqueValueSet containing fields from the collection that exist in cassandra
++ * @throws ConnectionException on error connecting to Cassandra
++ */
+ public UniqueValueSet load( CollectionScope colScope, ConsistencyLevel consistencyLevel, Collection<Field> fields ) throws ConnectionException;
++
+
+ /**
+ * Loads the currently persisted history of every unique value the entity has held. This will
+ * start from the max version and return values in descending version order. Note that for entities
+ * with more than one unique field, sequential fields can be returned with the same version.
+ * @param collectionScope The scope the entity is stored in
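+ * @param entityId The id of the entity whose unique value history is loaded
+ * @return An iterator over the entity's unique values, in descending version order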
+ */
+ public Iterator<UniqueValue> getAllUniqueFields(CollectionScope collectionScope, Id entityId);
+
+
/**
* Delete the specified Unique Value from Cassandra.
*
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 7aed8db,47372ae..108f2e8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@@ -38,16 -36,8 +40,17 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.util.EntityUtils;
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+ import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
@@@ -78,40 -65,23 +81,43 @@@ public class UniqueValueSerializationSt
private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
- private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion> CF_UNIQUE_VALUES =
- new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER,
- ENTITY_VERSION_SER );
+ private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>
+ CF_UNIQUE_VALUES = new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER, ENTITY_VERSION_SER );
+
+
++
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ private static final CollectionScopedRowKeySerializer<Id> ENTITY_ROW_KEY_SER =
+ new CollectionScopedRowKeySerializer<>( ID_SER );
+
+
+ private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
+ CF_ENTITY_UNIQUE_VALUES =
+ new MultiTennantColumnFamily<>( "Entity_Unique_Values", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
+
+ public static final int COL_VALUE = 0x0;
+
+
- private final Keyspace keyspace;
+ private final SerializationFig serializationFig;
+ protected final Keyspace keyspace;
- private final CassandraFig cassandraFig;
++ private final CassandraFig cassandraFig;
+
/**
* Construct serialization strategy for keyspace.
*
* @param keyspace Keyspace in which to store Unique Values.
+ * @param serializationFig
*/
@Inject
- public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
- public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig) {
- this.cassandraFig = cassandraFig;
++ public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig, final SerializationFig serializationFig ) {
this.keyspace = keyspace;
++ this.cassandraFig = cassandraFig;
+ this.serializationFig = serializationFig;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
index beff3bd,cf964a3..20edb66
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
@@@ -2,9 -2,10 +2,10 @@@ package org.apache.usergrid.persistence
import java.util.ArrayList;
- import java.util.Collection;
import java.util.List;
-import java.util.Set;
++import java.util.Collection;
import java.util.UUID;
+ import org.apache.usergrid.persistence.model.field.Field;
import org.apache.commons.lang3.reflect.FieldUtils;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 8f78799,f19d613..246bbca
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@@ -96,7 -126,8 +126,8 @@@ public class GraphManagerImpl implement
final GraphFig graphFig,
final EdgeDeleteListener edgeDeleteListener,
final NodeDeleteListener nodeDeleteListener,
- final ApplicationScope scope) {
- @Assisted final ApplicationScope scope,
++ final ApplicationScope scope,
+ MetricsFactory metricsFactory) {
ValidationUtils.validateApplicationScope( scope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7fccd11,50b994d..27fe705
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@@ -19,8 -19,12 +19,9 @@@
package org.apache.usergrid.persistence.index.guice;
- import org.apache.usergrid.persistence.core.guice.CommonModule;
+ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.TestModule;
+ import org.apache.usergrid.persistence.core.guice.CommonModule;
-import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
-import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
public class TestIndexModule extends TestModule {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index e4043d2,ca9bf79..9cb5297
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -24,10 -24,14 +24,12 @@@ import java.util.*
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+ import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.index.*;
-import org.apache.usergrid.persistence.model.field.ArrayField;
-import org.apache.usergrid.persistence.model.field.EntityObjectField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.persistence.model.field.*;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import org.junit.Ignore;
+ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/pom.xml
----------------------------------------------------------------------