You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/09 21:23:43 UTC
[1/4] git commit: Add new "public Observable getLatestVersion(
Id entityId )" method to the collection manager.
Repository: incubator-usergrid
Updated Branches:
refs/heads/collection_multiget f1a88aa90 -> 63094abc3
Add new "public Observable<UUID> getLatestVersion( Id entityId )" method to the collection manager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/159e5fd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/159e5fd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/159e5fd4
Branch: refs/heads/collection_multiget
Commit: 159e5fd447ffbc0f3c6ee649dae48d0e44b8cef1
Parents: 185ecef
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:32:16 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:32:16 2014 -0400
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 15 +++-
.../exception/WriteUniqueVerifyException.java | 1 -
.../collection/guice/CollectionModule.java | 12 +--
.../impl/EntityCollectionManagerImpl.java | 78 ++++++++++++--------
.../collection/mvcc/stage/load/GetVersion.java | 74 +++++++++++++++++++
.../collection/EntityCollectionManagerIT.java | 17 +++++
6 files changed, 154 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index b49989f..6756cdc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.collection;
+import java.util.UUID;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -51,16 +52,22 @@ public interface EntityCollectionManager {
*/
public Observable<Entity> load( Id entityId );
- //TODO TN Change load to use multiget and return multiple entities. Only supports loading 1k per load operation.
+ /**
+ * Return the latest version of the specified entity.
+ */
+ public Observable<UUID> getLatestVersion( Id entityId );
+ //TODO TN Change load to use multiget and return multiple entities.
+ // Only supports loading 1k per load operation.
- //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
+ //TODO Dave add a load versions using a multiget that will return a latest version
+ //structure for a collection of entity Ids
/**
* Takes the change and reloads an entity with all changes applied in this entity applied.
- * The resulting entity from calling load will be the previous version of this entity + the entity
- * in this object applied to it.
+ * The resulting entity from calling load will be the previous version of this entity + the
+ * entityin this object applied to it.
* @param entity
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
index 64ca777..7e7f05b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.field.Field;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3336166..cb0087b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -64,11 +64,10 @@ public class CollectionModule extends AbstractModule {
install( new ServiceModule() );
// create a guice factor for getting our collection manager
- install(
- new FactoryModuleBuilder().implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
- .implement( EntityCollectionManagerSync.class,
- EntityCollectionManagerSyncImpl.class )
- .build( EntityCollectionManagerFactory.class ) );
+ install( new FactoryModuleBuilder()
+ .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
+ .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
+ .build( EntityCollectionManagerFactory.class ) );
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
@@ -101,7 +100,8 @@ public class CollectionModule extends AbstractModule {
@Provides
@CollectionTaskExecutor
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ return new NamedTaskExecutorImpl( "collectiontasks",
+ serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index b2b07e9..0efb21c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.usergrid.persistence.collection.impl;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.load.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.load.GetVersion;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
import rx.Observable;
import rx.functions.Action1;
@@ -75,6 +76,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
private final RollbackAction rollback;
+ private final GetVersion getVersion;
//load stages
private final Load load;
@@ -87,15 +89,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final TaskExecutor taskExecutor;
@Inject
- public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
- final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
- final MarkStart markStart, final MarkCommit markCommit,
- @CollectionTaskExecutor
- final TaskExecutor taskExecutor,
- @Assisted final CollectionScope collectionScope) {
+ public EntityCollectionManagerImpl(
+ final UUIDService uuidService,
+ @Write final WriteStart writeStart,
+ @WriteUpdate final WriteStart writeUpdate,
+ final WriteUniqueVerify writeVerifyUnique,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit,
+ final RollbackAction rollback,
+ final Load load,
+ final MarkStart markStart,
+ final MarkCommit markCommit,
+ final GetVersion getVersion,
+ @CollectionTaskExecutor
+ final TaskExecutor taskExecutor,
+ @Assisted final CollectionScope collectionScope) {
Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -111,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.load = load;
this.markStart = markStart;
this.markCommit = markCommit;
+ this.getVersion = getVersion;
this.uuidService = uuidService;
this.collectionScope = collectionScope;
@@ -180,8 +189,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
- .map( load );
+ return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(load);
}
@Override
@@ -212,7 +220,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
//we an update, signal the fix
//TODO T.N Change this to fire a task
- Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+ Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) )
+ .map( load ).subscribeOn( Schedulers.io() ).subscribe();
}
@@ -220,34 +229,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
// fire the stages
- public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
-
- return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-
- @Override
- public void call(
- final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ public Observable<CollectionIoEvent<MvccEntity>> stageRunner(
+ CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
- Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .doOnNext( writeVerifyUnique );
+ return Observable.from( writeData ).map( writeState ).doOnNext(
+ new Action1<CollectionIoEvent<MvccEntity>>() {
+ @Override
+ public void call(
+ final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
- // optimistic verification
- Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .doOnNext( writeOptimisticVerify );
+ Observable<CollectionIoEvent<MvccEntity>> unique =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+ .doOnNext( writeVerifyUnique );
- //wait for both to finish
- Observable.merge( unique, optimistic ).toBlocking().last();
+ // optimistic verification
+ Observable<CollectionIoEvent<MvccEntity>> optimistic =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+ .doOnNext( writeOptimisticVerify );
- }
- } );
+ //wait for both to finish
+ Observable.merge( unique, optimistic ).toBlocking().last();
+ }
+ }
+ );
}
-
+ @Override
+ public Observable<UUID> getLatestVersion(Id entityId) {
+ return Observable.from(
+ new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(getVersion);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
new file mode 100644
index 0000000..2069818
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.load;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.functions.Func1;
+
+
+/**
+ * Gets the latest version UUID for an Entity without loading the Entity.
+ */
+public class GetVersion implements Func1<CollectionIoEvent<Id>, UUID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger( GetVersion.class );
+
+ private final MvccLogEntrySerializationStrategy logStrat;
+
+
+ @Inject
+ public GetVersion( final MvccLogEntrySerializationStrategy logStrat ) {
+ Preconditions.checkNotNull( logStrat, "logStrat is required" );
+ this.logStrat = logStrat;
+ }
+
+
+ @Override
+ public UUID call( CollectionIoEvent<Id> idEvent ) {
+
+ Id id = idEvent.getEvent();
+ CollectionScope cs = idEvent.getEntityCollection();
+
+ final UUID latestVersion;
+ try {
+ List<MvccLogEntry> logEntries = logStrat.load( cs, id, UUIDGenerator.newTimeUUID(), 1 );
+ latestVersion = logEntries.get(0).getVersion();
+
+ } catch (ConnectionException ex) {
+ throw new RuntimeException("Unable to get latest version of entity " +
+ id.getType() + ":" + id.getUuid().toString(), ex );
+ }
+
+ return latestVersion;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 9e8bbe3..34061ee 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -392,4 +392,21 @@ public class EntityCollectionManagerIT {
assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0);
}
+ @Test
+ public void testGetVersion() {
+
+ CollectionScope context = new CollectionScopeImpl(
+ new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+ Entity newEntity = new Entity( new SimpleId( "test" ) );
+ EntityCollectionManager manager = factory.createCollectionManager( context );
+ Observable<Entity> observable = manager.write( newEntity );
+ Entity created = observable.toBlocking().lastOrDefault( null );
+
+ assertNotNull("Id was assigned", created.getId() );
+ assertNotNull("Version was assigned", created.getVersion() );
+
+ assertTrue(UUIDComparator.staticCompare(created.getVersion(),
+ manager.getLatestVersion( created.getId() ).toBlocking().lastOrDefault(null)) == 0);
+ }
}
[3/4] git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
collection_multiget
Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into collection_multiget
Conflicts:
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac61f132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac61f132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac61f132
Branch: refs/heads/collection_multiget
Commit: ac61f132372f653cf14561fe7a03cb7cb6d8a79c
Parents: f1a88aa 81d4e0e
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 11:47:51 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 11:47:51 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 179 +++++++++++--------
.../corepersistence/StaleIndexCleanupTest.java | 43 ++++-
.../collection/EntityCollectionManager.java | 5 +
.../exception/WriteUniqueVerifyException.java | 1 -
.../collection/guice/CollectionModule.java | 12 +-
.../impl/EntityCollectionManagerImpl.java | 13 +-
.../collection/EntityCollectionManagerIT.java | 17 ++
7 files changed, 175 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index e5917b0,6756cdc..2625078
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@@ -19,8 -19,7 +19,9 @@@
package org.apache.usergrid.persistence.collection;
+import java.util.Collection;
+
+ import java.util.UUID;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@@ -53,17 -52,16 +54,21 @@@ public interface EntityCollectionManage
*/
public Observable<Entity> load( Id entityId );
+ /**
+ * Return the latest version of the specified entity.
+ */
+ public Observable<UUID> getLatestVersion( Id entityId );
+ //TODO TN Change load to use multiget and return multiple entities. Only supports loading 1k per load operation.
+
+ /**
+ * Load all the entityIds into the observable entity set
+ * @param entityIds
+ * @return
+ */
+ public Observable<EntitySet> load(Collection<Id> entityIds);
- //TODO TN Change load to use multiget and return multiple entities.
- // Only supports loading 1k per load operation.
- //TODO Dave add a load versions using a multiget that will return a latest version
- //structure for a collection of entity Ids
+ //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 39df9cb,0efb21c..5fe2fbb
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -80,7 -76,11 +83,8 @@@ public class EntityCollectionManagerImp
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
private final RollbackAction rollback;
+ private final GetVersion getVersion;
- //load stages
- private final Load load;
-
//delete stages
private final MarkStart markStart;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 823cb87,34061ee..a7f9ea2
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@@ -402,130 -389,24 +402,147 @@@ public class EntityCollectionManagerIT
assertNotNull( "A new version must be assigned", newVersion );
// new Version should be > old version
- assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0);
+ assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
+ }
+
+
+ @Test
+ public void writeMultiget() {
+
+ final CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+ final EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ final int multigetSize = serializationFig.getMaxLoadSize();
+
+ final List<Entity> writtenEntities = new ArrayList<>( multigetSize );
+ final List<Id> entityIds = new ArrayList<>( multigetSize );
+
+ for ( int i = 0; i < multigetSize; i++ ) {
+ final Entity entity = new Entity( new SimpleId( "test" ) );
+
+ final Entity written = manager.write( entity ).toBlocking().last();
+
+ writtenEntities.add( written );
+ entityIds.add( written.getId() );
+ }
+
+
+ final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( entitySet );
+
+ assertEquals(multigetSize, entitySet.size());
+ assertFalse(entitySet.isEmpty());
+
+ /**
+ * Validate every element exists
+ */
+ for(int i = 0; i < multigetSize; i ++){
+ final Entity expected = writtenEntities.get( i );
+
+ final MvccEntity returned = entitySet.getEntity( expected.getId() );
+
+ assertEquals("Same entity returned", expected, returned.getEntity().get());
+ }
+
+
}
+
+ /**
+ * Perform a multiget where every entity will need repaired on load
+ */
+ @Test
+ public void writeMultigetRepair() {
+
+ final CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+ final EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ final int multigetSize = serializationFig.getMaxLoadSize();
+
+ final List<Entity> writtenEntities = new ArrayList<>( multigetSize );
+ final List<Id> entityIds = new ArrayList<>( multigetSize );
+
+ for ( int i = 0; i < multigetSize; i++ ) {
+ final Entity entity = new Entity( new SimpleId( "test" ) );
+
+ final Entity written = manager.write( entity ).toBlocking().last();
+
+ written.setField( new BooleanField( "updated", true ) );
+
+ final Entity updated = manager.update( written ).toBlocking().last();
+
+ writtenEntities.add( updated );
+ entityIds.add( updated.getId() );
+ }
+
+
+ final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( entitySet );
+
+ assertEquals(multigetSize, entitySet.size());
+ assertFalse(entitySet.isEmpty());
+
+ /**
+ * Validate every element exists
+ */
+ for(int i = 0; i < multigetSize; i ++){
+ final Entity expected = writtenEntities.get( i );
+
+ final MvccEntity returned = entitySet.getEntity( expected.getId() );
+
+ assertEquals("Same entity returned", expected, returned.getEntity().get());
+
+ assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
+ }
+
+
+ }
+
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void readTooLarge() {
+
+ final CollectionScope context =
+ new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+ final EntityCollectionManager manager = factory.createCollectionManager( context );
+
+ final int multigetSize = serializationFig.getMaxLoadSize() +1;
+
+
+ final List<Id> entityIds = new ArrayList<>( multigetSize );
+
+ for ( int i = 0; i < multigetSize; i++ ) {
+
+ entityIds.add( new SimpleId( "simple" ) );
+ }
+
+
+ //should throw an exception
+ manager.load( entityIds ).toBlocking().lastOrDefault( null );
+
+
+
+ }
+ @Test
+ public void testGetVersion() {
+
+ CollectionScope context = new CollectionScopeImpl(
+ new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+ Entity newEntity = new Entity( new SimpleId( "test" ) );
+ EntityCollectionManager manager = factory.createCollectionManager( context );
+ Observable<Entity> observable = manager.write( newEntity );
+ Entity created = observable.toBlocking().lastOrDefault( null );
+
+ assertNotNull("Id was assigned", created.getId() );
+ assertNotNull("Version was assigned", created.getVersion() );
+
+ assertTrue(UUIDComparator.staticCompare(created.getVersion(),
+ manager.getLatestVersion( created.getId() ).toBlocking().lastOrDefault(null)) == 0);
+ }
}
[4/4] git commit: Initial refactor of results loading to use
multiget. Need to push load logic down to impelmentations.
Posted by to...@apache.org.
Initial refactor of results loading to use multiget. Need to push load logic down to impelmentations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/63094abc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/63094abc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/63094abc
Branch: refs/heads/collection_multiget
Commit: 63094abc3d52fb6139d8a0828328d0bc4c111e0c
Parents: ac61f13
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 13:23:35 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 13:23:35 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityDeleteListener.java | 2 +-
.../corepersistence/CpRelationManager.java | 6 +-
.../results/AbstractIdLoader.java | 130 +++++++++++++++++++
.../corepersistence/results/EntitiesLoader.java | 51 ++++++++
.../corepersistence/results/IdsLoader.java | 46 +++++++
.../corepersistence/results/RefsLoader.java | 46 +++++++
.../corepersistence/results/ResultsLoader.java | 57 ++++++++
.../results/ResultsLoaderFactory.java | 53 ++++++++
.../results/ResultsLoaderFactoryImpl.java | 70 ++++++++++
.../CpEntityDeleteListenerTest.java | 2 +-
.../CpEntityIndexDeleteListenerTest.java | 3 +-
11 files changed, 461 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
index 6b6fa59..70df7d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.entity.EntityVersion;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index bcfe215..03aada3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1551,8 +1551,12 @@ public class CpRelationManager implements RelationManager {
*/
private Results buildResults(Query query, CandidateResults crs, String collName ) {
+
logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
+
+ //TODO T.N Change to results loader here
+
Results results = null;
EntityIndex index = managerCache.getEntityIndex(applicationScope);
@@ -1574,7 +1578,7 @@ public class CpRelationManager implements RelationManager {
EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
- UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+ UUID latestVersion = ecm.getLatestVersion( Collections.singleton( cr.getId()) ).toBlocking().last().getMaxVersion( cr.getId() ).getVersion();
if ( logger.isDebugEnabled() ) {
logger.debug("Getting version for entity {} from scope\n app {}\n owner {}\n name {}",
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
new file mode 100644
index 0000000..b034a53
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.CpNamingUtils;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public abstract class AbstractIdLoader implements ResultsLoader{
+
+ @Override
+ public Results getResults( final ApplicationScope applicationScope, final CandidateResults crs ) {
+// Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+//
+// Iterator<CandidateResult> iter = crs.iterator();
+// while ( iter.hasNext() ) {
+//
+// CandidateResult cr = iter.next();
+//
+// CollectionScope collScope = new CollectionScopeImpl(
+// applicationScope.getApplication(),
+// applicationScope.getApplication(),
+// CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+//
+// EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+//
+// UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+//
+// if ( logger.isDebugEnabled() ) {
+// logger.debug("Getting version for entity {} from scope\n app {}\n owner {}\n name {}",
+// new Object[] {
+// cr.getId(),
+// collScope.getApplication(),
+// collScope.getOwner(),
+// collScope.getName()
+// });
+// }
+//
+// if ( latestVersion == null ) {
+// logger.error("Version for Entity {}:{} not found",
+// cr.getId().getType(), cr.getId().getUuid());
+// continue;
+// }
+//
+// if ( cr.getVersion().compareTo( latestVersion) < 0 ) {
+// logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+// new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+// cr.getVersion(), latestVersion});
+//
+// IndexScope indexScope = new IndexScopeImpl(
+// cpHeadEntity.getId(),
+// CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+// indexBatch.deindex( indexScope, cr);
+//
+// continue;
+// }
+//
+// CandidateResult alreadySeen = latestVersions.get( cr.getId() );
+//
+// if ( alreadySeen == null ) { // never seen it, so add to map
+// latestVersions.put( cr.getId(), cr );
+//
+// } else {
+// // we seen this id before, only add entity if we now have newer version
+// if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
+//
+// latestVersions.put( cr.getId(), cr);
+//
+// IndexScope indexScope = new IndexScopeImpl(
+// cpHeadEntity.getId(),
+// CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+// indexBatch.deindex( indexScope, alreadySeen);
+// }
+// }
+// }
+//
+// indexBatch.execute();
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
new file mode 100644
index 0000000..d20f677
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class EntitiesLoader implements ResultsLoader {
+ @Override
+ public Results getResults( final ApplicationScope scope, final CandidateResults crs ) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
new file mode 100644
index 0000000..7796028
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class IdsLoader extends AbstractIdLoader{
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
new file mode 100644
index 0000000..ad852d5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class RefsLoader extends AbstractIdLoader{
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
new file mode 100644
index 0000000..8a17c6e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+/**
+ * Interface for loading results
+ */
+public interface ResultsLoader {
+
+ /**
+ * Using the candidate results, get the results
+ * @param crs
+ * @return
+ */
+ public Results getResults(final ApplicationScope scope, final CandidateResults crs);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
new file mode 100644
index 0000000..5c5892c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * Factory for creating results
+ */
+
+public interface ResultsLoaderFactory {
+
+
+ /**
+ * Get the load for results
+ * @return
+ */
+ public ResultsLoader getLoader();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..edaca78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Factory for creating results
+ */
+@Singleton
+public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+
+ private final EntitiesLoader entityLoader;
+ private final IdsLoader idsLoader;
+ private final RefsLoader refsLoader;
+
+
+ @Inject
+ public ResultsLoaderFactoryImpl( final EntitiesLoader entityLoader, final IdsLoader idsLoader,
+ final RefsLoader refsLoader ) {
+ this.entityLoader = entityLoader;
+ this.idsLoader = idsLoader;
+ this.refsLoader = refsLoader;
+ }
+
+
+ @Override
+ public ResultsLoader getLoader() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
index dfea003..16d2e79 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
@@ -28,7 +28,7 @@ import org.junit.runner.RunWith;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.entity.EntityVersion;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
index 77c62c5..6b92d90 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
@@ -29,9 +29,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.apache.usergrid.Application;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
[2/4] git commit: Change query result building logic to discard stale
CandidateResults in all cases,
and to do repair by reindexing each stale candidate found.
Posted by to...@apache.org.
Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/81d4e0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/81d4e0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/81d4e0ea
Branch: refs/heads/collection_multiget
Commit: 81d4e0ea24d7f18e60718fefc33086ced5f5900c
Parents: 159e5fd
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:33:53 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:33:53 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 179 +++++++++++--------
.../corepersistence/StaleIndexCleanupTest.java | 43 ++++-
2 files changed, 136 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 5f595f4..bcfe215 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1542,118 +1542,143 @@ public class CpRelationManager implements RelationManager {
}
+ /**
+ * Build results from a set of candidates, and discard those that represent stale indexes.
+ *
+ * @param query Query that was executed
+ * @param crs Candidates to be considered for results
+ * @param collName Name of collection or null if querying all types
+ */
private Results buildResults(Query query, CandidateResults crs, String collName ) {
logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
Results results = null;
- if ( query.getLevel().equals( Level.IDS )) {
+ EntityIndex index = managerCache.getEntityIndex(applicationScope);
+ EntityIndexBatch indexBatch = index.createBatch();
- // TODO: add stale entity logic here
-
- // TODO: replace this with List<Id> someday
- List<UUID> ids = new ArrayList<UUID>();
- Iterator<CandidateResult> iter = crs.iterator();
- while ( iter.hasNext() ) {
- ids.add( iter.next().getId().getUuid() );
+
+ // map of the latest versions, we will discard stale indexes
+ Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+
+ Iterator<CandidateResult> iter = crs.iterator();
+ while ( iter.hasNext() ) {
+
+ CandidateResult cr = iter.next();
+
+ CollectionScope collScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+
+ EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+
+ UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Getting version for entity {} from scope\n app {}\n owner {}\n name {}",
+ new Object[] {
+ cr.getId(),
+ collScope.getApplication(),
+ collScope.getOwner(),
+ collScope.getName()
+ });
+ }
+
+ if ( latestVersion == null ) {
+ logger.error("Version for Entity {}:{} not found",
+ cr.getId().getType(), cr.getId().getUuid());
+ continue;
}
- results = Results.fromIdList( ids );
- } else if ( query.getLevel().equals( Level.REFS )) {
+ if ( cr.getVersion().compareTo( latestVersion) < 0 ) {
+ logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+ cr.getVersion(), latestVersion});
- // TODO: add stale entity logic here
-
- if ( crs.size() == 1 ) {
- CandidateResult cr = crs.iterator().next();
- results = Results.fromRef(
- new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+ IndexScope indexScope = new IndexScopeImpl(
+ cpHeadEntity.getId(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+ indexBatch.deindex( indexScope, cr);
+
+ continue;
+ }
+
+ CandidateResult alreadySeen = latestVersions.get( cr.getId() );
+
+ if ( alreadySeen == null ) { // never seen it, so add to map
+ latestVersions.put( cr.getId(), cr );
} else {
+ // we seen this id before, only add entity if we now have newer version
+ if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
- List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- Iterator<CandidateResult> iter = crs.iterator();
- while ( iter.hasNext() ) {
- Id id = iter.next().getId();
- entityRefs.add( new SimpleEntityRef( id.getType(), id.getUuid() ));
- }
- results = Results.fromRefList(entityRefs);
+ latestVersions.put( cr.getId(), cr);
+
+ IndexScope indexScope = new IndexScopeImpl(
+ cpHeadEntity.getId(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+ indexBatch.deindex( indexScope, alreadySeen);
+ }
}
+ }
- } else {
+ indexBatch.execute();
- // first, build map of latest versions of entities
- Map<Id, org.apache.usergrid.persistence.model.entity.Entity> latestVersions =
- new LinkedHashMap<Id, org.apache.usergrid.persistence.model.entity.Entity>();
+ if (query.getLevel().equals(Level.IDS)) {
- Iterator<CandidateResult> iter = crs.iterator();
- while ( iter.hasNext() ) {
+ List<UUID> ids = new ArrayList<UUID>();
+ for ( Id id : latestVersions.keySet() ) {
+ CandidateResult cr = latestVersions.get(id);
+ ids.add( cr.getId().getUuid() );
+ }
+ results = Results.fromIdList(ids);
+
+ } else if (query.getLevel().equals(Level.REFS)) {
+
+ List<EntityRef> refs = new ArrayList<EntityRef>();
+ for ( Id id : latestVersions.keySet() ) {
+ CandidateResult cr = latestVersions.get(id);
+ refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+ }
+ results = Results.fromRefList( refs );
- CandidateResult cr = iter.next();
+ } else {
+
+ List<Entity> entities = new ArrayList<Entity>();
+ for (Id id : latestVersions.keySet()) {
+
+ CandidateResult cr = latestVersions.get(id);
CollectionScope collScope = new CollectionScopeImpl(
applicationScope.getApplication(),
applicationScope.getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
- if ( logger.isDebugEnabled() ) {
- logger.debug("Loading entity {} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- cr.getId(),
- collScope.getApplication(),
- collScope.getOwner(),
- collScope.getName()
- });
- }
+ EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
- org.apache.usergrid.persistence.model.entity.Entity e =
- ecm.load( cr.getId() ).toBlockingObservable().last();
+ org.apache.usergrid.persistence.model.entity.Entity e =
+ ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
if ( e == null ) {
- logger.error("Entity {}:{} not found", cr.getId().getType(), cr.getId().getUuid());
+ logger.error("Entity {}:{} not found",
+ cr.getId().getType(), cr.getId().getUuid());
continue;
}
- if ( cr.getVersion().compareTo( e.getVersion()) < 0 ) {
- logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] { cr.getId().getUuid(), cr.getId().getType(),
- cr.getVersion(), e.getVersion()});
- continue;
- }
-
- org.apache.usergrid.persistence.model.entity.Entity alreadySeen =
- latestVersions.get( e.getId() );
- if ( alreadySeen == null ) { // never seen it, so add to map
- latestVersions.put( e.getId(), e);
-
- } else {
- // we seen this id before, only add entity if newer version
- if ( e.getVersion().compareTo( alreadySeen.getVersion() ) > 0 ) {
- latestVersions.put( e.getId(), e);
- }
- }
- }
-
- // now build collection of old-school entities
- List<Entity> entities = new ArrayList<Entity>();
- for ( Id id : latestVersions.keySet() ) {
-
- org.apache.usergrid.persistence.model.entity.Entity e =
- latestVersions.get( id );
-
Entity entity = EntityFactory.newEntity(
- e.getId().getUuid(), e.getField("type").getValue().toString() );
+ e.getId().getUuid(), e.getField("type").getValue().toString());
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
- entity.addProperties( entityMap );
- entities.add( entity );
+ Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
+ entity.addProperties(entityMap);
+ entities.add(entity);
}
- if ( entities.size() == 1 ) {
- results = Results.fromEntity( entities.get(0));
+ if (entities.size() == 1) {
+ results = Results.fromEntity(entities.get(0));
} else {
- results = Results.fromEntities( entities );
+ results = Results.fromEntities(entities);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5fc9af3..c5d5782 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -53,6 +54,9 @@ import org.slf4j.LoggerFactory;
public class StaleIndexCleanupTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
+ private static final long writeDelayMs = 80;
+ //private static final long readDelayMs = 7;
+
@Test
public void testUpdateVersioning() throws Exception {
@@ -92,45 +96,66 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
logger.info("Started testStaleIndexCleanup()");
- final EntityManager em = app.getEntityManager();
+ // TODO: turn off post processing stuff that cleans up stale entities
- final List<Entity> things = new ArrayList<Entity>();
+ final EntityManager em = app.getEntityManager();
- int numEntities = 1;
- int numUpdates = 3;
+ int numEntities = 100;
+ int numUpdates = 10;
- // create 100 entities
+ // create lots of entities
+ final List<Entity> things = new ArrayList<Entity>();
for ( int i=0; i<numEntities; i++) {
final String thingName = "thing" + i;
things.add( em.create("thing", new HashMap<String, Object>() {{
put("name", thingName);
}}));
+ Thread.sleep( writeDelayMs );
}
em.refreshIndex();
CandidateResults crs = queryCollectionCp( "things", "select *");
Assert.assertEquals( numEntities, crs.size() );
- // update each one 10 times
+ // update each one a bunch of times
+ int count = 0;
for ( Entity thing : things ) {
for ( int j=0; j<numUpdates; j++) {
+
Entity toUpdate = em.get( thing.getUuid() );
thing.setProperty( "property" + j, RandomStringUtils.randomAlphanumeric(10));
em.update(toUpdate);
+
+ Thread.sleep( writeDelayMs );
em.refreshIndex();
+ count++;
+
+ if ( count % 100 == 0 ) {
+ logger.info("Updated {} of {} times", count, numEntities * numUpdates);
+ }
}
}
- // new query for total number of result candidates = 1000
+ // query Core Persistence directly for total number of result candidates
+ // should be entities X updates because of stale indexes
crs = queryCollectionCp("things", "select *");
Assert.assertEquals( numEntities * numUpdates, crs.size() );
- // query for results, should be 100 (and it triggers background clean up of stale indexes)
+ // query EntityManager for results
+ // should return 100 becuase it filters out the stale entities
+ Query q = Query.fromQL("select *");
+ q.setLimit( 10000 );
+ Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+ assertEquals( numEntities, results.size() );
+ // EntityManager should have kicked off a batch cleanup of those stale indexes
// wait a second for batch cleanup to complete
+ Thread.sleep(600);
- // query for total number of result candidates = 1000
+ // query for total number of result candidates = 100
+ crs = queryCollectionCp("things", "select *");
+ Assert.assertEquals( numEntities, crs.size() );
}