You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/29 21:49:10 UTC

[5/7] usergrid git commit: Clean up older versions of entities from the index after an entity update.

Clean up older versions of entities from the index after an entity update.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e57e9ef4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e57e9ef4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e57e9ef4

Branch: refs/heads/release-2.1.1
Commit: e57e9ef44809b14e3462a5540d06c1ab6dd5ff15
Parents: 9b9f835
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 29 12:47:33 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 29 12:47:33 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  7 ++-
 .../asyncevents/AsyncEventService.java          |  7 +++
 .../asyncevents/AsyncEventServiceImpl.java      | 33 +++++++++++--
 .../asyncevents/EventBuilder.java               | 12 ++++-
 .../asyncevents/EventBuilderImpl.java           | 36 +++++++++++++-
 .../asyncevents/model/AsyncEvent.java           |  5 +-
 .../model/DeIndexOldVersionsEvent.java          | 50 ++++++++++++++++++++
 .../asyncevents/model/EdgeDeleteEvent.java      |  4 +-
 .../model/ElasticsearchIndexEvent.java          |  2 +-
 .../asyncevents/model/EntityDeleteEvent.java    |  3 ++
 .../model/InitializeApplicationIndexEvent.java  |  2 +-
 11 files changed, 147 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 3dbdb7d..b29e6d3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -588,9 +588,12 @@ public class CpEntityManager implements EntityManager {
             handleWriteUniqueVerifyException( entity, wuve );
         }
 
-        // update in all containing collections and connection indexes
+        // queue an event to update the new entity
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 );
 
-        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
+
+        // queue up an event to clean-up older versions than this one from the index
+        indexService.queueDeIndexOldVersion( applicationScope, entityId );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1abf83f..9f34604 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -83,6 +83,13 @@ public interface AsyncEventService extends ReIndexAction {
     void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
 
     /**
+     * Queue an event requesting that older versions of the given entity be de-indexed (removed from the index).
+     * @param applicationScope the application scope that is paired with the entity id in the queued event
+     * @param entityId the id of the entity whose older versions should be de-indexed
+     */
+    void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId);
+
+    /**
      * current queue depth
      * @return
      */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 1349011..d180919 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -28,15 +28,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.usergrid.corepersistence.asyncevents.model.*;
 import org.apache.usergrid.persistence.index.impl.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
@@ -337,6 +333,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                     indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);
 
+                } else if (event instanceof DeIndexOldVersionsEvent) {
+
+                    indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
+
                 } else {
 
                     throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
@@ -533,6 +533,29 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     }
 
+
+    @Override
+    public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) {
+
+        final String sourceRegion = queueFig.getPrimaryRegion();
+        final EntityIdScope target = new EntityIdScope( applicationScope, entityId );
+        // offered to the topic (not a single queue) so the cleanup of old versions happens in all regions
+        offerTopic( new DeIndexOldVersionsEvent( sourceRegion, target ) );
+    }
+
+
+    public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){
+
+        // The event's scope carries both the application scope and the id of the entity to clean up.
+        final EntityIdScope scope = deIndexOldVersionsEvent.getEntityIdScope();
+
+        // Build the de-index operation and block for the last message it emits;
+        // lastOrDefault( null ) yields null when the observable completes without emitting anything.
+        return eventBuilder.deIndexOlderVersions( scope.getApplicationScope(), scope.getId() )
+            .toBlocking()
+            .lastOrDefault( null );
+    }
+
     /**
      *     this method will call initialize for each message, since we are caching the entity indexes,
      *     we don't worry about aggregating by app id

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index a47ec77..1f62029 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperation;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -56,7 +57,7 @@ public interface EventBuilder {
     Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
-     * Return a ben with 2 obervable streams for entity delete.
+     * Return a bean with 2 observable streams for entity delete.
      * @param applicationScope
      * @param entityId
      * @return
@@ -72,6 +73,15 @@ public interface EventBuilder {
      */
     Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
 
