You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/29 21:49:10 UTC
[5/7] usergrid git commit: Clean up older versions of entities from
the index after an entity update.
Clean up older versions of entities from the index after an entity update.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e57e9ef4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e57e9ef4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e57e9ef4
Branch: refs/heads/release-2.1.1
Commit: e57e9ef44809b14e3462a5540d06c1ab6dd5ff15
Parents: 9b9f835
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 29 12:47:33 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 29 12:47:33 2016 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 7 ++-
.../asyncevents/AsyncEventService.java | 7 +++
.../asyncevents/AsyncEventServiceImpl.java | 33 +++++++++++--
.../asyncevents/EventBuilder.java | 12 ++++-
.../asyncevents/EventBuilderImpl.java | 36 +++++++++++++-
.../asyncevents/model/AsyncEvent.java | 5 +-
.../model/DeIndexOldVersionsEvent.java | 50 ++++++++++++++++++++
.../asyncevents/model/EdgeDeleteEvent.java | 4 +-
.../model/ElasticsearchIndexEvent.java | 2 +-
.../asyncevents/model/EntityDeleteEvent.java | 3 ++
.../model/InitializeApplicationIndexEvent.java | 2 +-
11 files changed, 147 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 3dbdb7d..b29e6d3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -588,9 +588,12 @@ public class CpEntityManager implements EntityManager {
handleWriteUniqueVerifyException( entity, wuve );
}
- // update in all containing collections and connection indexes
+ // queue an event to update the new entity
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
+
+ // queue up an event to clean-up older versions than this one from the index
+ indexService.queueDeIndexOldVersion( applicationScope, entityId );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1abf83f..9f34604 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -83,6 +83,13 @@ public interface AsyncEventService extends ReIndexAction {
void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
/**
+ *
+ * @param applicationScope
+ * @param entityId
+ */
+ void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId);
+
+ /**
* current queue depth
* @return
*/
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 1349011..d180919 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -28,15 +28,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.persistence.index.impl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
@@ -337,6 +333,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
+ } else if (event instanceof DeIndexOldVersionsEvent) {
+
+ indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
+
} else {
throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
@@ -533,6 +533,29 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
+
+ @Override
+ public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) {
+
+ // queue the de-index of old versions to the topic so cleanup happens in all regions
+ offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
+ new EntityIdScope( applicationScope, entityId)) );
+
+ }
+
+
+ public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){
+
+
+ ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
+ Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
+
+ return eventBuilder.deIndexOlderVersions( applicationScope, entityId )
+ .toBlocking().lastOrDefault(null);
+
+
+ }
+
/**
* this method will call initialize for each message, since we are caching the entity indexes,
* we don't worry about aggregating by app id
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index a47ec77..1f62029 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperation;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -56,7 +57,7 @@ public interface EventBuilder {
Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
/**
- * Return a ben with 2 obervable streams for entity delete.
+ * Return a bin with 2 observable streams for entity delete.
* @param applicationScope
* @param entityId
* @return
@@ -72,6 +73,15 @@ public interface EventBuilder {
*/
Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
+
+ /**
+ * Find all versions of the entity older than the latest and de-index them.
+ * @param applicationScope
+ * @param entityId
+ * @return
+ */
+ Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId );
+
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
* indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 2edc668..9851936 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -20,8 +20,11 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import java.util.Collections;
import java.util.List;
+import java.util.UUID;
+import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
.firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
- // De-indexing and entity deletes don't check log entiries. We must do that first. If no DELETED logs, then
+ // De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then
// return an empty observable as our no-op.
Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
@@ -168,4 +171,35 @@ public class EventBuilderImpl implements EventBuilder {
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
}
+
+
+ @Override
+ public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope );
+ }
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ // find all versions of the entity that come before the provided entityId
+ VersionSet latestVersions = ecm.getLatestVersion(Collections.singletonList(entityId) ).toBlocking()
+ .firstOrDefault( null );
+
+ // If there are no versions before this, allow it to return an empty observable
+ Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
+
+ if(latestVersions.getMaxVersion(entityId) != null){
+
+ UUID latestVersion = latestVersions.getMaxVersion(entityId).getVersion();
+
+ deIndexObservable =
+ indexService.deleteEntityIndexes( applicationScope, entityId, latestVersion);
+
+ }
+
+ return deIndexObservable;
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 57b5812..7b2b228 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.usergrid.persistence.queue.QueueFig;
/**
@@ -42,7 +41,9 @@ import org.apache.usergrid.persistence.queue.QueueFig;
@JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
- @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
+ @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ),
+ @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" )
+
} )
public abstract class AsyncEvent implements Serializable {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
new file mode 100644
index 0000000..59694d5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+
+
+/**
+ * An index event de-indexing documents for Entity versions older than the provided Entity
+ */
+public final class DeIndexOldVersionsEvent extends AsyncEvent {
+
+
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+
+ public DeIndexOldVersionsEvent() {
+ }
+
+ public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) {
+ super(sourceRegion);
+ this.entityIdScope = entityIdScope;
+ }
+
+
+ /**
+ * Get the unique message id of the
+ * @return
+ */
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
index 4bbe6f5..572ec13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
@@ -25,7 +25,9 @@ import org.apache.usergrid.persistence.graph.Edge;
import com.fasterxml.jackson.annotation.JsonProperty;
-
+/**
+ * Event that will signal to finish the actual delete (post-mark delete) for an Edge
+ */
public final class EdgeDeleteEvent extends AsyncEvent {
@JsonProperty
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
index 049c3a5..432dd63 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
- * An index event for publishing to elastic search
+ * An index event for publishing operations (index and de-index) to Elasticsearch
*/
public final class ElasticsearchIndexEvent extends AsyncEvent {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index cb3ecda..01d2ba8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -22,6 +22,9 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+/**
+ * Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ */
public final class EntityDeleteEvent extends AsyncEvent {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 1a270d4..fa72249 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -25,7 +25,7 @@ import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
/**
- * event to init app index
+ * Event to initialize the index for an application
*/
public class InitializeApplicationIndexEvent extends AsyncEvent {