You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/10 17:50:17 UTC
[10/43] git commit: Add new "public Observable<UUID>
getLatestVersion( Id entityId )" method to the collection manager.
Add new "public Observable<UUID> getLatestVersion( Id entityId )" method to the collection manager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/159e5fd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/159e5fd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/159e5fd4
Branch: refs/heads/two-dot-o-events
Commit: 159e5fd447ffbc0f3c6ee649dae48d0e44b8cef1
Parents: 185ecef
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:32:16 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:32:16 2014 -0400
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 15 +++-
.../exception/WriteUniqueVerifyException.java | 1 -
.../collection/guice/CollectionModule.java | 12 +--
.../impl/EntityCollectionManagerImpl.java | 78 ++++++++++++--------
.../collection/mvcc/stage/load/GetVersion.java | 74 +++++++++++++++++++
.../collection/EntityCollectionManagerIT.java | 17 +++++
6 files changed, 154 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index b49989f..6756cdc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -19,6 +19,7 @@
package org.apache.usergrid.persistence.collection;
+import java.util.UUID;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -51,16 +52,22 @@ public interface EntityCollectionManager {
*/
public Observable<Entity> load( Id entityId );
- //TODO TN Change load to use multiget and return multiple entities. Only supports loading 1k per load operation.
+ /**
+ * Return the latest version of the specified entity.
+ */
+ public Observable<UUID> getLatestVersion( Id entityId );
+ //TODO TN Change load to use multiget and return multiple entities.
+ // Only supports loading 1k per load operation.
- //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
+ //TODO Dave add a load versions using a multiget that will return a latest version
+ //structure for a collection of entity Ids
/**
* Takes the change and reloads an entity with all changes applied in this entity applied.
- * The resulting entity from calling load will be the previous version of this entity + the entity
- * in this object applied to it.
+ * The resulting entity from calling load will be the previous version of this entity + the
+ * entity in this object applied to it.
* @param entity
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
index 64ca777..7e7f05b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.field.Field;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3336166..cb0087b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -64,11 +64,10 @@ public class CollectionModule extends AbstractModule {
install( new ServiceModule() );
// create a guice factor for getting our collection manager
- install(
- new FactoryModuleBuilder().implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
- .implement( EntityCollectionManagerSync.class,
- EntityCollectionManagerSyncImpl.class )
- .build( EntityCollectionManagerFactory.class ) );
+ install( new FactoryModuleBuilder()
+ .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
+ .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
+ .build( EntityCollectionManagerFactory.class ) );
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
@@ -101,7 +100,8 @@ public class CollectionModule extends AbstractModule {
@Provides
@CollectionTaskExecutor
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ return new NamedTaskExecutorImpl( "collectiontasks",
+ serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index b2b07e9..0efb21c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.usergrid.persistence.collection.impl;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.load.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.load.GetVersion;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
import rx.Observable;
import rx.functions.Action1;
@@ -75,6 +76,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
private final RollbackAction rollback;
+ private final GetVersion getVersion;
//load stages
private final Load load;
@@ -87,15 +89,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final TaskExecutor taskExecutor;
@Inject
- public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
- final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
- final MarkStart markStart, final MarkCommit markCommit,
- @CollectionTaskExecutor
- final TaskExecutor taskExecutor,
- @Assisted final CollectionScope collectionScope) {
+ public EntityCollectionManagerImpl(
+ final UUIDService uuidService,
+ @Write final WriteStart writeStart,
+ @WriteUpdate final WriteStart writeUpdate,
+ final WriteUniqueVerify writeVerifyUnique,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit,
+ final RollbackAction rollback,
+ final Load load,
+ final MarkStart markStart,
+ final MarkCommit markCommit,
+ final GetVersion getVersion,
+ @CollectionTaskExecutor
+ final TaskExecutor taskExecutor,
+ @Assisted final CollectionScope collectionScope) {
Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -111,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.load = load;
this.markStart = markStart;
this.markCommit = markCommit;
+ this.getVersion = getVersion;
this.uuidService = uuidService;
this.collectionScope = collectionScope;
@@ -180,8 +189,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
- .map( load );
+ return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(load);
}
@Override
@@ -212,7 +220,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
//we an update, signal the fix
//TODO T.N Change this to fire a task
- Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+ Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) )
+ .map( load ).subscribeOn( Schedulers.io() ).subscribe();
}
@@ -220,34 +229,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
// fire the stages
- public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
-
- return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-
- @Override
- public void call(
- final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ public Observable<CollectionIoEvent<MvccEntity>> stageRunner(
+ CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
- Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .doOnNext( writeVerifyUnique );
+ return Observable.from( writeData ).map( writeState ).doOnNext(
+ new Action1<CollectionIoEvent<MvccEntity>>() {
+ @Override
+ public void call(
+ final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
- // optimistic verification
- Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
- .doOnNext( writeOptimisticVerify );
+ Observable<CollectionIoEvent<MvccEntity>> unique =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+ .doOnNext( writeVerifyUnique );
- //wait for both to finish
- Observable.merge( unique, optimistic ).toBlocking().last();
+ // optimistic verification
+ Observable<CollectionIoEvent<MvccEntity>> optimistic =
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+ .doOnNext( writeOptimisticVerify );
- }
- } );
+ //wait for both to finish
+ Observable.merge( unique, optimistic ).toBlocking().last();
+ }
+ }
+ );
}
-
+ @Override
+ public Observable<UUID> getLatestVersion(Id entityId) {
+ return Observable.from(
+ new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(getVersion);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
new file mode 100644
index 0000000..2069818
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.load;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.functions.Func1;
+
+
+/**
+ * Gets the latest version UUID for an Entity without loading the Entity.
+ */
+public class GetVersion implements Func1<CollectionIoEvent<Id>, UUID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger( GetVersion.class );
+
+ private final MvccLogEntrySerializationStrategy logStrat;
+
+
+ @Inject
+ public GetVersion( final MvccLogEntrySerializationStrategy logStrat ) {
+ Preconditions.checkNotNull( logStrat, "logStrat is required" );
+ this.logStrat = logStrat;
+ }
+
+
+ @Override
+ public UUID call( CollectionIoEvent<Id> idEvent ) {
+
+ Id id = idEvent.getEvent();
+ CollectionScope cs = idEvent.getEntityCollection();
+
+ final UUID latestVersion;
+ try {
+ List<MvccLogEntry> logEntries = logStrat.load( cs, id, UUIDGenerator.newTimeUUID(), 1 );
+ latestVersion = logEntries.get(0).getVersion();
+
+ } catch (ConnectionException ex) {
+ throw new RuntimeException("Unable to get latest version of entity " +
+ id.getType() + ":" + id.getUuid().toString(), ex );
+ }
+
+ return latestVersion;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 9e8bbe3..34061ee 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -392,4 +392,21 @@ public class EntityCollectionManagerIT {
assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0);
}
+ @Test
+ public void testGetVersion() {
+
+ CollectionScope context = new CollectionScopeImpl(
+ new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
+
+ Entity newEntity = new Entity( new SimpleId( "test" ) );
+ EntityCollectionManager manager = factory.createCollectionManager( context );
+ Observable<Entity> observable = manager.write( newEntity );
+ Entity created = observable.toBlocking().lastOrDefault( null );
+
+ assertNotNull("Id was assigned", created.getId() );
+ assertNotNull("Version was assigned", created.getVersion() );
+
+ assertTrue(UUIDComparator.staticCompare(created.getVersion(),
+ manager.getLatestVersion( created.getId() ).toBlocking().lastOrDefault(null)) == 0);
+ }
}