You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/01 23:48:37 UTC

[28/50] [abbrv] incubator-usergrid git commit: add compaction

add compaction


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8bb77960
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8bb77960
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8bb77960

Branch: refs/heads/USERGRID-628
Commit: 8bb77960bcd1c96fac73438669ca249620a16b97
Parents: b85ff25
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 16:42:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 16:42:19 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java | 32 ++++++++++++--------
 .../corepersistence/util/CpNamingUtils.java     | 10 +++++-
 2 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bb77960/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4ac42cc..713a2da 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeansException;
@@ -348,7 +349,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         Entity oldAppEntity = managementEm.get(new SimpleEntityRef(collectionFromName, applicationUUID));
         Observable copyConnections = Observable.empty();
-        if(oldAppEntity!=null) {
+        if (oldAppEntity != null) {
             // ensure that there is not already a deleted app with the same name
 
             final EntityRef alias = managementEm.getAlias(collectionToName, oldAppEntity.getName());
@@ -379,25 +380,32 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                     throw new RuntimeException(e);
                 }
             });
+
         }
+        final Id managementAppId = CpNamingUtils.getManagementApplicationId();
         final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
         final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
-        Edge deleteEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionFromName,applicationId);
-        Edge createEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionToName,applicationId);
+        final Edge createEdge = CpNamingUtils.createCollectionEdge(managementAppId, collectionToName, applicationId);
+
+        final Observable compactObservable = managementGraphManager.compactNode(applicationId);
+
+        final Observable deleteNodeGraph = managementGraphManager
+            .markNode(applicationId, CpNamingUtils.createGraphOperationTimestamp())
+            .flatMap(id -> compactObservable);
 
-        final Observable deleteNodeGraph = managementGraphManager.deleteEdge(deleteEdge);
         final Observable createNodeGraph = managementGraphManager.writeEdge(createEdge);
 
         final Observable deleteAppFromIndex = aei.deleteApplication();
 
-        return Observable.concat(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
+        return Observable
+            .merge(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
             .doOnCompleted(() -> {
                 try {
                     if (oldAppEntity != null) {
                         managementEm.delete(oldAppEntity);
                         applicationIdCache.evictAppId(oldAppEntity.getName());
-                        entityIndex.refreshAsync().toBlocking().first();
                     }
+                    entityIndex.refreshAsync().toBlocking().last();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -439,13 +447,13 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             CpNamingUtils.getApplicationScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
         GraphManager gm = managerCache.getGraphManager(appScope);
 
-        EntityManager em = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
-        Application app = em.getApplication();
-        if( app == null ) {
+        EntityManager managementEM = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+        Application managementApp = managementEM.getApplication();
+        if( managementApp == null ) {
             throw new RuntimeException("Management App "
                 + CpNamingUtils.MANAGEMENT_APPLICATION_ID + " should never be null");
         }
-        Id fromEntityId = new SimpleId( app.getUuid(), app.getType() );
+        Id managementId = new SimpleId( managementApp.getUuid(), managementApp.getType() );
 
         final String edgeType;
 
@@ -456,10 +464,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
 
         logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
-            new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } );
+            new Object[]{edgeType, managementId.getType(), managementId.getUuid()});
 
         Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
-                fromEntityId, edgeType, Long.MAX_VALUE,
+                managementId, edgeType, Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
 
         // TODO This is wrong, and will result in OOM if there are too many applications.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bb77960/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 31dabea..77dd9f1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -167,11 +167,19 @@ public class CpNamingUtils {
         final String edgeType = getEdgeTypeFromConnectionType( connectionType );
 
         // create graph edge connection from head entity to member entity
-        return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, System.currentTimeMillis() );
+        return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, UUIDUtils.newTimeUUID().timestamp() );
     }
 
 
     /**
+     * When marking nodes for deletion we must use the same unit of measure as the edge timestamps
+     * @return
+     */
+    public static long createGraphOperationTimestamp(){
+        return UUIDUtils.newTimeUUID().timestamp();
+    }
+
+    /**
      * Create a connection searchEdge
      *
      * @param sourceId The source id in the connection