You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/30 14:50:08 UTC
[03/12] git commit: EntityVersionCreated implementation.
EntityVersionCreated implementation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0537e100
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0537e100
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0537e100
Branch: refs/heads/two-dot-o-events
Commit: 0537e1006339d6741fb45f988d95da74c01cfb30
Parents: 9153eb9
Author: grey <gr...@apigee.com>
Authored: Wed Oct 29 12:59:00 2014 -0700
Committer: grey <gr...@apigee.com>
Committed: Wed Oct 29 12:59:00 2014 -0700
----------------------------------------------------------------------
.../events/EntityVersionCreatedImpl.java | 52 +++++++
.../collection/EntityVersionCreatedFactory.java | 31 ++++
.../collection/guice/CollectionModule.java | 3 +
.../impl/EntityCollectionManagerImpl.java | 29 +++-
.../impl/EntityVersionCreatedTask.java | 150 +++++++++++++++++++
.../impl/EntityVersionCreatedTaskTest.java | 135 +++++++++++++++++
.../persistence/index/EntityIndexBatch.java | 12 +-
.../index/impl/EsEntityIndexBatchImpl.java | 26 ++++
8 files changed, 428 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
new file mode 100644
index 0000000..738183c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedImpl.java
@@ -0,0 +1,52 @@
+package org.apache.usergrid.corepersistence.events;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.CpEntityManager;
+import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.HybridEntityManagerFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * {@link EntityVersionCreated} listener that deindexes the previous versions of an entity
+ * when it is notified that a new version of that entity was created.
+ */
+public class EntityVersionCreatedImpl implements EntityVersionCreated {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedImpl.class );
+
+    public EntityVersionCreatedImpl() {
+        logger.debug("EntityVersionCreated");
+    }
+
+    /**
+     * Deindexes all previous versions of the entity in the entity index of the given scope.
+     * @param scope collection scope used to look up the entity index
+     * @param entity the entity whose previous versions are deindexed
+     */
+    @Override
+    public void versionCreated( final CollectionScope scope, final Entity entity ) {
+        logger.debug("Entering versionCreated for entity {}:{} v {} "
+                + "scope\n name: {}\n owner: {}\n app: {}",
+            new Object[] { entity.getId().getType(), entity.getId().getUuid(), entity.getVersion(),
+                scope.getName(), scope.getOwner(), scope.getApplication()});
+
+        HybridEntityManagerFactory hemf = (HybridEntityManagerFactory)CpSetup.getEntityManagerFactory();
+        CpEntityManagerFactory cpemf = (CpEntityManagerFactory)hemf.getImplementation();
+
+        final EntityIndexBatch batch = cpemf.getManagerCache().getEntityIndex(scope).createBatch();
+        batch.deindexPreviousVersions( entity );
+        batch.execute();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
new file mode 100644
index 0000000..4248d42
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityVersionCreatedFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.impl.EntityVersionCreatedTask;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/** Factory that builds an {@link EntityVersionCreatedTask} for a collection scope and entity. */
+public interface EntityVersionCreatedFactory {
+    EntityVersionCreatedTask getTask( CollectionScope scope, Entity entity );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3532fe0..de99078 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
@@ -72,9 +73,11 @@ public class CollectionModule extends AbstractModule {
install ( new FactoryModuleBuilder().build( EntityVersionCleanupFactory.class ));
install ( new FactoryModuleBuilder().build( EntityDeletedFactory.class));
+ install ( new FactoryModuleBuilder().build( EntityVersionCreatedFactory.class ));
// users of this module can add their own implemementations
// for more information: https://github.com/google/guice/wiki/Multibindings
+
Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
Multibinder.newSetBinder( binder(), EntityDeleted.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index b1d6b2f..3d33a52 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -91,7 +91,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MarkCommit markCommit;
private final TaskExecutor taskExecutor;
- private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+
+ private EntityVersionCleanupFactory entityVersionCleanupFactory;
+ private EntityVersionCreatedFactory entityVersionCreatedFactory;
+
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final EntityDeletedFactory entityDeletedFactory;
@@ -110,16 +113,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final MarkStart markStart,
final MarkCommit markCommit,
final EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final EntityVersionCreatedFactory entityVersionCreatedFactory,
final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final EntityDeletedFactory entityDeletedFactory,
@CollectionTaskExecutor final TaskExecutor taskExecutor,
@Assisted final CollectionScope collectionScope
+
) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
this.entityDeletedFactory = entityDeletedFactory;
+ this.entityVersionCreatedFactory = entityVersionCreatedFactory;
Preconditions.checkNotNull(uuidService, "uuidService must be defined");
@@ -168,13 +174,20 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
observable.map(writeCommit).doOnNext(new Action1<Entity>() {
@Override
public void call(final Entity entity) {
-
- // TODO fire a task here
-
- taskExecutor.submit(entityVersionCleanupFactory.getTask(
- collectionScope, entityId, entity.getVersion() ));
-
- // post-processing to come later. leave it empty for now.
+//<<<<<<< Updated upstream
+//
+// // TODO fire a task here
+//
+// taskExecutor.submit(entityVersionCleanupFactory.getTask(
+// collectionScope, entityId, entity.getVersion() ));
+//
+// // post-processing to come later. leave it empty for now.
+//=======
+ //TODO fire the created task first then the entityVersioncleanup
+ taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+ taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
+ //post-processing to come later. leave it empty for now.
+//>>>>>>> Stashed changes
}
}).doOnError(rollback);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
new file mode 100644
index 0000000..b78b09c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
@@ -0,0 +1,150 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Task that notifies every registered {@link EntityVersionCreated} listener that a new
+ * version of an entity has been created in a collection scope.
+ */
+public class EntityVersionCreatedTask implements Task<Void> {
+    private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCreatedTask.class );
+
+    private final Set<EntityVersionCreated> listeners;
+
+    // NOTE(review): the six fields below are injected and stored but never read in this class;
+    // they are kept so the injected constructor signature stays unchanged.
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy entitySerializationStrategy;
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final Keyspace keyspace;
+    private final SerializationFig serializationFig;
+    private final EntityVersionCreatedFactory entityVersionCreatedFactory;
+
+    private final CollectionScope collectionScope;
+    private final Entity entity;
+
+    /**
+     * @param collectionScope scope passed to every listener
+     * @param listeners listeners to notify; the task does nothing when the set is empty
+     * @param entity entity passed to every listener
+     */
+    @Inject
+    public EntityVersionCreatedTask( EntityVersionCreatedFactory entityVersionCreatedFactory,
+            final SerializationFig serializationFig,
+            final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+            final MvccEntitySerializationStrategy entitySerializationStrategy,
+            final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+            final Keyspace keyspace,
+            @Assisted final CollectionScope collectionScope,
+            final Set<EntityVersionCreated> listeners,
+            @Assisted final Entity entity ) {
+        this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+        this.serializationFig = serializationFig;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.keyspace = keyspace;
+        this.listeners = listeners;
+        this.collectionScope = collectionScope;
+        this.entity = entity;
+    }
+
+    /** Logs a failure of this task with the scope, entity id and version it ran for. */
+    @Override
+    public void exceptionThrown( final Throwable throwable ) {
+        // one argument per placeholder; SLF4J treats the trailing Throwable as the exception
+        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+                new Object[] { collectionScope, entity.getId(), entity.getVersion(), throwable } );
+    }
+
+    /** Runs the task on the calling thread when the executor rejected it. */
+    @Override
+    public Void rejected() {
+        //Our task was rejected meaning our queue was full. We need this operation to run,
+        // so we'll run it in our current thread
+        try {
+            call();
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Exception thrown in call task", e );
+        }
+        return null;
+    }
+
+    /** Fires the listeners; always returns {@code null}. */
+    @Override
+    public Void call() throws Exception {
+        fireEvents();
+        return null;
+    }
+
+    /**
+     * Invokes {@code versionCreated} on each listener: directly for a single listener,
+     * through the Rx io scheduler (blocking until the last one is done) for several.
+     */
+    private void fireEvents() {
+        final int listenerSize = listeners.size();
+        if ( listenerSize == 0 ) {
+            return;
+        }
+
+        if ( listenerSize == 1 ) {
+            listeners.iterator().next().versionCreated( collectionScope, entity );
+            return;
+        }
+
+        LOG.debug( "Started firing {} listeners", listenerSize );
+        //if we have more than 1, run them in parallel on the rx io scheduler
+        Observable.from( listeners )
+            .parallel( new Func1<Observable<EntityVersionCreated>, Observable<EntityVersionCreated>>() {
+                @Override
+                public Observable<EntityVersionCreated> call(
+                        final Observable<EntityVersionCreated> entityVersionCreatedObservable ) {
+                    return entityVersionCreatedObservable.doOnNext( new Action1<EntityVersionCreated>() {
+                        @Override
+                        public void call( final EntityVersionCreated listener ) {
+                            listener.versionCreated( collectionScope, entity );
+                        }
+                    } );
+                }
+            }, Schedulers.io() ).toBlocking().last();
+        LOG.debug( "Finished firing {} listeners", listenerSize );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
new file mode 100644
index 0000000..4f1abca
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTaskTest.java
@@ -0,0 +1,135 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.util.LogEntryMock;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link EntityVersionCreatedTask}.
+ */
+public class EntityVersionCreatedTaskTest {
+ // executor the task under test is submitted to; shut down once the class's tests are done
+ private static final TaskExecutor taskExecutor = new NamedTaskExecutorImpl( "test", 4, 0 );
+
+ @AfterClass
+ public static void shutdown() {
+ taskExecutor.shutdown();
+ }
+ // Runs the task with a listener set that reports size 0 and checks that the size was read.
+ @Test(timeout=10000)
+ public void noListener()//why does it matter if it has a version or not without a listener
+ throws ExecutionException, InterruptedException, ConnectionException {
+ // NOTE(review): entityBatch, logBatch, mvccEntity and version below are created but never
+ // used afterwards in this test.
+ final SerializationFig serializationFig = mock( SerializationFig.class );
+
+ when( serializationFig.getBufferSize() ).thenReturn( 10 );
+
+ final MvccEntitySerializationStrategy ess =
+ mock( MvccEntitySerializationStrategy.class );
+
+ final MvccLogEntrySerializationStrategy less =
+ mock( MvccLogEntrySerializationStrategy.class );
+
+ final Keyspace keyspace = mock( Keyspace.class );
+
+ final MutationBatch entityBatch = mock( MutationBatch.class );
+
+ final MutationBatch logBatch = mock( MutationBatch.class );
+
+ final EntityVersionCreatedFactory entityVersionCreatedFactory = mock(EntityVersionCreatedFactory.class);
+
+ when( keyspace.prepareMutationBatch() )
+ .thenReturn( mock( MutationBatch.class ) ) // don't care what happens to this one
+ .thenReturn( entityBatch )
+ .thenReturn( logBatch );
+
+ // intentionally no events: the set is a mock so the size() call can be verified later
+ final Set<EntityVersionCreated> listeners = mock( Set.class );//new HashSet<EntityVersionCreated>();
+
+ when ( listeners.size()).thenReturn( 0 );
+
+ final Id applicationId = new SimpleId( "application" );
+
+ final CollectionScope appScope = new CollectionScopeImpl(
+ applicationId, applicationId, "users" );
+
+ final Id entityId = new SimpleId( "user" );
+
+ final Entity entity = new Entity( entityId );
+
+ final MvccEntity mvccEntity = new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.COMPLETE,entity );
+
+
+ // mock up a single log entry for our first test
+ final LogEntryMock logEntryMock =
+ LogEntryMock.createLogEntryMock(less, appScope, entityId, 2 );
+
+ final UUID version = logEntryMock.getEntries().iterator().next().getVersion();
+
+ final UniqueValueSerializationStrategy uvss =
+ mock( UniqueValueSerializationStrategy.class );
+
+ EntityVersionCreatedTask entityVersionCreatedTask =
+ new EntityVersionCreatedTask(entityVersionCreatedFactory,
+ serializationFig,
+ less,
+ ess,
+ uvss,
+ keyspace,
+ appScope,
+ listeners,
+ entity);
+
+ // start the task
+ ListenableFuture<Void> future = taskExecutor.submit( entityVersionCreatedTask );
+
+ // wait for the task
+ future.get();
+
+ // the task reads listeners.size() when it fires events, so this proves the task ran
+ verify( listeners ).size();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 04992e1..bd1ee40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -60,11 +60,19 @@ public interface EntityIndexBatch {
* Create a delete method that deletes by Id. This will delete all documents from ES with
* the same entity Id, effectively removing all versions of an entity from all index scopes.
*/
- public EntityIndexBatch deleteEntity( Id entityId );
+ public EntityIndexBatch deleteEntity( Id entityId );
/**
- * Execute the batch
+ * Removes all previous versions of the given entity from the index.
+ * @param entity the entity whose previous versions are deindexed
+ * @return this batch
*/
+ public EntityIndexBatch deindexPreviousVersions(Entity entity);
+
+
+ /**
+ * Execute the batch
+ */
public void execute();
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0537e100/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 6b9f481..1fca845 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -30,6 +30,9 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.FilteredQueryBuilder;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -211,6 +214,29 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return this;
}
+ /**
+  * Runs a delete-by-query for the documents of the given entity whose indexed
+  * {@code version} is lower than the timestamp of the entity's own version.
+  * NOTE(review): the index name "test" is hard-coded; it presumably should be this
+  * batch's own index name - confirm against the other operations of this class.
+  */
+ @Override
+ public EntityIndexBatch deindexPreviousVersions(Entity entity){
+     // "previous" is relative to the entity's version, not to the timestamp of its id
+     FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(
+         QueryBuilders.termQuery(
+             STRING_PREFIX + ENTITYID_FIELDNAME, entity.getId().getUuid().toString().toLowerCase() ),
+         FilterBuilders.rangeFilter("version").lt( entity.getVersion().timestamp() ));
+
+     DeleteByQueryResponse response = client.prepareDeleteByQuery("test")
+         .setQuery( fqb ).execute().actionGet();
+
+     logger.debug("Deindexed previous versions of entity {}:{} with response status = {}",
+         new Object[] { entity.getId().getType(), entity.getId().getUuid(), response.status().toString() });
+     maybeFlush();
+     return this;
+ }
+
@Override
public void execute() {