+
+    /**
+     * Find all versions of the entity older than the latest and de-index them.
+     * @param applicationScope the application scope used to look up the entity's versions
+     * @param entityId the id of the entity whose older versions should be de-indexed
+     * @return an observable of the resulting index operation messages; may be empty when there is nothing to do
+     */
+    Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId );
+
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that
      * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 2edc668..9851936 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -20,8 +20,11 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
         MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
             .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
 
-        // De-indexing and entity deletes don't check log entiries.  We must do that first. If no DELETED logs, then
+        // De-indexing and entity deletes don't check log entries.  We must do that first. If no DELETED logs, then
         // return an empty observable as our no-op.
         Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
         Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
@@ -168,4 +171,35 @@ public class EventBuilderImpl implements EventBuilder {
             //perform indexing on the task scheduler and start it
             .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
     }
+
+
+    @Override
+    public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){
+        // Builds the de-index operation for this entity from its latest known version. When no version can be
+        // found, the result is an empty observable, which acts as a no-op for the caller.
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope );
+        }
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        // Look up the latest version recorded for this entity id.
+        // firstOrDefault( null ) returns null when nothing is emitted, so the result must be null-checked.
+        final VersionSet latestVersions = ecm.getLatestVersion( Collections.singletonList( entityId ) ).toBlocking()
+            .firstOrDefault( null );
+
+        // Resolve the max version once; it is null when the lookup was empty or has no entry for this id.
+        final MvccLogEntry maxVersion = latestVersions == null ? null : latestVersions.getMaxVersion( entityId );
+
+        // Nothing known about this entity: there is nothing to de-index.
+        if ( maxVersion == null ) {
+            return Observable.empty();
+        }
+
+        // Hand the latest version to the index service, which produces the de-index operations.
+        return indexService.deleteEntityIndexes( applicationScope, entityId, maxVersion.getVersion() );
+
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 57b5812..7b2b228 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.usergrid.persistence.queue.QueueFig;
 
 
 /**
@@ -42,7 +41,9 @@ import org.apache.usergrid.persistence.queue.QueueFig;
     @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
     @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
-    @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
+    @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ),
+    @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" )
+
 } )
 
 public abstract class AsyncEvent implements Serializable {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
new file mode 100644
index 0000000..59694d5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+
+
+/**
+ * An event requesting de-indexing of documents for versions of an Entity older than its latest version
+ */
+public final class DeIndexOldVersionsEvent extends AsyncEvent {
+
+    // Serialized as part of the event; carries the application scope and the id of the entity to clean up
+    @JsonProperty
+    protected EntityIdScope entityIdScope;
+    // NOTE(review): no-arg constructor is presumably required for JSON deserialization -- confirm
+    public DeIndexOldVersionsEvent() {
+    }
+
+    public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) {
+        super(sourceRegion);
+        this.entityIdScope = entityIdScope;
+    }
+
+
+    /**
+     * Get the scope (application scope plus entity id) of the entity whose old versions are to be de-indexed
+     * @return the EntityIdScope set on this event, or null if the no-arg constructor was used and it was never set
+     */
+    public EntityIdScope getEntityIdScope() {
+        return entityIdScope;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
index 4bbe6f5..572ec13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
@@ -25,7 +25,9 @@ import org.apache.usergrid.persistence.graph.Edge;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-
+/**
+ * Event that will signal to finish the actual delete (post-mark delete) for an Edge
+ */
 public final class EdgeDeleteEvent extends AsyncEvent {
 
     @JsonProperty

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
index 049c3a5..432dd63 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 
 /**
- * An index event for publishing to elastic search
+ * An index event for publishing operations (index and de-index) to Elasticsearch
  */
 public final class ElasticsearchIndexEvent extends AsyncEvent {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index cb3ecda..01d2ba8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -22,6 +22,9 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 
+/**
+ * Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ */
 public final class EntityDeleteEvent extends AsyncEvent {
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e57e9ef4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 1a270d4..fa72249 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -25,7 +25,7 @@ import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 
 /**
- * event to init app index
+ * Event to initialize the index for an application
  */
 
 public class InitializeApplicationIndexEvent extends AsyncEvent {