You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/30 14:50:08 UTC

[03/12] git commit: EntityVersionCreated implementation.

EntityVersionCreated implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0537e100
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0537e100
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0537e100

Branch: refs/heads/two-dot-o-events
Commit: 0537e1006339d6741fb45f988d95da74c01cfb30
Parents: 9153eb9
Author: grey <gr...@apigee.com>
Authored: Wed Oct 29 12:59:00 2014 -0700
Committer: grey <gr...@apigee.com>
Committed: Wed Oct 29 12:59:00 2014 -0700

----------------------------------------------------------------------
 .../events/EntityVersionCreatedImpl.java        |  52 +++++++
 .../collection/EntityVersionCreatedFactory.java |  31 ++++
 .../collection/guice/CollectionModule.java      |   3 +
 .../impl/EntityCollectionManagerImpl.java       |  29 +++-
 .../impl/EntityVersionCreatedTask.java          | 150 +++++++++++++++++++
 .../impl/EntityVersionCreatedTaskTest.java      | 135 +++++++++++++++++
 .../persistence/index/EntityIndexBatch.java     |  12 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |  26 ++++
 8 files changed, 428 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
new file mode 100644
index 0000000..738183c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
@@ -0,0 +1,52 @@
+package org.apache.usergrid.corepersistence.events;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.CpEntityManager;
+import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.HybridEntityManagerFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Created by ApigeeCorporation on 10/24/14.
+ */
+public class EntityVersionCreatedImpl implements EntityVersionCreated{
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedImpl.class );
+
+    public EntityVersionCreatedImpl() {
+        logger.debug("EntityVersionCreated");
+    }
+
+    @Override
+    public void versionCreated( final CollectionScope scope, final Entity entity ) {
+        logger.debug("Entering deleted for entity {}:{} v {} "
+                        + "scope\n   name: {}\n   owner: {}\n   app: {}",
+                new Object[] { entity.getId().getType(), entity.getId().getUuid(), entity.getVersion(),
+                        scope.getName(), scope.getOwner(), scope.getApplication()});
+
+        HybridEntityManagerFactory hemf = (HybridEntityManagerFactory)CpSetup.getEntityManagerFactory();
+        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)hemf.getImplementation();
+
+//        CpEntityManagerFactory emf = (CpEntityManagerFactory)
+//                CpSetup.getInjector().getInstance( EntityManagerFactory.class );
+
+        final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+
+        EntityIndexBatch batch = ei.createBatch();
+
+        batch.deindexPreviousVersions( entity );
+        batch.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
new file mode 100644
index 0000000..4248d42
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.impl.EntityVersionCreatedTask;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+public interface EntityVersionCreatedFactory {
+    public EntityVersionCreatedTask getTask( final CollectionScope scope, final Entity entity);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3532fe0..de99078 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
 import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
@@ -72,9 +73,11 @@ public class CollectionModule extends AbstractModule {
 
         install ( new FactoryModuleBuilder().build( EntityVersionCleanupFactory.class ));
         install ( new FactoryModuleBuilder().build( EntityDeletedFactory.class));
+        install ( new FactoryModuleBuilder().build( EntityVersionCreatedFactory.class ));
 
         // users of this module can add their own implemementations
         // for more information: https://github.com/google/guice/wiki/Multibindings
+
         Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
         Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
         Multibinder.newSetBinder( binder(), EntityDeleted.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index b1d6b2f..3d33a52 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -91,7 +91,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final MarkCommit markCommit;
 
     private final TaskExecutor taskExecutor;
-    private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+
+    private EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private EntityVersionCreatedFactory entityVersionCreatedFactory;
+
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final MvccEntitySerializationStrategy entitySerializationStrategy;
     private final EntityDeletedFactory entityDeletedFactory;
@@ -110,16 +113,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         final MarkStart                            markStart, 
         final MarkCommit                           markCommit,
         final EntityVersionCleanupFactory          entityVersionCleanupFactory,
+        final EntityVersionCreatedFactory          entityVersionCreatedFactory,
         final MvccEntitySerializationStrategy      entitySerializationStrategy,
         final UniqueValueSerializationStrategy     uniqueValueSerializationStrategy,
         final MvccLogEntrySerializationStrategy    mvccLogEntrySerializationStrategy,
         final EntityDeletedFactory                 entityDeletedFactory,
         @CollectionTaskExecutor final TaskExecutor taskExecutor,
         @Assisted final CollectionScope            collectionScope
+
     ) {
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entitySerializationStrategy = entitySerializationStrategy;
         this.entityDeletedFactory = entityDeletedFactory;
+        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
 
         Preconditions.checkNotNull(uuidService, "uuidService must be defined");
 
@@ -168,13 +174,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         observable.map(writeCommit).doOnNext(new Action1<Entity>() {
             @Override
             public void call(final Entity entity) {
-
-                // TODO fire a task here
-
-                taskExecutor.submit(entityVersionCleanupFactory.getTask( 
-                    collectionScope, entityId, entity.getVersion() ));
-
-                // post-processing to come later. leave it empty for now.
+//<<<<<<< Updated upstream
+//
+//                // TODO fire a task here
+//
+//                taskExecutor.submit(entityVersionCleanupFactory.getTask(
+//                    collectionScope, entityId, entity.getVersion() ));
+//
+//                // post-processing to come later. leave it empty for now.
+//=======
+                //TODO fire the created task first then the entityVersioncleanup
+                taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+                taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
+                //post-processing to come later. leave it empty for now.
+//>>>>>>> Stashed changes
             }
         }).doOnError(rollback);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
new file mode 100644
index 0000000..b78b09c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
@@ -0,0 +1,150 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Created by ApigeeCorporation on 10/24/14.
+ */
+public class EntityVersionCreatedTask implements Task<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
+
+
+    private final Set<EntityVersionCreated> listeners;
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final Keyspace keyspace;
+
+    private final SerializationFig serializationFig;
+
+    private final CollectionScope collectionScope;
+    private final Entity entity;
+
+    private EntityVersionCreatedFactory entityVersionCreatedFactory;
+
+
+
+    @Inject
+    public EntityVersionCreatedTask( EntityVersionCreatedFactory entityVersionCreatedFactory,
+                                     final SerializationFig serializationFig,
+                                     final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                                     final MvccEntitySerializationStrategy entitySerializationStrategy,
+                                     final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                     final Keyspace keyspace,
+                                     @Assisted final CollectionScope collectionScope,
+                                     final Set<EntityVersionCreated> listeners,
+                                     @Assisted final Entity entity ) {
+
+        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+        this.serializationFig = serializationFig;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.keyspace = keyspace;
+        this.listeners = listeners;
+        this.collectionScope = collectionScope;
+        this.entity = entity;
+
+    }
+
+
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+                new Object[] { collectionScope, entity}, throwable );
+    }
+
+
+    @Override
+    public Void rejected() {
+        //Our task was rejected meaning our queue was full.  We need this operation to run,
+        // so we'll run it in our current thread
+        try {
+            call();
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Exception thrown in call task", e );
+        }
+
+        return null;
+    }
+
+    @Override
+    public Void call() throws Exception {
+
+        fireEvents();
+        return null;
+    }
+
+    private void fireEvents() {
+        final int listenerSize = listeners.size();
+
+        if ( listenerSize == 0 ) {
+            return;
+        }
+
+        if ( listenerSize == 1 ) {
+            listeners.iterator().next().versionCreated( collectionScope,entity );
+            return;
+        }
+
+        LOG.debug( "Started firing {} listeners", listenerSize );
+        //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
+        Observable.from(listeners)
+                  .parallel( new Func1<Observable<EntityVersionCreated
+                          >, Observable<EntityVersionCreated>>() {
+
+                      @Override
+                      public Observable<EntityVersionCreated> call(
+                              final Observable<EntityVersionCreated> entityVersionCreatedObservable ) {
+
+                          return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() {
+                              @Override
+                              public void call( final EntityVersionCreated listener ) {
+                                  listener.versionCreated(collectionScope,entity);
+                              }
+                          } );
+                      }
+                  }, Schedulers.io() ).toBlocking().last();
+
+        LOG.debug( "Finished firing {} listeners", listenerSize );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
new file mode 100644
index 0000000..4f1abca
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -0,0 +1,135 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Created task tests.
+ */
+public class EntityVersionCreatedTaskTest {
+
+    private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+
+    @AfterClass
+    public static void shutdown() {
+        taskExecutor.shutdown();
+    }
+
+    @Test(timeout=10000)
+    public void noListener()//why does it matter if it has a version or not without a listener
+            throws ExecutionException, InterruptedException, ConnectionException {
+
+
+        final SerializationFig serializationFig = mock( SerializationFig.class );
+
+        when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+        final MvccEntitySerializationStrategy ess =
+                mock( MvccEntitySerializationStrategy.class );
+
+        final MvccLogEntrySerializationStrategy less =
+                mock( MvccLogEntrySerializationStrategy.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final MutationBatch entityBatch = mock( MutationBatch.class );
+
+        final MutationBatch logBatch = mock( MutationBatch.class );
+
+        final EntityVersionCreatedFactory entityVersionCreatedFactory = mock(EntityVersionCreatedFactory.class);
+
+        when( keyspace.prepareMutationBatch() )
+                .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
+                .thenReturn( entityBatch )
+                .thenReturn( logBatch );
+
+        // intentionally no events
+        final Set<EntityVersionCreated> listeners = mock( Set.class );//new HashSet<EntityVersionCreated>();
+
+        when ( listeners.size()).thenReturn( 0 );
+
+        final Id applicationId = new SimpleId( "application" );
+
+        final CollectionScope appScope = new CollectionScopeImpl(
+                applicationId, applicationId, "users" );
+
+        final Id entityId = new SimpleId( "user" );
+
+        final Entity entity = new Entity( entityId );
+
+        final MvccEntity mvccEntity = new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+                MvccEntity.Status.COMPLETE,entity );
+
+
+        // mock up a single log entry for our first test
+        final LogEntryMock logEntryMock =
+                LogEntryMock.createLogEntryMock(less, appScope, entityId, 2 );
+
+        final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+        final UniqueValueSerializationStrategy uvss =
+                mock( UniqueValueSerializationStrategy.class );
+
+        EntityVersionCreatedTask entityVersionCreatedTask =
+                new EntityVersionCreatedTask(entityVersionCreatedFactory,
+                serializationFig,
+                        less,
+                        ess,
+                        uvss,
+                        keyspace,
+                        appScope,
+                        listeners,
+                        entity);
+
+        // start the task
+        ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
+
+        // wait for the task
+        future.get();
+
+        //mocked listener makes sure that the task is called
+        verify( listeners ).size();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 04992e1..bd1ee40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -60,11 +60,19 @@ public interface EntityIndexBatch {
      * Create a delete method that deletes by Id. This will delete all documents from ES with 
      * the same entity Id, effectively removing all versions of an entity from all index scopes.
      */
-    public EntityIndexBatch deleteEntity( Id entityId ); 
+    public EntityIndexBatch deleteEntity( Id entityId );
 
     /**
-     * Execute the batch
+     * Takes all the previous versions of the current entity and deindexs all previous versions
+     * @param entity
+     * @return
      */
+    public EntityIndexBatch deindexPreviousVersions(Entity entity);
+
+
+        /**
+         * Execute the batch
+         */
     public void execute();
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 6b9f481..1fca845 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -30,6 +30,9 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.FilteredQueryBuilder;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -211,6 +214,29 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         return this;
     }
 
+    @Override
+    public EntityIndexBatch deindexPreviousVersions(Entity entity){
+
+         FilteredQueryBuilder fqb = QueryBuilders.filteredQuery
+                 (QueryBuilders.termQuery
+                         ( STRING_PREFIX + ENTITYID_FIELDNAME,entity.getId().getUuid().toString().toLowerCase() ),
+                         FilterBuilders.rangeFilter("version").lt( entity.getId().getUuid().timestamp() ));
+
+
+
+        //QueryBuilders.rangeQuery(  )
+
+        DeleteByQueryResponse response = client.prepareDeleteByQuery("test")
+                                               .setQuery( fqb ).execute().actionGet();
+
+        logger.debug("Deleted entity {}:{} from all index scopes with response status = {}",
+                new Object[] { entity.getId().getType(), entity.getId().getUuid(), response.status().toString() });
+
+        maybeFlush();
+
+        return this;
+    }
+
 
     @Override
     public void execute() {