You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/15 16:25:27 UTC
[34/50] [abbrv] usergrid git commit: USERGRID-1295 - Re-introduce a
more efficient de-index upon entity delete and entity updates. Remove the
inefficient code as a safety measure so it can't be used again.
USERGRID-1295 - Re-introduce a more efficient de-index upon entity delete and entity updates. Remove the inefficient code as a safety measure so it can't be used again.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7143cbaf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7143cbaf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7143cbaf
Branch: refs/heads/master
Commit: 7143cbaf6c26b4db00d14fb3a1b9e3eb8a2e068e
Parents: 66bb5cd
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Jun 3 22:18:12 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Jun 3 22:18:12 2016 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 2 +-
.../corepersistence/EntityManagerFig.java | 2 +-
.../asyncevents/AsyncEventService.java | 6 +-
.../asyncevents/AsyncEventServiceImpl.java | 11 +-
.../asyncevents/EventBuilder.java | 6 +-
.../asyncevents/EventBuilderImpl.java | 46 ++++---
.../model/DeIndexOldVersionsEvent.java | 12 +-
.../corepersistence/index/IndexService.java | 23 +++-
.../corepersistence/index/IndexServiceImpl.java | 86 +++++++------
.../collection/EntityCollectionManager.java | 10 +-
.../impl/EntityCollectionManagerImpl.java | 32 ++++-
.../serialization/SerializationFig.java | 5 +
.../serialization/impl/LogEntryIterator.java | 128 +++++++++++++++++++
.../usergrid/persistence/index/EntityIndex.java | 5 +-
.../usergrid/persistence/index/IndexFig.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 64 +++-------
.../persistence/index/impl/EntityIndexTest.java | 41 ------
17 files changed, 313 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 68f5d71..fd31cf6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -610,7 +610,7 @@ public class CpEntityManager implements EntityManager {
// queue up an event to clean-up older versions than this one from the index
if (entityManagerFig.getDeindexOnUpdate()) {
- indexService.queueDeIndexOldVersion( applicationScope, entityId );
+ indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
}
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 4c50aee..655a968 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,6 +39,6 @@ public interface EntityManagerFig extends GuicyFig {
int sleep();
+ /**
+ * When true, CpEntityManager queues an asynchronous event after an entity update that de-indexes
+ * older versions of the updated entity. Enabled by default.
+ */
@Key( "usergrid.entityManager.enable_deindex_on_update" )
- @Default( "false" )
+ @Default( "true" )
boolean getDeindexOnUpdate();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9f34604..d833cf7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import java.util.UUID;
+
/**
* Low level queue service for events in the entity. These events are fire and forget, and will always be asynchronous
@@ -83,11 +85,11 @@ public interface AsyncEventService extends ReIndexAction {
void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
/**
- *
+ * Queue a fire-and-forget event that de-indexes documents for versions of the entity older than
+ * {@code markedVersion}.
+ *
* @param applicationScope
* @param entityId
+ * @param markedVersion the reference version; only versions older than it are targeted for de-indexing
*/
- void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId);
+ void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion);
/**
* current queue depth
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3b01292..fa175ab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -584,11 +584,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
@Override
- public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) {
+ public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion) {
// queue the de-index of old versions to the topic so cleanup happens in all regions
+ // the marked version travels inside the event so every consumer de-indexes relative to the same version
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
- new EntityIdScope( applicationScope, entityId)) );
+ new EntityIdScope( applicationScope, entityId), markedVersion) );
}
@@ -596,10 +596,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){
- ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
- Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
+ final ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
+ final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
+ final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion();
- return eventBuilder.deIndexOlderVersions( applicationScope, entityId )
+ return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion )
.toBlocking().lastOrDefault(null);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 1f62029..4db9f4b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -21,12 +21,12 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.util.List;
+import java.util.UUID;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperation;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -78,9 +78,11 @@ public interface EventBuilder {
- * Find all versions of the entity older than the latest and de-index them.
+ * Find versions of the entity older than {@code markedVersion} and de-index them.
* @param applicationScope
* @param entityId
+ * @param markedVersion the reference version; it is not itself de-indexed
* @return
*/
- Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId );
+ Observable<IndexOperationMessage> deIndexOldVersions( ApplicationScope applicationScope,
+ Id entityId, UUID markedVersion );
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 5c827c6..02a7588 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -20,11 +20,10 @@
package org.apache.usergrid.corepersistence.asyncevents;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +117,7 @@ public class EventBuilderImpl implements EventBuilder {
//TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
- MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
+ MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
.firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
// De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then
@@ -127,13 +126,16 @@ public class EventBuilderImpl implements EventBuilder {
Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
if(mostRecentlyMarked != null){
+
+ // fetch entity versions to be de-index by looking in cassandra
deIndexObservable =
- indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() );
+ indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
+ getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
ecmDeleteObservable =
- ecm.getVersions( entityId )
+ ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
.filter( mvccLogEntry->
- UUIDUtils.compare(mvccLogEntry.getVersion(), mostRecentlyMarked.getVersion()) <= 0)
+ mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
.buffer( serializationFig.getBufferSize() )
.doOnNext( buffer -> ecm.delete( buffer ) );
}
@@ -173,7 +175,8 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){
+ public Observable<IndexOperationMessage> deIndexOldVersions( final ApplicationScope applicationScope,
+ final Id entityId, final UUID markedVersion ){
if (logger.isDebugEnabled()) {
logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope );
@@ -181,24 +184,31 @@ public class EventBuilderImpl implements EventBuilder {
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- // find all versions of the entity that come before the provided entityId
- VersionSet latestVersions = ecm.getLatestVersion(Collections.singletonList(entityId) ).toBlocking()
- .firstOrDefault( null );
- // If there are no versions before this, allow it to return an empty observable
- Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
+ return indexService.deIndexOldVersions( applicationScope, entityId,
+ getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
- if(latestVersions.getMaxVersion(entityId) != null){
+ }
- UUID latestVersion = latestVersions.getMaxVersion(entityId).getVersion();
- deIndexObservable =
- indexService.deleteEntityIndexes( applicationScope, entityId, latestVersion);
+ /**
+ * Collect up to 5 of the most recent versions of the entity that are strictly older than the marked version.
+ *
+ * @param ecm collection manager used to read the entity's MVCC log
+ * @param entityId the entity whose versions are collected
+ * @param markedVersion the reference version; it is never included in the result
+ * @return versions older than {@code markedVersion}, newest first; empty when {@code markedVersion} is null
+ */
+ private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
+ final Id entityId, final UUID markedVersion ){
- }
+
+ final List<UUID> versions = new ArrayList<>();
+
+ // NOTE(review): DeIndexOldVersionsEvent instances serialized before markedVersion existed presumably
+ // deserialize with a null markedVersion; there is nothing to compare against, so skip instead of an NPE.
+ if ( markedVersion == null ) {
+ return versions;
+ }
+
+ // only take the 5 most recent older versions to avoid eating memory. a tool can be built for massive
+ // cleanups for old usergrid clusters that do not have this in-line cleanup.
+ // filter BEFORE take(): the max-to-min scan may start at the marked version itself, which would otherwise
+ // use up one of the 5 slots. toBlocking() guarantees the list is fully populated before it is returned
+ // instead of relying on the observable emitting synchronously on this thread.
+ ecm.getVersionsFromMaxToMin( entityId, markedVersion )
+ .filter( mvccLogEntry -> mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() )
+ .take( 5 )
+ .toBlocking()
+ .forEach( mvccLogEntry -> versions.add( mvccLogEntry.getVersion() ) );
- return deIndexObservable;
+ return versions;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
index 59694d5..1f00e14 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import java.util.UUID;
+
/**
* An index event de-indexing documents for Entity versions older than the provided Entity
@@ -31,12 +33,16 @@ public final class DeIndexOldVersionsEvent extends AsyncEvent {
@JsonProperty
protected EntityIdScope entityIdScope;
+ @JsonProperty
+ protected UUID markedVersion;
+
+ // no-arg constructor, presumably used when deserializing the @JsonProperty fields from the queue
public DeIndexOldVersionsEvent() {
}
- public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) {
+ /**
+ * @param sourceRegion region handed to AsyncEvent (AsyncEventServiceImpl passes the primary region)
+ * @param entityIdScope application scope and id of the entity whose old versions are cleaned up
+ * @param markedVersion reference version; versions of the entity older than it are de-indexed
+ */
+ public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope, UUID markedVersion) {
super(sourceRegion);
this.entityIdScope = entityIdScope;
+ this.markedVersion = markedVersion;
}
@@ -47,4 +53,8 @@ public final class DeIndexOldVersionsEvent extends AsyncEvent {
public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
+
+ /**
+ * @return the reference version older versions are de-indexed relative to.
+ * NOTE(review): presumably null for events serialized before this field was added; callers should
+ * tolerate null.
+ */
+ public UUID getMarkedVersion() {
+ return markedVersion;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 54eb464..b989a9c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -20,17 +20,16 @@
package org.apache.usergrid.corepersistence.index;
+import java.util.List;
import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
-import rx.observables.ConnectableObservable;
/**
@@ -71,18 +70,28 @@ public interface IndexService {
Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
-
-
/**
- * Delete all indexes with the specified entityId
+ * De-index all documents with the specified entityId and versions provided. This will also remove any documents
+ * where the entity is a source/target node ( index docs where this entityId is a part of connections).
*
* @param applicationScope
* @param entityId
+ * @param markedVersion node documents (connections) of the entity older than this version are also removed
+ * @param allVersionsBeforeMarked the entity versions whose collection documents are de-indexed
* @return
+ * an observable emitting the batch of de-index operations
*/
- Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
- final UUID markedVersion);
+ Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
+ final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
+ /**
+ * De-index the collection documents for the given versions of the entityId.
+ *
+ * @param applicationScope
+ * @param entityId
+ * @param versions the entity versions to de-index
+ * @param markedVersion the reference version; {@code versions} are expected to be older than it
+ * @return an observable emitting the batch of de-index operations
+ */
+ Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
+ final List<UUID> versions, UUID markedVersion);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 9509626..54b18bb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -20,12 +20,7 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +51,6 @@ import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.utils.InflectionUtils;
-import org.apache.usergrid.utils.JsonUtils;
import org.apache.usergrid.utils.UUIDUtils;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
@@ -66,9 +59,7 @@ import com.google.inject.Singleton;
import rx.Observable;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
@@ -278,42 +269,61 @@ public class IndexServiceImpl implements IndexService {
return ObservableTimer.time( batches, addTimer );
}
- //This should look up the entityId and delete any documents with a timestamp that comes before
- //The edges that are connected will be compacted away from the graph.
+
@Override
- public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
- final Id entityId, final UUID markedVersion ) {
+ public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
+ final UUID markedVersion,
+ final List<UUID> allVersionsBeforeMarked ) {
- //bootstrap the lower modules from their caches
- final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+ final EntityIndex ei = entityIndexFactory.
+ createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
- CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion );
- //If we get no search results, its possible that something was already deleted or
- //that it wasn't indexed yet. In either case we can't delete anything and return an empty observable..
- if(crs.isEmpty()) {
- return Observable.empty();
- }
+ final SearchEdge searchEdgeFromSource = createCollectionSearchEdge( applicationScope, entityId );
+
+ final EntityIndexBatch batch = ei.createBatch();
+
+ // de-index each version of the entity before the marked version
+ allVersionsBeforeMarked.forEach( version -> batch.deindex( searchEdgeFromSource, entityId, version ) );
+
+ // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
+ // TODO: investigate getting this information from graph
+ final CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked( entityId, markedVersion );
+ candidateResults.forEach( candidateResult -> batch.deindex( candidateResult ) );
+
+ return Observable.just( batch.build() );
+ }
+
+ @Override
+ public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
+ final Id entityId,
+ final List<UUID> versions,
+ UUID markedVersion) {
+
+ final EntityIndex ei = entityIndexFactory.
+ createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+
+ final SearchEdge searchEdgeFromSource = createCollectionSearchEdge( applicationScope, entityId );
+
+ final EntityIndexBatch batch = ei.createBatch();
+
+ // only the supplied versions are removed; markedVersion is not consulted here because the caller is
+ // expected to have already selected versions older than it
+ versions.forEach( version -> batch.deindex( searchEdgeFromSource, entityId, version ) );
+
+ return Observable.just( batch.build() );
+ }
+
+ /**
+ * Build the collection search edge for an entity, derived from the application and the entity's
+ * pluralized type; shared by both de-index paths.
+ *
+ * Entity ids are not guaranteed to be time-based UUIDs, and {@link UUID#timestamp()} throws
+ * UnsupportedOperationException for any non-version-1 UUID. Fall back to a freshly generated time UUID in that
+ * case, exactly as the previous implementation of this cleanup did, instead of failing the whole de-index.
+ *
+ * @param applicationScope the application that owns the entity collection
+ * @param entityId the entity whose collection search edge is built
+ * @return the collection search edge for {@code entityId}
+ */
+ private SearchEdge createCollectionSearchEdge( final ApplicationScope applicationScope, final Id entityId ) {
- UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
- //not actually sure about the timestamp but ah well. works.
- SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
+ final UUID timeUUID = UUIDUtils.isTimeBased( entityId.getUuid() ) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
+ return createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
timeUUID.timestamp() ) );
- final Observable<IndexOperationMessage> batches = Observable.from( crs )
- //collect results into a single batch
- .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
- if (logger.isDebugEnabled()) {
- logger.debug("Deindexing on edge {} for entity {} added to batch", searchEdge, entityId);
- }
- batch.deindex( candidateResult );
- } )
- //return the future from the batch execution
- .map( batch ->batch.build() );
- return ObservableTimer.time(batches, indexTimer);
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 9de8f41..22fbb5f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
import java.util.Collection;
+import java.util.UUID;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -98,13 +99,20 @@ public interface EntityCollectionManager {
Observable<EntitySet> load( Collection<Id> entityIds );
/**
- * Get all versions of the log entry, from Max to min
+ * Get all versions of the log entry, from min to max
* @param entityId
* @return An observable stream of mvccLog entries
*/
Observable<MvccLogEntry> getVersions(final Id entityId);
/**
+ * Get versions of the log entry, from max to min, beginning at {@code startVersion}
+ * @param entityId
+ * @param startVersion the version to begin iterating from (NOTE(review): whether startVersion itself is
+ * emitted is decided by the underlying log iterator; confirm before relying on it)
+ * @return An observable stream of mvccLog entries
+ */
+ Observable<MvccLogEntry> getVersionsFromMaxToMin(final Id entityId, final UUID startVersion);
+
+ /**
* Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data
* and log entry will be deleted
* @param entries
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index e71e6bb..70b06ba 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.UUID;
import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,6 +280,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
} );
}
+ /**
+ * Stream this entity's MVCC log entries from max to min, beginning at {@code startVersion}. Entries are read
+ * through a LogEntryIterator that fetches {@code serializationFig.getBufferSize()} entries per query.
+ * NOTE(review): startVersion is not null-checked here; it is passed straight to LogEntryIterator.
+ */
+ @Override
+ public Observable<MvccLogEntry> getVersionsFromMaxToMin( final Id entityId, final UUID startVersion ) {
+ ValidationUtils.verifyIdentity( entityId );
+
+ return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+ @Override
+ protected Iterator<MvccLogEntry> getIterator() {
+ return new LogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId, startVersion,
+ serializationFig.getBufferSize() );
+ }
+ } );
+ }
+
@Override
public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
@@ -359,6 +373,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
if ( entity == null || !entity.getEntity().isPresent() ) {
final MutationBatch valueDelete =
uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+
deleteBatch.mergeShallow( valueDelete );
continue;
}
@@ -370,10 +385,23 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
response.addEntity( expectedUnique.getField(), entity );
}
- //TODO: explore making this an Async process
- //We'll repair it again if we have to
deleteBatch.execute();
+ // optionally sleep after read repair as some tasks immediately try to write after the delete
+ if ( serializationFig.getReadRepairDelay() > 0 ){
+
+ try {
+
+ Thread.sleep(Math.min(serializationFig.getReadRepairDelay(), 200L));
+
+ } catch (InterruptedException e) {
+
+ // do nothing if sleep fails; log and continue on
+ logger.warn("Sleep during unique value read repair failed.");
+ }
+
+ }
+
return response;
}
catch ( ConnectionException e ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 96759ba..12033fe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -57,4 +57,9 @@ public interface SerializationFig extends GuicyFig {
@Key ( "usergrid.uniqueverify.poolsize" )
@Default( "150" )
int getUniqueVerifyPoolSize();
+
+
+ /**
+ * Milliseconds to pause after a unique-value read repair executes its delete batch, for tasks that write
+ * immediately after the repair. 0 (the default) disables the pause.
+ * NOTE(review): EntityCollectionManagerImpl currently caps the actual sleep at 200ms regardless of this value.
+ */
+ @Key ( "collection.readrepair.delay" )
+ @Default( "0" ) // in milliseconds
+ int getReadRepairDelay();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
new file mode 100644
index 0000000..de6b2bc
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from max to min, beginning at a start version.
+ * <p>
+ * Log entries are loaded lazily from the {@link MvccLogEntrySerializationStrategy}, one page of at most
+ * {@code pageSize} entries at a time. Paging state is unsynchronized, so confine each instance to one
+ * thread. {@link #remove()} is not supported.
+ */
+public class LogEntryIterator implements Iterator<MvccLogEntry> {
+
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final ApplicationScope scope;
+    private final Id entityId;
+    private final int pageSize;
+    private final UUID startVersion;
+
+    /** Entries of the most recently loaded page; {@code null} until the first load. */
+    private Iterator<MvccLogEntry> elementItr;
+    /** Version to resume from on the next load, or {@code null} when no further page exists. */
+    private UUID nextStart;
+
+
+    /**
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param startVersion The version to start iterating from (passed as-is to the first load)
+     * @param pageSize The fetch size to get when querying the serialization strategy; must be > 0
+     *
+     * @throws NullPointerException if the strategy, scope or entity id is null
+     * @throws IllegalArgumentException if pageSize is not positive
+     */
+    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                             final ApplicationScope scope, final Id entityId,
+                             final UUID startVersion, final int pageSize ) {
+
+        Preconditions.checkNotNull( logEntrySerializationStrategy, "logEntrySerializationStrategy is required" );
+        Preconditions.checkNotNull( scope, "scope is required" );
+        Preconditions.checkNotNull( entityId, "entityId is required" );
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.pageSize = pageSize;
+        this.startVersion = startVersion;
+    }
+
+
+    /**
+     * @throws RuntimeException wrapping the {@link ConnectionException} if loading a page fails
+     */
+    @Override
+    public boolean hasNext() {
+        // load the first page, or the next page once the current one is exhausted and more may exist
+        if ( elementItr == null || ( !elementItr.hasNext() && nextStart != null ) ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Load the next page of log entries into {@link #elementItr} and compute {@link #nextStart}.
+     * <p>
+     * Pages after the first resume at the last version already returned, so one extra entry is requested
+     * and, if the load returns that version again, it is skipped. The loaded list itself is never mutated.
+     *
+     * @throws ConnectionException if the serialization strategy fails to load the page
+     */
+    public void advance() throws ConnectionException {
+
+        final boolean resuming = nextStart != null;
+        final UUID start = resuming ? nextStart : startVersion;
+        final int requestedSize = resuming ? pageSize + 1 : pageSize;
+
+        final List<MvccLogEntry> loaded = logEntrySerializationStrategy.load( scope, entityId, start, requestedSize );
+
+        // skip the resume version via a view rather than remove(0): the strategy's list may be unmodifiable
+        final boolean skipFirst = resuming && !loaded.isEmpty() && loaded.get( 0 ).getVersion().equals( nextStart );
+        final List<MvccLogEntry> page = skipFirst ? loaded.subList( 1, loaded.size() ) : loaded;
+
+        // A full page means more versions may exist. If the resume version was deleted it is not returned,
+        // leaving pageSize + 1 entries, hence >= rather than ==.
+        nextStart = page.size() >= pageSize ? page.get( page.size() - 1 ).getVersion() : null;
+
+        elementItr = page.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 81f900c..8ab2d41 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -113,14 +113,13 @@ public interface EntityIndex extends CPManager {
CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
/**
- * Returns all entity documents that match the entityId and come before the marked version
+ * Returns all entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode)
*
* @param entityId The entityId to match when searching
* @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
* @return
*/
- CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion);
-
+ CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
/**
* delete all application records
*
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index e81219a..3dbf5e5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -202,7 +202,7 @@ public interface IndexFig extends GuicyFig {
@Key( "elasticsearch_queue_error_sleep_ms" )
long getSleepTimeForQueueError();
- @Default("1000")
+ @Default("100")
@Key( ELASTICSEARCH_VERSION_QUERY_LIMIT )
int getVersionQueryLimit();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 10ee91e..3b60b57 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -533,7 +533,9 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
@Override
- public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
+ public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
+
+ // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone
Preconditions.checkNotNull( entityId, "entityId cannot be null" );
Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
@@ -544,12 +546,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final long markedTimestamp = markedVersion.timestamp();
- // never let the limit be less than 2 as there are potential indefinite paging issues
- final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
-
- // this query will find the document for the entity itself
- final QueryBuilder entityQuery = QueryBuilders
- .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId));
+ // never let this fetch more than 100 to save memory
+ final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
// this query will find all the documents where this entity is a source/target node
final QueryBuilder nodeQuery = QueryBuilders
@@ -562,49 +560,25 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
long queryTimestamp = 0L;
- while(true){
+ QueryBuilder timestampQuery = QueryBuilders
+ .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+ .gte(queryTimestamp)
+ .lt(markedTimestamp);
- QueryBuilder timestampQuery = QueryBuilders
- .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
- .gte(queryTimestamp)
- .lte(markedTimestamp);
+ QueryBuilder finalQuery = QueryBuilders
+ .boolQuery()
+ .must(timestampQuery)
+ .must(nodeQuery);
- QueryBuilder entityQueryWithTimestamp = QueryBuilders
- .boolQuery()
- .must(entityQuery)
- .must(timestampQuery);
+ searchResponse = srb
+ .setQuery(finalQuery)
+ .setSize(searchLimit)
+ .execute()
+ .actionGet();
- QueryBuilder finalQuery = QueryBuilders
- .boolQuery()
- .should(entityQueryWithTimestamp)
- .should(nodeQuery)
- .minimumNumberShouldMatch(1);
- searchResponse = srb
- .setQuery(finalQuery)
- .setSize(searchLimit)
- .execute()
- .actionGet();
+ candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
- int responseSize = searchResponse.getHits().getHits().length;
- if(responseSize == 0){
- break;
- }
-
- candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
- // update queryTimestamp to be the timestamp of the last entity returned from the query
- queryTimestamp = (long) searchResponse
- .getHits().getAt(responseSize - 1)
- .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
-
-
- if(responseSize < searchLimit){
-
- break;
- }
-
- }
}
catch ( Throwable t ) {
logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 008ec80..c84635d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -383,47 +383,6 @@ public class EntityIndexTest extends BaseIT {
}
- /**
- * Tests that we aggregate results only before the halfway version point.
- */
- @Test
- public void testScollingDeindex() {
-
- int numberOfEntities = 1000;
- int versionToSearchFor = numberOfEntities / 2;
-
-
- UUID entityUUID = UUID.randomUUID();
- Id entityId = new SimpleId( "mehCar" );
-
- Map entityMap = new HashMap() {{
- put( "name", "Toyota Corolla" );
- put( "introduced", 1966 );
- put( "topspeed", 111 );
- }};
-
- Entity[] entity = new Entity[numberOfEntities];
- for(int i = 0; i < numberOfEntities; i++) {
- entity[i] = EntityIndexMapUtils.fromMap( entityMap );
- EntityUtils.setId(entity[i], entityId);
- EntityUtils.setVersion(entity[i], UUIDGenerator.newTimeUUID());
- entity[i].setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID));
-
- IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, System.currentTimeMillis()*1000 );
-
- //index the new entity. This is where the loop will be set to create like 100 entities.
- indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i] ).build()).subscribe();
-
- }
- entityIndex.refreshAsync().toBlocking().first();
-
- CandidateResults candidateResults = entityIndex
- .getAllEntityVersionsBeforeMarkedVersion( entity[versionToSearchFor].getId(),
- entity[versionToSearchFor].getVersion() );
- assertEquals( 501, candidateResults.size() );
- }
-
-
private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes,
final String queryString,