You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/06/01 23:48:37 UTC
[28/50] [abbrv] incubator-usergrid git commit: add compaction
add compaction
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8bb77960
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8bb77960
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8bb77960
Branch: refs/heads/USERGRID-628
Commit: 8bb77960bcd1c96fac73438669ca249620a16b97
Parents: b85ff25
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 16:42:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 16:42:19 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 32 ++++++++++++--------
.../corepersistence/util/CpNamingUtils.java | 10 +++++-
2 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bb77960/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 4ac42cc..713a2da 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -348,7 +349,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
Entity oldAppEntity = managementEm.get(new SimpleEntityRef(collectionFromName, applicationUUID));
Observable copyConnections = Observable.empty();
- if(oldAppEntity!=null) {
+ if (oldAppEntity != null) {
// ensure that there is not already a deleted app with the same name
final EntityRef alias = managementEm.getAlias(collectionToName, oldAppEntity.getName());
@@ -379,25 +380,32 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
throw new RuntimeException(e);
}
});
+
}
+ final Id managementAppId = CpNamingUtils.getManagementApplicationId();
final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
- Edge deleteEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionFromName,applicationId);
- Edge createEdge = CpNamingUtils.createCollectionEdge( CpNamingUtils.getManagementApplicationId(),collectionToName,applicationId);
+ final Edge createEdge = CpNamingUtils.createCollectionEdge(managementAppId, collectionToName, applicationId);
+
+ final Observable compactObservable = managementGraphManager.compactNode(applicationId);
+
+ final Observable deleteNodeGraph = managementGraphManager
+ .markNode(applicationId, CpNamingUtils.createGraphOperationTimestamp())
+ .flatMap(id -> compactObservable);
- final Observable deleteNodeGraph = managementGraphManager.deleteEdge(deleteEdge);
final Observable createNodeGraph = managementGraphManager.writeEdge(createEdge);
final Observable deleteAppFromIndex = aei.deleteApplication();
- return Observable.concat(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
+ return Observable
+ .merge(copyConnections, createNodeGraph, deleteNodeGraph, deleteAppFromIndex)
.doOnCompleted(() -> {
try {
if (oldAppEntity != null) {
managementEm.delete(oldAppEntity);
applicationIdCache.evictAppId(oldAppEntity.getName());
- entityIndex.refreshAsync().toBlocking().first();
}
+ entityIndex.refreshAsync().toBlocking().last();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -439,13 +447,13 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
CpNamingUtils.getApplicationScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
GraphManager gm = managerCache.getGraphManager(appScope);
- EntityManager em = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- Application app = em.getApplication();
- if( app == null ) {
+ EntityManager managementEM = getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ Application managementApp = managementEM.getApplication();
+ if( managementApp == null ) {
throw new RuntimeException("Management App "
+ CpNamingUtils.MANAGEMENT_APPLICATION_ID + " should never be null");
}
- Id fromEntityId = new SimpleId( app.getUuid(), app.getType() );
+ Id managementId = new SimpleId( managementApp.getUuid(), managementApp.getType() );
final String edgeType;
@@ -456,10 +464,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
- new Object[] { edgeType, fromEntityId.getType(), fromEntityId.getUuid() } );
+ new Object[]{edgeType, managementId.getType(), managementId.getUuid()});
Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
- fromEntityId, edgeType, Long.MAX_VALUE,
+ managementId, edgeType, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
// TODO This is wrong, and will result in OOM if there are too many applications.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bb77960/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 31dabea..77dd9f1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -167,11 +167,19 @@ public class CpNamingUtils {
final String edgeType = getEdgeTypeFromConnectionType( connectionType );
// create graph edge connection from head entity to member entity
- return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, System.currentTimeMillis() );
+ return new SimpleEdge( sourceEntityId, edgeType, targetEntityId, UUIDUtils.newTimeUUID().timestamp() );
}
/**
+ * When marking nodes for deletion, we must use the same unit of measure as the edge timestamps.
+ * @return the timestamp of a newly generated time UUID, the same source used for edge timestamps
+ */
+ public static long createGraphOperationTimestamp(){
+ return UUIDUtils.newTimeUUID().timestamp();
+ }
+
+ /**
* Create a connection searchEdge
*
* @param sourceId The source id in the connection