You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/17 02:22:09 UTC
[1/3] Fixed index refresh issue.
Repository: incubator-usergrid
Updated Branches:
refs/heads/index-rebuild f807627b9 -> c31c553f9
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index d491d3d..908e6bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -80,6 +80,7 @@ public class CpWalker {
applicationScope.getApplication().getUuid()
} );
+ //only search edge types that start with collections
Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
new SimpleSearchEdgeType( fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
@@ -100,16 +101,16 @@ public class CpWalker {
@Override
public void call( Edge edge ) {
- EntityRef sourceEntityRef =
- new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+ EntityRef targetNodeEntityRef =
+ new SimpleEntityRef( edge.getTargetNode().getType(), edge.getTargetNode().getUuid() );
Entity entity;
try {
- entity = em.get( sourceEntityRef );
+ entity = em.get( targetNodeEntityRef );
}
catch ( Exception ex ) {
- logger.error( "Error getting sourceEntity {}:{}, continuing", sourceEntityRef.getType(),
- sourceEntityRef.getUuid() );
+ logger.error( "Error getting sourceEntity {}:{}, continuing", targetNodeEntityRef.getType(),
+ targetNodeEntityRef.getUuid() );
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index d2f7fef..2ef65ef 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -213,6 +213,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// setup.getEmf().refreshIndex();
setup.getEmf().rebuildAllIndexes( po );
+ reporter.report();
registry.remove( meterName );
logger.info("Rebuilt index");
@@ -236,7 +237,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
Id appId = new SimpleId( appUuid, "application");
ApplicationScope scope = new ApplicationScopeImpl( appId );
- IndexScope is = new IndexScopeImpl( appId, "application");
EntityIndex ei = eif.createEntityIndex(scope);
EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei;
@@ -247,6 +247,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private int readData( String collectionName, int expected ) throws Exception {
EntityManager em = app.getEntityManager();
+ em.refreshIndex();
Query q = Query.fromQL("select * where key1=1000");
q.setLimit(40);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 9ea14a1..821f7b9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -116,7 +116,7 @@ public class EsEntityIndexImpl implements EntityIndex {
AdminClient admin = client.admin();
CreateIndexResponse cir = admin.indices().prepareCreate( indexName ).execute().actionGet();
- log.debug( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
+ log.info( "Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged() );
RefreshResponse response;
@@ -128,11 +128,11 @@ public class EsEntityIndexImpl implements EntityIndex {
/**
* Immediately refresh to ensure the entire cluster is ready to receive this write. Occasionally we see
* errors. See this post.
- * http://elasticsearch-users.115913.n3.nabble.com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
+ * http://elasticsearch-users.115913.n3.nabble
+ * .com/IndexMissingException-on-create-index-followed-by-refresh-td1832793.html
*
*/
refresh();
-
}
catch ( IndexAlreadyExistsException expected ) {
// this is expected to happen if index already exists, it's a no-op and swallow
@@ -272,25 +272,33 @@ public class EsEntityIndexImpl implements EntityIndex {
public void refresh() {
- //now try to refresh, to ensure that it's recognized by everyone. Occasionally we can get a success
- //before we can write.
- for(int i = 0 ; i < MAX_WAITS; i++ ){
- try{
- client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
- break;
- }catch(IndexMissingException e){
- log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
- }
+ log.info( "Refreshing Created new Index Name [{}]", indexName );
- try {
- Thread.sleep( WAIT_TIME );
- }
- catch ( InterruptedException e ) {
- //swallow it
- }
+ //now try to refresh, to ensure that it's recognized by everyone. Occasionally we can get a success
+ //before we can write.
+ for ( int i = 0; i < MAX_WAITS; i++ ) {
+ try {
+ client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+ return;
+ }
+ catch ( IndexMissingException e ) {
+ log.error( "Unable to refresh index after create. Waiting before sleeping.", e );
}
+ try {
+ Thread.sleep( WAIT_TIME );
+ }
+ catch ( InterruptedException e ) {
+ //swallow it
+ }
+ }
+
+ /**
+ * Try the refresh one last time if we get here
+ */
+ client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+
log.debug( "Refreshed index: " + indexName );
}
[3/3] git commit: Fixed index refresh issue.
Posted by to...@apache.org.
Fixed index refresh issue.
Fixed source/target switch on entity rebuild
Added performance comments as encountered
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c31c553f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c31c553f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c31c553f
Branch: refs/heads/index-rebuild
Commit: c31c553f9b3c16deda28fc0125d5b1a86a654d14
Parents: f807627
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 18:22:05 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 18:22:05 2014 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 6 +-
.../corepersistence/CpEntityManagerFactory.java | 13 +-
.../corepersistence/CpRelationManager.java | 1427 ++++++++----------
.../usergrid/corepersistence/CpWalker.java | 11 +-
.../PerformanceEntityRebuildIndexTest.java | 3 +-
.../index/impl/EsEntityIndexImpl.java | 44 +-
6 files changed, 692 insertions(+), 812 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 4015014..2498dda 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -514,7 +514,7 @@ public class CpEntityManager implements EntityManager {
// update in all containing collections and connection indexes
CpRelationManager rm = (CpRelationManager)getRelationManager( entity );
- rm.updateContainingCollectionAndCollectionIndexes( entity, cpEntity );
+ rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
}
@@ -995,7 +995,7 @@ public class CpEntityManager implements EntityManager {
// }
org.apache.usergrid.persistence.model.entity.Entity cpEntity =
- ecm.load( entityId ).toBlockingObservable().last();
+ ecm.load( entityId ).toBlocking().last();
cpEntity.removeField( propertyName );
@@ -1012,7 +1012,7 @@ public class CpEntityManager implements EntityManager {
// update in all containing collections and connection indexes
CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
- rm.updateContainingCollectionAndCollectionIndexes( get( entityRef ), cpEntity );
+ rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 2514e20..e013957 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -147,6 +147,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
sysAppProps.put( PROPERTY_NAME, "systemapp");
em.create( SYSTEM_APP_ID, TYPE_APPLICATION, sysAppProps );
em.getApplication();
+ em.createIndex();
em.refreshIndex();
}
@@ -200,7 +201,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
EntityManager em = new CpEntityManager();
em.init( this, applicationId );
- //TODO T.N. Can we remove this? Seems like we should fix our lifecycle instead...
+ //TODO PERFORMANCE Can we remove this? Seems like we should fix our lifecycle instead...
+ //if this is the first time we've loaded this entity manager in the JVM, create its indexes, it may be new
+ //not sure how to handle other than this if the system dies after the application em has been created
+ //but before the create call can create the index
em.createIndex();
return em;
}
@@ -286,7 +290,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
EntityManager appEm = getEntityManager( applicationId );
//create our ES index since we're initializing this application
-// TODO T.N, pushed this down into the cache load appEm.createIndex();
+// TODO PERFORMANCE pushed this down into the cache load; can we do this here?
+// appEm.createIndex();
appEm.create( applicationId, TYPE_APPLICATION, properties );
appEm.resetRoles();
@@ -681,10 +686,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception {
EntityManager em = getEntityManager( appId );
+
+ //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild.
+ em.createIndex();
Application app = em.getApplication();
em.reindex( po );
-// em.refreshIndex();
logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId );
}
[2/3] Fixed index refresh issue.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c31c553f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2a790ab..96f0e1e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -16,14 +16,10 @@
package org.apache.usergrid.corepersistence;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import com.clearspring.analytics.hash.MurmurHash;
-import com.yammer.metrics.annotation.Metered;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import static java.util.Arrays.asList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,21 +29,18 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
import org.apache.usergrid.corepersistence.results.ResultsLoader;
import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.utils.UUIDUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-
import org.apache.usergrid.persistence.ConnectedEntityRef;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.IndexBucketLocator;
@@ -56,43 +49,12 @@ import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.RoleRef;
import org.apache.usergrid.persistence.Schema;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
-import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
-import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
-import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.SimpleRoleRef;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
import org.apache.usergrid.persistence.cassandra.CassandraService;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
-import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
import org.apache.usergrid.persistence.cassandra.IndexUpdate;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
-import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
import org.apache.usergrid.persistence.cassandra.QueryProcessorImpl;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
@@ -114,6 +76,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -138,19 +101,59 @@ import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
import org.apache.usergrid.persistence.schema.CollectionInfo;
-import static org.apache.usergrid.utils.ClassUtils.cast;
-import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
import org.apache.usergrid.utils.IndexUtils;
-import static org.apache.usergrid.utils.InflectionUtils.singularize;
import org.apache.usergrid.utils.MapUtils;
-import static org.apache.usergrid.utils.MapUtils.addMapSet;
-import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.yammer.metrics.annotation.Metered;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.mutation.Mutator;
import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static java.util.Arrays.asList;
+
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.utils.ClassUtils.cast;
+import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
+import static org.apache.usergrid.utils.InflectionUtils.singularize;
+import static org.apache.usergrid.utils.MapUtils.addMapSet;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
/**
@@ -161,9 +164,8 @@ public class CpRelationManager implements RelationManager {
private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class );
-
private CpEntityManagerFactory emf;
-
+
private CpManagerCache managerCache;
private EntityManager em;
@@ -184,23 +186,18 @@ public class CpRelationManager implements RelationManager {
private ResultsLoaderFactory resultsLoaderFactory;
-
public CpRelationManager() {}
- public CpRelationManager init(
- EntityManager em,
- CpEntityManagerFactory emf,
- UUID applicationId,
- EntityRef headEntity,
- IndexBucketLocator indexBucketLocator ) {
+ public CpRelationManager init( EntityManager em, CpEntityManagerFactory emf, UUID applicationId,
+ EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
Assert.notNull( em, "Entity manager cannot be null" );
Assert.notNull( emf, "Entity manager factory cannot be null" );
Assert.notNull( applicationId, "Application Id cannot be null" );
Assert.notNull( headEntity, "Head entity cannot be null" );
Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
-
+
// TODO: this assert should not be failing
//Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" );
@@ -209,31 +206,27 @@ public class CpRelationManager implements RelationManager {
this.applicationId = applicationId;
this.headEntity = headEntity;
this.managerCache = emf.getManagerCache();
- this.applicationScope = emf.getApplicationScope(applicationId);
+ this.applicationScope = emf.getApplicationScope( applicationId );
this.cass = em.getCass(); // TODO: eliminate need for this via Core Persistence
this.indexBucketLocator = indexBucketLocator; // TODO: this also
// load the Core Persistence version of the head entity as well
- this.headEntityScope = new CollectionScopeImpl(
- this.applicationScope.getApplication(),
- this.applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ));
+ this.headEntityScope =
+ new CollectionScopeImpl( this.applicationScope.getApplication(), this.applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager(headEntityScope);
+ EntityCollectionManager ecm = managerCache.getEntityCollectionManager( headEntityScope );
if ( logger.isDebugEnabled() ) {
- logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- headEntity.getType(),
- headEntity.getUuid(),
- headEntityScope.getApplication(),
- headEntityScope.getOwner(),
- headEntityScope.getName()
- } );
+ logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}", new Object[] {
+ headEntity.getType(), headEntity.getUuid(), headEntityScope.getApplication(),
+ headEntityScope.getOwner(), headEntityScope.getName()
+ } );
}
-
- this.cpHeadEntity = ecm.load( new SimpleId(
- headEntity.getUuid(), headEntity.getType() )).toBlocking().lastOrDefault(null);
+
+ //TODO PERFORMANCE why are we loading this again here?
+ this.cpHeadEntity = ecm.load( new SimpleId( headEntity.getUuid(), headEntity.getType() ) ).toBlocking()
+ .lastOrDefault( null );
// commented out because it is possible that CP entity has not been created yet
Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
@@ -243,22 +236,21 @@ public class CpRelationManager implements RelationManager {
return this;
}
-
+
@Override
public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
final Set<String> indexes = new HashSet<String>();
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
String edgeTypePrefix = CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
- logger.debug("getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}",
- new Object[] {
- edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid()
- });
+ logger.debug( "getCollectionIndexes(): Searching for edge type prefix {} to target {}:{}", new Object[] {
+ edgeTypePrefix, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid()
+ } );
- Observable<String> types= gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ));
+ Observable<String> types =
+ gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
Iterator<String> iter = types.toBlockingObservable().getIterator();
while ( iter.hasNext() ) {
@@ -275,8 +267,7 @@ public class CpRelationManager implements RelationManager {
//Map<EntityRef, Set<String>> containerEntities = getContainers(-1, "owns", null);
Map<EntityRef, Set<String>> containerEntities = getContainers();
- Map<String, Map<UUID, Set<String>>> owners =
- new LinkedHashMap<String, Map<UUID, Set<String>>>();
+ Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
for ( EntityRef owner : containerEntities.keySet() ) {
Set<String> collections = containerEntities.get( owner );
@@ -296,6 +287,7 @@ public class CpRelationManager implements RelationManager {
/**
* Gets containing collections and/or connections depending on the edge type you pass in
+ *
* @param limit Max number to return
* @param edgeType Edge type, edge type prefix or null to allow any edge type
* @param fromEntityType Only consider edges from entities of this type
@@ -304,44 +296,43 @@ public class CpRelationManager implements RelationManager {
Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
- Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
- cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
+ Iterator<String> edgeTypes =
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ).toBlocking()
+ .getIterator();
- logger.debug("getContainers(): "
- + "Searched for edges of type {}\n to target {}:{}\n in scope {}\n found: {}",
- new Object[] {
- edgeType,
- cpHeadEntity.getId().getType(),
- cpHeadEntity.getId().getUuid(),
- applicationScope.getApplication(),
- edgeTypes.hasNext()
- });
+ logger.debug(
+ "getContainers(): " + "Searched for edges of type {}\n to target {}:{}\n in scope {}\n found: {}",
+ new Object[] {
+ edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+ applicationScope.getApplication(), edgeTypes.hasNext()
+ } );
while ( edgeTypes.hasNext() ) {
String etype = edgeTypes.next();
- Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
- cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+ Observable<Edge> edges = gm.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
while ( iter.hasNext() ) {
Edge edge = iter.next();
- if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
- logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
+ if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+ logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
continue;
}
- EntityRef eref = new SimpleEntityRef(
- edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+ EntityRef eref = new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
String name = null;
- if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+ if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
name = CpNamingUtils.getConnectionType( edge.getType() );
- } else {
+ }
+ else {
name = CpNamingUtils.getCollectionName( edge.getType() );
}
addMapSet( results, eref, name );
@@ -356,183 +347,161 @@ public class CpRelationManager implements RelationManager {
}
- public List<String> updateContainingCollectionAndCollectionIndexes(
- Entity entity, org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
-
- List<String> results = new ArrayList<String>();
+ public void updateContainingCollectionAndCollectionIndexes(
+ final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) {
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- Iterator<String> edgeTypesToTarget = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
- cpHeadEntity.getId(), null, null) ).toBlockingObservable().getIterator();
+ final GraphManager gm = managerCache.getGraphManager( applicationScope );
- logger.debug("updateContainingCollectionsAndCollections(): "
- + "Searched for edges to target {}:{}\n in scope {}\n found: {}",
- new Object[] {
- cpHeadEntity.getId().getType(),
- cpHeadEntity.getId().getUuid(),
- applicationScope.getApplication(),
- edgeTypesToTarget.hasNext()
- });
+ logger.debug( "updateContainingCollectionsAndCollections(): "
+ + "Searched for edges to target {}:{}\n in scope {}\n found: {}", new Object[] {
+ cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+ applicationScope.getApplication()
+ } );
// loop through all types of edge to target
- int count = 0;
+
final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
final EntityIndexBatch entityIndexBatch = ei.createBatch();
- while ( edgeTypesToTarget.hasNext() ) {
-
- // get all edges of the type
- String etype = edgeTypesToTarget.next();
+ final int count = gm.getEdgeTypesToTarget(
+ new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) )
+ //for each edge type, emit all the edges of that type
+ .flatMap( new Func1<String, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final String etype ) {
+ return gm.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
+ }
+ } )
- Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
- cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+ //for each edge we receive index and add to the batch
+ .doOnNext( new Action1<Edge>() {
+ @Override
+ public void call( final Edge edge ) {
- // loop through edges of that type
- Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
- while ( iter.hasNext() ) {
- Edge edge = iter.next();
+ EntityRef sourceEntity =
+ new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
- EntityRef sourceEntity = new SimpleEntityRef(
- edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+ // reindex the entity in the source entity's collection or connection index
- // reindex the entity in the source entity's collection or connection index
+ IndexScope indexScope;
+ if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
- IndexScope indexScope;
- if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
+ String collName = CpNamingUtils.getCollectionName( edge.getType() );
+ indexScope =
+ new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+ }
+ else {
- String collName = CpNamingUtils.getCollectionName( edge.getType() );
- indexScope = new IndexScopeImpl(
- new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+ String connName = CpNamingUtils.getCollectionName( edge.getType() );
+ indexScope =
+ new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+ CpNamingUtils.getConnectionScopeName( cpEntity.getId().getType(),
+ connName ) );
+ }
- } else {
+ entityIndexBatch.index( indexScope, cpEntity );
- String connName = CpNamingUtils.getCollectionName( edge.getType() );
- indexScope = new IndexScopeImpl(
- new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
- CpNamingUtils.getConnectionScopeName( cpHeadEntity.getId().getType(), connName ));
- }
+ // reindex the entity in the source entity's all-types index
- entityIndexBatch.index(indexScope, cpEntity);
+ indexScope = new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+ CpNamingUtils.ALL_TYPES );
- // reindex the entity in the source entity's all-types index
-
- indexScope = new IndexScopeImpl(
- new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
- CpNamingUtils.ALL_TYPES);
-
- entityIndexBatch.index(indexScope, cpEntity);
+ entityIndexBatch.index( indexScope, cpEntity );
+ }
+ } ).count().toBlocking().lastOrDefault( 0 );
- count++;
- }
- }
entityIndexBatch.execute();
- logger.debug("updateContainingCollectionsAndCollections() updated {} indexes", count);
- return results;
+ logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
}
@Override
- public boolean isConnectionMember(String connectionType, EntityRef entity) throws Exception {
+ public boolean isConnectionMember( String connectionType, EntityRef entity ) throws Exception {
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType );
- logger.debug("isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}",
- new Object[] {
- edgeType,
- headEntity.getType(), headEntity.getUuid(),
- entity.getType(), entity.getUuid() });
-
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- Observable<Edge> edges = gm.loadEdgeVersions(
- new SimpleSearchByEdge(
- new SimpleId(headEntity.getUuid(), headEntity.getType()),
- edgeType,
- entityId,
- Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- null));
-
- return edges.toBlockingObservable().firstOrDefault(null) != null;
+ logger.debug( "isConnectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
+ edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
+ } );
+
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ Observable<Edge> edges = gm.loadEdgeVersions(
+ new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
+ Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
+
+ return edges.toBlockingObservable().firstOrDefault( null ) != null;
}
- @SuppressWarnings("unchecked")
- @Metered(group = "core", name = "RelationManager_isOwner")
+ @SuppressWarnings( "unchecked" )
+ @Metered( group = "core", name = "RelationManager_isOwner" )
@Override
- public boolean isCollectionMember(String collName, EntityRef entity) throws Exception {
+ public boolean isCollectionMember( String collName, EntityRef entity ) throws Exception {
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName );
- logger.debug("isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}",
- new Object[] {
- edgeType,
- headEntity.getType(), headEntity.getUuid(),
- entity.getType(), entity.getUuid() });
-
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- Observable<Edge> edges = gm.loadEdgeVersions(
- new SimpleSearchByEdge(
- new SimpleId(headEntity.getUuid(), headEntity.getType()),
- edgeType,
- entityId,
- Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- null));
-
- return edges.toBlockingObservable().firstOrDefault(null) != null;
+ logger.debug( "isCollectionMember(): Checking for edge type {} from {}:{} to {}:{}", new Object[] {
+ edgeType, headEntity.getType(), headEntity.getUuid(), entity.getType(), entity.getUuid()
+ } );
+
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ Observable<Edge> edges = gm.loadEdgeVersions(
+ new SimpleSearchByEdge( new SimpleId( headEntity.getUuid(), headEntity.getType() ), edgeType, entityId,
+ Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ) );
+
+ return edges.toBlockingObservable().firstOrDefault( null ) != null;
}
- private boolean moreThanOneInboundConnection(
- EntityRef target, String connectionType ) {
+ private boolean moreThanOneInboundConnection( EntityRef target, String connectionType ) {
Id targetId = new SimpleId( target.getUuid(), target.getType() );
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edgesToTarget = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
- targetId,
- CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
- System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,
- null)); // last
+ Observable<Edge> edgesToTarget = gm.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( targetId, CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
+ System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); // last
Iterator<Edge> iterator = edgesToTarget.toBlockingObservable().getIterator();
int count = 0;
while ( iterator.hasNext() ) {
iterator.next();
- if ( count++ > 1 ) {
+ if ( count++ > 1 ) {
return true;
}
- }
+ }
return false;
- }
+ }
- private boolean moreThanOneOutboundConnection(
- EntityRef source, String connectionType ) {
+
+ private boolean moreThanOneOutboundConnection( EntityRef source, String connectionType ) {
Id sourceId = new SimpleId( source.getUuid(), source.getType() );
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edgesFromSource = gm.loadEdgesFromSource(new SimpleSearchByEdgeType(
- sourceId,
- CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
- System.currentTimeMillis(),SearchByEdgeType.Order.DESCENDING,
- null)); // last
+ Observable<Edge> edgesFromSource = gm.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( sourceId, CpNamingUtils.getEdgeTypeFromConnectionType( connectionType ),
+ System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) ); // last
int count = edgesFromSource.take( 2 ).count().toBlocking().last();
return count > 1;
- }
+ }
@Override
@@ -540,10 +509,10 @@ public class CpRelationManager implements RelationManager {
final Set<String> indexes = new HashSet<String>();
- GraphManager gm = managerCache.getGraphManager(applicationScope);
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<String> str = gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( cpHeadEntity.getId(), null , null ));
+ Observable<String> str =
+ gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) );
Iterator<String> iter = str.toBlockingObservable().getIterator();
while ( iter.hasNext() ) {
@@ -552,52 +521,53 @@ public class CpRelationManager implements RelationManager {
}
return indexes;
-
}
+
@Override
- public Results getCollection(String collectionName, UUID startResult, int count,
- Level resultsLevel, boolean reversed) throws Exception {
+ public Results getCollection( String collectionName, UUID startResult, int count, Level resultsLevel,
+ boolean reversed ) throws Exception {
- Query query = Query.fromQL("select *");
- query.setLimit(count);
- query.setReversed(reversed);
+ Query query = Query.fromQL( "select *" );
+ query.setLimit( count );
+ query.setReversed( reversed );
if ( startResult != null ) {
- query.addGreaterThanEqualFilter("created", startResult.timestamp());
+ query.addGreaterThanEqualFilter( "created", startResult.timestamp() );
}
- return searchCollection(collectionName, query);
+ return searchCollection( collectionName, query );
}
+
@Override
- public Results getCollection(
- String collName, Query query, Level level) throws Exception {
+ public Results getCollection( String collName, Query query, Level level ) throws Exception {
- return searchCollection(collName, query);
+ return searchCollection( collName, query );
}
// add to a named collection of the head entity
@Override
- public Entity addToCollection(String collName, EntityRef itemRef) throws Exception {
-
+ public Entity addToCollection( String collName, EntityRef itemRef ) throws Exception {
+
CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
return null;
}
- return addToCollection( collName, itemRef,
- (collection != null && collection.getLinkedCollection() != null) );
+ return addToCollection( collName, itemRef, ( collection != null && collection.getLinkedCollection() != null ) );
}
- public Entity addToCollection(String collName, EntityRef itemRef, boolean connectBack ) throws Exception {
+
+ public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack ) throws Exception {
// don't fetch entity if we've already got one
final Entity itemEntity;
if ( itemRef instanceof Entity ) {
- itemEntity = (Entity)itemRef;
- } else {
+ itemEntity = ( Entity ) itemRef;
+ }
+ else {
itemEntity = em.get( itemRef );
}
@@ -611,70 +581,61 @@ public class CpRelationManager implements RelationManager {
}
// load the new member entity to be added to the collection from its default scope
- CollectionScope memberScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ));
+ CollectionScope memberScope =
+ new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
- EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
+ EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
//TODO, this double load should disappear once events are in
- org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
- new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlocking().last();
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+ memberMgr.load( new SimpleId( itemRef.getUuid(), itemRef.getType() ) ).toBlocking().last();
if ( memberEntity == null ) {
- throw new RuntimeException("Unable to load entity uuid="
- + itemRef.getUuid() + " type=" + itemRef.getType());
+ throw new RuntimeException(
+ "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
}
if ( logger.isDebugEnabled() ) {
- logger.debug("Loaded member entity {}:{} from scope\n app {}\n "
- + "owner {}\n name {} data {}",
- new Object[] {
- itemRef.getType(),
- itemRef.getUuid(),
- memberScope.getApplication(),
- memberScope.getOwner(),
- memberScope.getName(),
- CpEntityMapUtils.toMap( memberEntity )
- });
+ logger.debug( "Loaded member entity {}:{} from scope\n app {}\n " + "owner {}\n name {} data {}",
+ new Object[] {
+ itemRef.getType(), itemRef.getUuid(), memberScope.getApplication(), memberScope.getOwner(),
+ memberScope.getName(), CpEntityMapUtils.toMap( memberEntity )
+ } );
}
String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( collName );
- UUID timeStampUuid = memberEntity.getId().getUuid() != null
- && UUIDUtils.isTimeBased( memberEntity.getId().getUuid())
- ? memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
+ UUID timeStampUuid =
+ memberEntity.getId().getUuid() != null && UUIDUtils.isTimeBased( memberEntity.getId().getUuid() ) ?
+ memberEntity.getId().getUuid() : UUIDUtils.newTimeUUID();
- long uuidHash = UUIDUtils.getUUIDLong(timeStampUuid);
+ long uuidHash = UUIDUtils.getUUIDLong( timeStampUuid );
// create graph edge connection from head entity to member entity
- Edge edge = new SimpleEdge(
- cpHeadEntity.getId(),
- edgeType,
- memberEntity.getId(),
- uuidHash);
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- gm.writeEdge(edge).toBlockingObservable().last();
+ Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ gm.writeEdge( edge ).toBlockingObservable().last();
- logger.debug("Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
- edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
- memberEntity.getId().getType(), memberEntity.getId().getUuid(),
- applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()});
+ logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
+ edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(),
+ memberEntity.getId().getType(), memberEntity.getId().getUuid(),
+ applicationScope.getApplication().getType(), applicationScope.getApplication().getUuid()
+ } );
- ((CpEntityManager)em).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
+ ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
- logger.debug("Added entity {}:{} to collection {}", new Object[] {
- itemRef.getUuid().toString(), itemRef.getType(), collName });
+ logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+ itemRef.getUuid().toString(), itemRef.getType(), collName
+ } );
-// logger.debug("With head entity scope is {}:{}:{}", new Object[] {
-// headEntityScope.getApplication().toString(),
-// headEntityScope.getOwner().toString(),
-// headEntityScope.getName()});
+ // logger.debug("With head entity scope is {}:{}:{}", new Object[] {
+ // headEntityScope.getApplication().toString(),
+ // headEntityScope.getOwner().toString(),
+ // headEntityScope.getName()});
if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
- getRelationManager( itemEntity )
- .addToCollection( collection.getLinkedCollection(), headEntity, false );
+ getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false );
}
return itemEntity;
@@ -682,11 +643,11 @@ public class CpRelationManager implements RelationManager {
@Override
- public Entity addToCollections(List<EntityRef> owners, String collName) throws Exception {
+ public Entity addToCollections( List<EntityRef> owners, String collName ) throws Exception {
// TODO: this addToCollections() implementation seems wrong.
for ( EntityRef eref : owners ) {
- addToCollection( collName, eref );
+ addToCollection( collName, eref );
}
return null;
@@ -694,9 +655,9 @@ public class CpRelationManager implements RelationManager {
@Override
- @Metered(group = "core", name = "RelationManager_createItemInCollection")
- public Entity createItemInCollection(
- String collName, String itemType, Map<String, Object> properties) throws Exception {
+ @Metered( group = "core", name = "RelationManager_createItemInCollection" )
+ public Entity createItemInCollection( String collName, String itemType, Map<String, Object> properties )
+ throws Exception {
if ( headEntity.getUuid().equals( applicationId ) ) {
if ( itemType.equals( TYPE_ENTITY ) ) {
@@ -714,14 +675,13 @@ public class CpRelationManager implements RelationManager {
return em.create( itemType, properties );
}
- else if ( headEntity.getType().equals( Group.ENTITY_TYPE )
- && ( collName.equals( COLLECTION_ROLES ) ) ) {
+ else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collName.equals( COLLECTION_ROLES ) ) ) {
UUID groupId = headEntity.getUuid();
String roleName = ( String ) properties.get( PROPERTY_NAME );
return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
}
- CollectionInfo collection = getDefaultSchema().getCollection(headEntity.getType(),collName);
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
return null;
}
@@ -735,16 +695,16 @@ public class CpRelationManager implements RelationManager {
addToCollection( collName, itemEntity );
if ( collection != null && collection.getLinkedCollection() != null ) {
- getRelationManager( getHeadEntity() )
- .addToCollection( collection.getLinkedCollection(),itemEntity);
+ getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity );
}
}
- return itemEntity;
+ return itemEntity;
}
+
@Override
- public void removeFromCollection(String collName, EntityRef itemRef) throws Exception {
+ public void removeFromCollection( String collName, EntityRef itemRef ) throws Exception {
// special handling for roles collection of the application
if ( headEntity.getUuid().equals( applicationId ) ) {
@@ -760,102 +720,86 @@ public class CpRelationManager implements RelationManager {
}
em.delete( itemRef );
return;
- }
+ }
// load the entity to be removed to the collection
- CollectionScope memberScope = new CollectionScopeImpl(
- this.applicationScope.getApplication(),
- this.applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ));
- EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
+ CollectionScope memberScope =
+ new CollectionScopeImpl( this.applicationScope.getApplication(), this.applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+ EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
if ( logger.isDebugEnabled() ) {
- logger.debug("Loading entity to remove from collection "
- + "{}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- itemRef.getType(),
- itemRef.getUuid(),
- memberScope.getApplication(),
- memberScope.getOwner(),
- memberScope.getName()
- });
+ logger.debug( "Loading entity to remove from collection "
+ + "{}:{} from scope\n app {}\n owner {}\n name {}", new Object[] {
+ itemRef.getType(), itemRef.getUuid(), memberScope.getApplication(), memberScope.getOwner(),
+ memberScope.getName()
+ } );
}
- org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
- new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlockingObservable().last();
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+ memberMgr.load( new SimpleId( itemRef.getUuid(), itemRef.getType() ) ).toBlockingObservable().last();
- final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+ final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
final EntityIndexBatch batch = ei.createBatch();
// remove item from collection index
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
- batch.deindex(indexScope, memberEntity );
+ batch.deindex( indexScope, memberEntity );
// remove collection from item index
- IndexScope itemScope = new IndexScopeImpl(
- memberEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName(
- Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ));
+ IndexScope itemScope = new IndexScopeImpl( memberEntity.getId(), CpNamingUtils
+ .getCollectionScopeNameFromCollectionName(
+ Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
- batch.deindex(itemScope, cpHeadEntity );
+ batch.deindex( itemScope, cpHeadEntity );
batch.execute();
// remove edge from collection to item
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- Edge collectionToItemEdge = new SimpleEdge(
- cpHeadEntity.getId(),
- CpNamingUtils.getEdgeTypeFromCollectionName( collName),
- memberEntity.getId(),
- UUIDUtils.getUUIDLong(memberEntity.getId().getUuid())
- );
- gm.deleteEdge(collectionToItemEdge).toBlockingObservable().last();
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ Edge collectionToItemEdge =
+ new SimpleEdge( cpHeadEntity.getId(), CpNamingUtils.getEdgeTypeFromCollectionName( collName ),
+ memberEntity.getId(), UUIDUtils.getUUIDLong( memberEntity.getId().getUuid() ) );
+ gm.deleteEdge( collectionToItemEdge ).toBlockingObservable().last();
// remove edge from item to collection
- Edge itemToCollectionEdge = new SimpleEdge(
- memberEntity.getId(),
- CpNamingUtils
- .getEdgeTypeFromCollectionName( Schema.defaultCollectionName( cpHeadEntity.getId().getType() )),
- cpHeadEntity.getId(),
- UUIDUtils.getUUIDLong(cpHeadEntity.getId().getUuid()));
- gm.deleteEdge(itemToCollectionEdge).toBlockingObservable().last();
+ Edge itemToCollectionEdge = new SimpleEdge( memberEntity.getId(), CpNamingUtils
+ .getEdgeTypeFromCollectionName( Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ),
+ cpHeadEntity.getId(), UUIDUtils.getUUIDLong( cpHeadEntity.getId().getUuid() ) );
+ gm.deleteEdge( itemToCollectionEdge ).toBlockingObservable().last();
// special handling for roles collection of a group
if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
if ( collName.equals( COLLECTION_ROLES ) ) {
- String path = (String)( (Entity)itemRef ).getMetadata( "path" );
+ String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
if ( path.startsWith( "/roles/" ) ) {
- Entity itemEntity = em.get( new SimpleEntityRef(
- memberEntity.getId().getType(), memberEntity.getId().getUuid() ) );
+ Entity itemEntity = em.get( new SimpleEntityRef( memberEntity.getId().getType(),
+ memberEntity.getId().getUuid() ) );
RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
em.deleteRole( roleRef.getApplicationRoleName() );
}
-
- }
+ }
}
}
@Override
- public void copyRelationships(String srcRelationName, EntityRef dstEntityRef,
- String dstRelationName) throws Exception {
+ public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
+ throws Exception {
headEntity = em.validate( headEntity );
dstEntityRef = em.validate( dstEntityRef );
- CollectionInfo srcCollection =
- getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
+ CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
- CollectionInfo dstCollection =
- getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
+ CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
Results results = null;
do {
@@ -881,8 +825,9 @@ public class CpRelationManager implements RelationManager {
while ( ( results != null ) && ( results.hasMoreResults() ) );
}
+
@Override
- public Results searchCollection(String collName, Query query) throws Exception {
+ public Results searchCollection( String collName, Query query ) throws Exception {
if ( query == null ) {
query = new Query();
@@ -890,24 +835,22 @@ public class CpRelationManager implements RelationManager {
headEntity = em.validate( headEntity );
- CollectionInfo collection =
- getDefaultSchema().getCollection( headEntity.getType(), collName );
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collName );
if ( collection == null ) {
- throw new RuntimeException("Cannot find collection-info for '"+collName
- +"' of "+ headEntity.getType() +":"+headEntity.getUuid() );
+ throw new RuntimeException(
+ "Cannot find collection-info for '" + collName + "' of " + headEntity.getType() + ":" + headEntity
+ .getUuid() );
}
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
- EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-
- logger.debug("Searching scope {}:{}",
+ EntityIndex ei = managerCache.getEntityIndex( applicationScope );
- indexScope.getOwner().toString(),
- indexScope.getName() );
+ logger.debug( "Searching scope {}:{}",
+
+ indexScope.getOwner().toString(), indexScope.getName() );
query.setEntityType( collection.getType() );
query = adjustQuery( query );
@@ -929,22 +872,22 @@ public class CpRelationManager implements RelationManager {
CandidateResults crs = ei.search( indexScope, query );
if ( results == null ) {
- logger.debug("Calling build results 1");
+ logger.debug( "Calling build results 1" );
results = buildResults( query, crs, collName );
-
- } else {
- logger.debug("Calling build results 2");
+ }
+ else {
+ logger.debug( "Calling build results 2" );
Results newResults = buildResults( query, crs, collName );
results.merge( newResults );
}
if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
satisfied = true;
-
- } else if ( results.size() == originalLimit ) { // got what we need
+ }
+ else if ( results.size() == originalLimit ) { // got what we need
satisfied = true;
-
- } else if ( crs.hasCursor() ) {
+ }
+ else if ( crs.hasCursor() ) {
satisfied = false;
// need to query for more
@@ -952,10 +895,10 @@ public class CpRelationManager implements RelationManager {
query.setCursor( results.getCursor() );
query.setLimit( originalLimit - results.size() );
- logger.warn("Satisfy query limit {}, new limit {} query count {}", new Object[] {
- originalLimit, query.getLimit(), queryCount
- });
- }
+ logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+ originalLimit, query.getLimit(), queryCount
+ } );
+ }
}
return results;
@@ -963,78 +906,60 @@ public class CpRelationManager implements RelationManager {
@Override
- @Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
+ @Metered( group = "core", name = "RelationManager_createConnection_connection_ref" )
public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
-
+
return createConnection( connection.getConnectionType(), connection.getConnectedEntity() );
}
@Override
- @Metered(group = "core", name = "RelationManager_createConnection_connectionType")
- public ConnectionRef createConnection(
- String connectionType, EntityRef connectedEntityRef ) throws Exception {
+ @Metered( group = "core", name = "RelationManager_createConnection_connectionType" )
+ public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
headEntity = em.validate( headEntity );
connectedEntityRef = em.validate( connectedEntityRef );
- ConnectionRefImpl connection = new ConnectionRefImpl(
- headEntity, connectionType, connectedEntityRef );
+ ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
- CollectionScope targetScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ));
+ CollectionScope targetScope =
+ new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
- EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager(targetScope);
+ EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
if ( logger.isDebugEnabled() ) {
- logger.debug("createConnection(): "
- + "Indexing connection type '{}'\n from source {}:{}]\n"
- + " to target {}:{}\n from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- connectionType,
- headEntity.getType(),
- headEntity.getUuid(),
- connectedEntityRef.getType(),
- connectedEntityRef.getUuid(),
- targetScope.getApplication(),
- targetScope.getOwner(),
- targetScope.getName()
- });
- }
-
- org.apache.usergrid.persistence.model.entity.Entity targetEntity = targetEcm.load(
- new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ))
- .toBlockingObservable().last();
-
- String edgeType = CpNamingUtils
- .getEdgeTypeFromConnectionType( connectionType );
+ logger.debug( "createConnection(): " + "Indexing connection type '{}'\n from source {}:{}]\n"
+ + " to target {}:{}\n from scope\n app {}\n owner {}\n name {}", new Object[] {
+ connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
+ connectedEntityRef.getUuid(), targetScope.getApplication(), targetScope.getOwner(),
+ targetScope.getName()
+ } );
+ }
+
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+ targetEcm.load( new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ) )
+ .toBlockingObservable().last();
+
+ String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connectionType );
// create graph edge connection from head entity to member entity
- Edge edge = new SimpleEdge(
- cpHeadEntity.getId(),
- edgeType,
- targetEntity.getId(),
- System.currentTimeMillis() );
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- gm.writeEdge(edge).toBlockingObservable().last();
-
- EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+ Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, targetEntity.getId(), System.currentTimeMillis() );
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ gm.writeEdge( edge ).toBlockingObservable().last();
+
+ EntityIndex ei = managerCache.getEntityIndex( applicationScope );
EntityIndexBatch batch = ei.createBatch();
// Index the new connection in app|source|type context
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+ CpNamingUtils.getConnectionScopeName( connectedEntityRef.getType(), connectionType ) );
- batch.index(indexScope, targetEntity );
+ batch.index( indexScope, targetEntity );
// Index the new connection in app|scope|all-types context
- IndexScope allTypesIndexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.ALL_TYPES);
- batch.index(allTypesIndexScope, targetEntity );
+ IndexScope allTypesIndexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.ALL_TYPES );
+ batch.index( allTypesIndexScope, targetEntity );
batch.execute();
@@ -1047,16 +972,17 @@ public class CpRelationManager implements RelationManager {
return connection;
}
-
- @SuppressWarnings("unchecked")
- @Metered(group = "core", name = "CpRelationManager_batchUpdateEntityConnection")
- public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch,
- boolean disconnect, ConnectionRefImpl connection, UUID timestampUuid ) throws Exception {
+
+ @SuppressWarnings( "unchecked" )
+ @Metered( group = "core", name = "CpRelationManager_batchUpdateEntityConnection" )
+ public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, boolean disconnect,
+ ConnectionRefImpl connection, UUID timestampUuid )
+ throws Exception {
long timestamp = getTimestampInMicros( timestampUuid );
- Entity connectedEntity = em.get( new SimpleEntityRef(
- connection.getConnectedEntityType(), connection.getConnectedEntityId()) );
+ Entity connectedEntity =
+ em.get( new SimpleEntityRef( connection.getConnectedEntityType(), connection.getConnectedEntityId() ) );
if ( connectedEntity == null ) {
return batch;
@@ -1065,64 +991,60 @@ public class CpRelationManager implements RelationManager {
// Create connection for requested params
if ( disconnect ) {
-
+
addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
- key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
- connection.getConnectionType() ),
- asList( connection.getConnectedEntityId(),
- connection.getConnectedEntityType() ), timestamp );
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
- key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
- connection.getConnectionType() ),
- asList( connection.getConnectingEntityId(),
- connection.getConnectingEntityType() ), timestamp );
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
// delete the connection path if there will be no connections left
// check out outbound edges of the given type. If we have more than the 1 specified,
// we shouldn't delete the connection types from our outbound index
- if ( !moreThanOneOutboundConnection(
- connection.getConnectingEntity(), connection.getConnectionType() ) ) {
+ if ( !moreThanOneOutboundConnection( connection.getConnectingEntity(), connection.getConnectionType() ) ) {
addDeleteToMutator( batch, ENTITY_DICTIONARIES,
- key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
- connection.getConnectionType(), timestamp );
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+ connection.getConnectionType(), timestamp );
}
//check out inbound edges of the given type. If we have more than the 1 specified,
// we shouldn't delete the connection types from our inbound index
- if ( !moreThanOneInboundConnection(
- connection.getConnectingEntity(), connection.getConnectionType() ) ) {
+ if ( !moreThanOneInboundConnection( connection.getConnectingEntity(), connection.getConnectionType() ) ) {
addDeleteToMutator( batch, ENTITY_DICTIONARIES,
key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
connection.getConnectionType(), timestamp );
}
-
- } else {
+ }
+ else {
addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
- key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
- connection.getConnectionType() ),
- asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
- timestamp, timestamp );
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
+ timestamp );
addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
- key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
- connection.getConnectionType() ),
- asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ),
- timestamp, timestamp );
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
+ timestamp );
// Add connection type to connections set
addInsertToMutator( batch, ENTITY_DICTIONARIES,
- key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
- connection.getConnectionType(), null, timestamp );
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+ connection.getConnectionType(), null, timestamp );
// Add connection type to connections set
addInsertToMutator( batch, ENTITY_DICTIONARIES,
- key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
- connection.getConnectionType(), null, timestamp );
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
+ connection.getConnectionType(), null, timestamp );
}
// Add indexes for the connected entity's list properties
@@ -1137,15 +1059,15 @@ public class CpRelationManager implements RelationManager {
for ( String dictionaryName : dictionaryNames ) {
boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
- boolean dictionary_indexed = schema.isDictionaryIndexedInConnections(
- connectedEntity.getType(), dictionaryName );
+ boolean dictionary_indexed =
+ schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
if ( dictionary_indexed || !has_dictionary ) {
Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
for ( Object elementValue : elementValues ) {
IndexUpdate indexUpdate =
- batchStartIndexUpdate( batch, connectedEntity, dictionaryName,
- elementValue, timestampUuid, has_dictionary, true, disconnect, false );
+ batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
+ has_dictionary, true, disconnect, false );
batchUpdateConnectionIndex( indexUpdate, connection );
}
}
@@ -1156,127 +1078,118 @@ public class CpRelationManager implements RelationManager {
@Override
- @Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
- public ConnectionRef createConnection(
- String pairedConnectionType, EntityRef pairedEntity, String connectionType,
- EntityRef connectedEntityRef ) throws Exception {
-
- throw new UnsupportedOperationException("Paired connections not supported");
+ @Metered( group = "core", name = "RelationManager_createConnection_paired_connection_type" )
+ public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+ EntityRef connectedEntityRef ) throws Exception {
+
+ throw new UnsupportedOperationException( "Paired connections not supported" );
}
@Override
- @Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
+ @Metered( group = "core", name = "RelationManager_createConnection_connected_entity_ref" )
public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
- throw new UnsupportedOperationException("Paired connections not supported");
+ throw new UnsupportedOperationException( "Paired connections not supported" );
}
+
@Override
- public ConnectionRef connectionRef(
- String connectionType,
- EntityRef connectedEntityRef) throws Exception {
+ public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
- ConnectionRef connection = new ConnectionRefImpl(
- headEntity, connectionType, connectedEntityRef );
+ ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
return connection;
}
+
@Override
- public ConnectionRef connectionRef(String pairedConnectionType, EntityRef pairedEntity,
- String connectionType, EntityRef connectedEntityRef) throws Exception {
+ public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+ EntityRef connectedEntityRef ) throws Exception {
- throw new UnsupportedOperationException("Paired connections not supported");
+ throw new UnsupportedOperationException( "Paired connections not supported" );
}
+
@Override
- public ConnectionRef connectionRef(ConnectedEntityRef... connections) {
+ public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
- throw new UnsupportedOperationException("Paired connections not supported");
+ throw new UnsupportedOperationException( "Paired connections not supported" );
}
+
@Override
- public void deleteConnection(ConnectionRef connectionRef) throws Exception {
-
+ public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
+
// First, clean up the dictionary records of the connection
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = createMutator( ko, be );
- batchUpdateEntityConnection(
- m, true, (ConnectionRefImpl)connectionRef, UUIDGenerator.newTimeUUID() );
+ batchUpdateEntityConnection( m, true, ( ConnectionRefImpl ) connectionRef, UUIDGenerator.newTimeUUID() );
batchExecute( m, CassandraService.RETRY_COUNT );
EntityRef connectingEntityRef = connectionRef.getConnectingEntity(); // source
EntityRef connectedEntityRef = connectionRef.getConnectedEntity(); // target
String connectionType = connectionRef.getConnectedEntity().getConnectionType();
-
- CollectionScope targetScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ));
- EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager(targetScope);
+ CollectionScope targetScope =
+ new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
+
+ EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
if ( logger.isDebugEnabled() ) {
- logger.debug("Deleting connection '{}' from source {}:{} \n to target {}:{}",
- new Object[] {
- connectionType,
- connectingEntityRef.getType(),
- connectingEntityRef.getUuid(),
- connectedEntityRef.getType(),
- connectedEntityRef.getUuid()
- });
+ logger.debug( "Deleting connection '{}' from source {}:{} \n to target {}:{}", new Object[] {
+ connectionType, connectingEntityRef.getType(), connectingEntityRef.getUuid(),
+ connectedEntityRef.getType(), connectedEntityRef.getUuid()
+ } );
}
- org.apache.usergrid.persistence.model.entity.Entity targetEntity = targetEcm.load(
- new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ))
- .toBlockingObservable().last();
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+ targetEcm.load( new SimpleId( connectedEntityRef.getUuid(), connectedEntityRef.getType() ) )
+ .toBlockingObservable().last();
// Delete graph edge connection from head entity to member entity
- Edge edge = new SimpleEdge(
- new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
- connectionType,
- targetEntity.getId(),
- System.currentTimeMillis() );
- GraphManager gm = managerCache.getGraphManager(applicationScope);
- gm.deleteEdge(edge).toBlockingObservable().last();
-
- final EntityIndex ei = managerCache.getEntityIndex( applicationScope ) ;
+ Edge edge = new SimpleEdge( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+ connectionType, targetEntity.getId(), System.currentTimeMillis() );
+ GraphManager gm = managerCache.getGraphManager( applicationScope );
+ gm.deleteEdge( edge ).toBlockingObservable().last();
+
+ final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
final EntityIndexBatch batch = ei.createBatch();
// Deindex the connection in app|source|type context
- IndexScope indexScope = new IndexScopeImpl(
- new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
- CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connectionType ));
- batch.deindex( indexScope , targetEntity );
+ IndexScope indexScope =
+ new IndexScopeImpl( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+ CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connectionType ) );
+ batch.deindex( indexScope, targetEntity );
// Deindex the connection in app|scope|all-types context
- IndexScope allTypesIndexScope = new IndexScopeImpl(
- new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
- CpNamingUtils.ALL_TYPES);
+ IndexScope allTypesIndexScope =
+ new IndexScopeImpl( new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
+ CpNamingUtils.ALL_TYPES );
- batch.deindex( allTypesIndexScope, targetEntity );
+ batch.deindex( allTypesIndexScope, targetEntity );
batch.execute();
-
}
@Override
- public Set<String> getConnectionTypes(UUID connectedEntityId) throws Exception {
- throw new UnsupportedOperationException("Cannot specify entity by UUID alone.");
+ public Set<String> getConnectionTypes( UUID connectedEntityId ) throws Exception {
+ throw new UnsupportedOperationException( "Cannot specify entity by UUID alone." );
}
+
@Override
public Set<String> getConnectionTypes() throws Exception {
return getConnectionTypes( false );
}
+
@Override
- public Set<String> getConnectionTypes(boolean filterConnection) throws Exception {
- Set<String> connections = cast(
- em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
+ public Set<String> getConnectionTypes( boolean filterConnection ) throws Exception {
+ Set<String> connections = cast( em.getDictionaryAsSet( headEntity, Schema.DICTIONARY_CONNECTED_TYPES ) );
if ( connections == null ) {
return null;
@@ -1289,47 +1202,45 @@ public class CpRelationManager implements RelationManager {
@Override
- public Results getConnectedEntities(
- String connectionType, String connectedEntityType, Level level) throws Exception {
+ public Results getConnectedEntities( String connectionType, String connectedEntityType, Level level )
+ throws Exception {
Results raw = null;
Query query = new Query();
- query.setConnectionType(connectionType);
- query.setEntityType(connectedEntityType);
+ query.setConnectionType( connectionType );
+ query.setEntityType( connectedEntityType );
if ( connectionType == null ) {
raw = searchConnectedEntities( query );
-
- } else {
+ }
+ else {
headEntity = em.validate( headEntity );
String scopeName = null;
if ( connectedEntityType != null ) {
scopeName = CpNamingUtils.getConnectionScopeName( connectedEntityType, connectionType );
- } else {
+ }
+ else {
scopeName = CpNamingUtils.ALL_TYPES;
}
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- scopeName);
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), scopeName );
- final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+ final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
-
- logger.debug("Searching connected entities from scope {}:{}",
- indexScope.getOwner().toString(),
- indexScope.getName());
+
+ logger.debug( "Searching connected entities from scope {}:{}", indexScope.getOwner().toString(),
+ indexScope.getName() );
query = adjustQuery( query );
CandidateResults crs = ei.search( indexScope, query );
- raw = buildResults( query , crs, query.getConnectionType() );
+ raw = buildResults( query, crs, query.getConnectionType() );
}
- if ( Level.ALL_PROPERTIES.equals(level ) ) {
+ if ( Level.ALL_PROPERTIES.equals( level ) ) {
List<Entity> entities = new ArrayList<Entity>();
for ( EntityRef ref : raw.getEntities() ) {
Entity entity = em.get( ref );
@@ -1349,29 +1260,28 @@ public class CpRelationManager implements RelationManager {
@Override
- public Results getConnectingEntities(
- String connType, String fromEntityType, Level resultsLevel) throws Exception {
+ public Results getConnectingEntities( String connType, String fromEntityType, Level resultsLevel )
+ throws Exception {
return getConnectingEntities( connType, fromEntityType, resultsLevel, -1 );
}
+
@Override
- public Results getConnectingEntities(
- String connType, String fromEntityType, Level level, int count) throws Exception {
+ public Results getConnectingEntities( String connType, String fromEntityType, Level level, int count )
+ throws Exception {
// looking for edges to the head entity
- String edgeType =
- CpNamingUtils.getEdgeTypeFromConnectionType( connType );
+ String edgeType = CpNamingUtils.getEdgeTypeFromConnectionType( connType );
- Map<EntityRef, Set<String>> containers =
- getContainers( count, edgeType, fromEntityType );
+ Map<EntityRef, Set<String>> containers = getContainers( count, edgeType, fromEntityType );
- if ( Level.REFS.equals(level ) ) {
+ if ( Level.REFS.equals( level ) ) {
List<EntityRef> refList = new ArrayList<EntityRef>( containers.keySet() );
return Results.fromRefList( refList );
- }
+ }
- if ( Level.IDS.equals(level ) ) {
+ if ( Level.IDS.equals( level ) ) {
// TODO: someday this should return a list of Core Persistence Ids
List<UUID> idList = new ArrayList<UUID>();
for ( EntityRef ref : containers.keySet() ) {
@@ -1383,7 +1293,7 @@ public class CpRelationManager implements RelationManager {
List<Entity> entities = new ArrayList<Entity>();
for ( EntityRef ref : containers.keySet() ) {
Entity entity = em.get( ref );
- logger.debug(" Found connecting entity: " + entity.getProperties());
+ logger.debug( " Found connecting entity: " + entity.getProperties() );
entities.add( entity );
}
return Results.fromEntities( entities );
@@ -1402,35 +1312,30 @@ public class CpRelationManager implements RelationManager {
if ( query.getEntityType() == null ) {
// search across all types of collections of the head-entity
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.ALL_TYPES);
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.ALL_TYPES );
+
+ EntityIndex ei = managerCache.getEntityIndex( applicationScope );
- EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-
- logger.debug("Searching connections from the all-types scope {}:{}",
- indexScope.getOwner().toString(),
- indexScope.getName());
+ logger.debug( "Searching connections from the all-types scope {}:{}", indexScope.getOwner().toString(),
+ indexScope.getName() );
query = adjustQuery( query );
- CandidateResults crs = ei.search(indexScope, query );
+ CandidateResults crs = ei.search( indexScope, query );
- return buildConnectionResults(query , crs, query.getConnectionType() );
+ return buildConnectionResults( query, crs, query.getConnectionType() );
}
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getConnectionScopeName( query.getEntityType(), query.getConnectionType() ));
- EntityIndex ei = managerCache.getEntityIndex(applicationScope);
-
- logger.debug("Searching connections from the scope {}:{}",
- indexScope.getOwner().toString(),
- indexScope.getName());
+ IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
+ CpNamingUtils.getConnectionScopeName( query.getEntityType(), query.getConnectionType() ) );
+ EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+
+ logger.debug( "Searching connections from the scope {}:{}", indexScope.getOwner().toString(),
+ indexScope.getName() );
query = adjustQuery( query );
CandidateResults crs = ei.search( indexScope, query );
- return buildConnectionResults(query , crs, query.getConnectionType() );
+ return buildConnectionResults( query, crs, query.getConnectionType() );
}
@@ -1448,47 +1353,44 @@ public class CpRelationManager implements RelationManager {
// This is ugly to put here, but required.
if ( query.getEntityType().equals( User.ENTITY_TYPE ) && ident.isEmail() ) {
- Query newQuery = Query.fromQL(
- "select * where email='" + query.getSingleNameOrEmailIdentifier()+ "'");
+ Query newQuery =
+ Query.fromQL( "select * where email='" + query.getSingleNameOrEmailIdentifier() + "'" );
query.setRootOperand( newQuery.getRootOperand() );
}
// use the ident with the default alias. could be an email
else {
- Query newQuery = Query.fromQL(
- "select * where name='" + query.getSingleNameOrEmailIdentifier()+ "'");
+ Query newQuery =
+ Query.fromQL( "select * where name='" + query.getSingleNameOrEmailIdentifier() + "'" );
query.setRootOperand( newQuery.getRootOperand() );
}
+ }
+ else if ( query.containsSingleUuidIdentifier() ) {
- } else if ( query.containsSingleUuidIdentifier() ) {
-
- Query newQuery = Query.fromQL(
- "select * where uuid='" + query.getSingleUuidIdentifier() + "'");
+ Query newQuery = Query.fromQL( "select * where uuid='" + query.getSingleUuidIdentifier() + "'" );
query.setRootOperand( newQuery.getRootOperand() );
}
}
if ( query.isReversed() ) {
- Query.SortPredicate desc = new Query.SortPredicate(
- PROPERTY_CREATED, Query.SortDirection.DESCENDING );
+
<TRUNCATED>