You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/01/30 16:21:08 UTC
[32/34] update to master
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9fec2baa/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index b110178,0000000..8c19989
mode 100644,000000..100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@@ -1,2337 -1,0 +1,2327 @@@
+/*******************************************************************************
+ * Copyright 2012 Apigee Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.usergrid.persistence.cassandra;
+
+
+import java.nio.ByteBuffer;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.CollectionRef;
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.IndexBucketLocator;
+import org.apache.usergrid.persistence.PagingResultsIterator;
+import org.apache.usergrid.persistence.Query;
+import org.apache.usergrid.persistence.RelationManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.RoleRef;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.SimpleCollectionRef;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.SimpleRoleRef;
+import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
+import org.apache.usergrid.persistence.Results.Level;
+import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
+import org.apache.usergrid.persistence.cassandra.index.ConnectedIndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
+import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
+import org.apache.usergrid.persistence.entities.Group;
+import org.apache.usergrid.persistence.geo.CollectionGeoSearch;
+import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
+import org.apache.usergrid.persistence.geo.EntityLocationRef;
+import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.query.ir.AllNode;
+import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
+import org.apache.usergrid.persistence.query.ir.QueryNode;
+import org.apache.usergrid.persistence.query.ir.QuerySlice;
+import org.apache.usergrid.persistence.query.ir.SearchVisitor;
+import org.apache.usergrid.persistence.query.ir.WithinNode;
+import org.apache.usergrid.persistence.query.ir.result.CollectionResultsLoaderFactory;
+import org.apache.usergrid.persistence.query.ir.result.ConnectionIndexSliceParser;
+import org.apache.usergrid.persistence.query.ir.result.ConnectionResultsLoaderFactory;
+import org.apache.usergrid.persistence.query.ir.result.ConnectionTypesIterator;
+import org.apache.usergrid.persistence.query.ir.result.EmptyIterator;
+import org.apache.usergrid.persistence.query.ir.result.GeoIterator;
+import org.apache.usergrid.persistence.query.ir.result.SliceIterator;
+import org.apache.usergrid.persistence.query.ir.result.StaticIdIterator;
+import org.apache.usergrid.persistence.query.ir.result.UUIDIndexSliceParser;
+import org.apache.usergrid.persistence.schema.CollectionInfo;
+import org.apache.usergrid.utils.IndexUtils;
+import org.apache.usergrid.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.mutation.Mutator;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static java.util.Arrays.asList;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_ENTITIES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTING_TYPES;
+import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_COLLECTION_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_ITEM_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TITLE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.Schema.TYPE_MEMBER;
+import static org.apache.usergrid.persistence.Schema.TYPE_ROLE;
+import static org.apache.usergrid.persistence.Schema.defaultCollectionName;
+import static org.apache.usergrid.persistence.Schema.getDefaultSchema;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_ID_SETS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX_ENTRIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addDeleteToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.addInsertToMutator;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.batchExecute;
+import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.key;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.INDEX_ENTRY_LIST_COUNT;
+import static org.apache.usergrid.persistence.cassandra.ConnectionRefImpl.CONNECTION_ENTITY_CONNECTION_TYPE;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchDeleteLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchRemoveLocationFromCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInCollectionIndex;
+import static org.apache.usergrid.persistence.cassandra.GeoIndexManager.batchStoreLocationInConnectionsIndex;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.indexValueCode;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.toIndexableValue;
+import static org.apache.usergrid.persistence.cassandra.IndexUpdate.validIndexableValue;
+import static org.apache.usergrid.utils.ClassUtils.cast;
+import static org.apache.usergrid.utils.CompositeUtils.setGreaterThanEqualityFlag;
+import static org.apache.usergrid.utils.ConversionUtils.string;
+import static org.apache.usergrid.utils.InflectionUtils.singularize;
+import static org.apache.usergrid.utils.MapUtils.addMapSet;
+import static org.apache.usergrid.utils.UUIDUtils.getTimestampInMicros;
+import static org.apache.usergrid.utils.UUIDUtils.newTimeUUID;
+
+
+public class RelationManagerImpl implements RelationManager {
+
+ private static final Logger logger = LoggerFactory.getLogger( RelationManagerImpl.class );
+
+ private EntityManagerImpl em;
+ private CassandraService cass;
+ private UUID applicationId;
+ private EntityRef headEntity;
+ private IndexBucketLocator indexBucketLocator;
+
+ public static final StringSerializer se = new StringSerializer();
+ public static final ByteBufferSerializer be = new ByteBufferSerializer();
+ public static final UUIDSerializer ue = new UUIDSerializer();
+
+
+ public RelationManagerImpl() {
+ }
+
+
+ public RelationManagerImpl init( EntityManagerImpl em, CassandraService cass, UUID applicationId,
+ EntityRef headEntity, IndexBucketLocator indexBucketLocator ) {
+
+ Assert.notNull( em, "Entity manager cannot be null" );
+ Assert.notNull( cass, "Cassandra service cannot be null" );
+ Assert.notNull( applicationId, "Application Id cannot be null" );
+ Assert.notNull( headEntity, "Head entity cannot be null" );
+ Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" );
+ Assert.notNull( indexBucketLocator, "Index bucket locator cannot be null" );
+
+ this.em = em;
+ this.applicationId = applicationId;
+ this.cass = cass;
+ this.headEntity = headEntity;
+ this.indexBucketLocator = indexBucketLocator;
+
+ return this;
+ }
+
+
+ private RelationManagerImpl getRelationManager( EntityRef headEntity ) {
+ RelationManagerImpl rmi = new RelationManagerImpl();
+ rmi.init( em, cass, applicationId, headEntity, indexBucketLocator );
+ return rmi;
+ }
+
+
+ /** side effect: converts headEntity into an Entity if it is an EntityRef! */
+ private Entity getHeadEntity() throws Exception {
+ Entity entity = null;
+ if ( headEntity instanceof Entity ) {
+ entity = ( Entity ) headEntity;
+ }
+ else {
+ entity = em.get( headEntity );
+ headEntity = entity;
+ }
+ return entity;
+ }
+
+
+ /**
+ * Batch update collection index.
+ *
+ * @param indexUpdate The update to apply
+ * @param owner The entity that is the owner context of this entity update. Can either be an application, or
+ * another entity
+ * @param collectionName the collection name
+ *
+ * @return The indexUpdate with batch mutations
+ *
+ * @throws Exception the exception
+ */
+ @Metered(group = "core", name = "RelationManager_batchUpdateCollectionIndex")
+ public IndexUpdate batchUpdateCollectionIndex( IndexUpdate indexUpdate, EntityRef owner, String collectionName )
+ throws Exception {
+
+ logger.debug( "batchUpdateCollectionIndex" );
+
+ Entity indexedEntity = indexUpdate.getEntity();
+
+ String bucketId = indexBucketLocator
+ .getBucket( applicationId, IndexType.COLLECTION, indexedEntity.getUuid(), indexedEntity.getType(),
+ indexUpdate.getEntryName() );
+
+ // the root name without the bucket
+ // entity_id,collection_name,prop_name,
+ Object index_name = null;
+ // entity_id,collection_name,prop_name, bucketId
+ Object index_key = null;
+
+ // entity_id,collection_name,collected_entity_id,prop_name
+
+ for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
+
+ if ( entry.getValue() != null ) {
+
+ index_name = key( owner.getUuid(), collectionName, entry.getPath() );
+
+ index_key = key( index_name, bucketId );
+
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, entry.getIndexComposite(),
+ indexUpdate.getTimestamp() );
+
+ if ( "location.coordinates".equals( entry.getPath() ) ) {
+ EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
+ entry.getValue().toString() );
+ batchRemoveLocationFromCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+ index_name, loc );
+ }
+ }
+ else {
+ logger.error( "Unexpected condition - deserialized property value is null" );
+ }
+ }
+
+ if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
+ && !indexUpdate.isRemoveListEntry() ) ) ) {
+
+ for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
+
+ // byte valueCode = indexEntry.getValueCode();
+
+ index_name = key( owner.getUuid(), collectionName, indexEntry.getPath() );
+
+ index_key = key( index_name, bucketId );
+
+ // int i = 0;
+
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, index_key, indexEntry.getIndexComposite(),
+ null, indexUpdate.getTimestamp() );
+
+ if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
+ EntityLocationRef loc =
+ new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
+ indexEntry.getValue().toString() );
+ batchStoreLocationInCollectionIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+ index_name, indexedEntity.getUuid(), loc );
+ }
+
+ // i++;
+ }
+ }
+
+ for ( String index : indexUpdate.getIndexesSet() ) {
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
+ key( owner.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), index, null,
+ indexUpdate.getTimestamp() );
+ }
+
+ return indexUpdate;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_getCollectionIndexes")
+ public Set<String> getCollectionIndexes( String collectionName ) throws Exception {
+
+ // TODO TN, read all buckets here
+ List<HColumn<String, String>> results =
+ cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
+ key( headEntity.getUuid(), collectionName, Schema.DICTIONARY_INDEXES ), se, se );
+ Set<String> indexes = new TreeSet<String>();
+ if ( results != null ) {
+ for ( HColumn<String, String> column : results ) {
+ String propertyName = column.getName();
+ if ( !propertyName.endsWith( ".keywords" ) ) {
+ indexes.add( column.getName() );
+ }
+ }
+ }
+ return indexes;
+ }
+
+
+ public Map<EntityRef, Set<String>> getContainingCollections() throws Exception {
+ Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
+
+ Keyspace ko = cass.getApplicationKeyspace( applicationId );
+
+ // TODO TN get all buckets here
+
+ List<HColumn<DynamicComposite, ByteBuffer>> containers = cass.getAllColumns( ko, ENTITY_COMPOSITE_DICTIONARIES,
+ key( headEntity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), EntityManagerFactoryImpl.dce, be );
+ if ( containers != null ) {
+ for ( HColumn<DynamicComposite, ByteBuffer> container : containers ) {
+ DynamicComposite composite = container.getName();
+ if ( composite != null ) {
+ String ownerType = ( String ) composite.get( 0 );
+ String collectionName = ( String ) composite.get( 1 );
+ UUID ownerId = ( UUID ) composite.get( 2 );
+ addMapSet( results, new SimpleEntityRef( ownerType, ownerId ), collectionName );
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( " {} ( {} ) is in collection {} ( {} ).", new Object[] {
+ headEntity.getType(), headEntity.getUuid(), ownerType, collectionName, ownerId
+ } );
+ }
+ }
+ }
+ }
+ EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
+ if ( !results.containsKey( applicationRef ) ) {
+ addMapSet( results, applicationRef, defaultCollectionName( headEntity.getType() ) );
+ }
+ return results;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public void batchCreateCollectionMembership( Mutator<ByteBuffer> batch, EntityRef ownerRef, String collectionName,
+ EntityRef itemRef, EntityRef membershipRef, UUID timestampUuid )
+ throws Exception {
+
+ long timestamp = getTimestampInMicros( timestampUuid );
+
+ if ( membershipRef == null ) {
+ membershipRef = new SimpleCollectionRef( ownerRef, collectionName, itemRef );
+ }
+
+ Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
+ properties.put( PROPERTY_TYPE, membershipRef.getType() );
+ properties.put( PROPERTY_COLLECTION_NAME, collectionName );
+ properties.put( PROPERTY_ITEM, itemRef.getUuid() );
+ properties.put( PROPERTY_ITEM_TYPE, itemRef.getType() );
+
+ em.batchCreate( batch, membershipRef.getType(), null, properties, membershipRef.getUuid(), timestampUuid );
+
+ addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( membershipRef.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
+ asList( ownerRef.getType(), collectionName, ownerRef.getUuid() ), membershipRef.getUuid(), timestamp );
+ }
+
+
+ /**
+ * Batch add to collection.
+ *
+ * @param batch the batch
+ * @param collectionName the collection name
+ * @param entity The entity to add to the batch
+ * @param timestampUuid The timestamp of this update in a time uuid
+ *
+ * @return batch
+ *
+ * @throws Exception the exception
+ */
+ public Mutator<ByteBuffer> batchAddToCollection( Mutator<ByteBuffer> batch, String collectionName, Entity entity,
+ UUID timestampUuid ) throws Exception {
+ List<UUID> ids = new ArrayList<UUID>( 1 );
+ ids.add( headEntity.getUuid() );
+ return batchAddToCollections( batch, headEntity.getType(), ids, collectionName, entity, timestampUuid );
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Metered(group = "core", name = "RelationManager_batchAddToCollections")
+ public Mutator<ByteBuffer> batchAddToCollections( Mutator<ByteBuffer> batch, String ownerType, List<UUID> ownerIds,
+ String collectionName, Entity entity, UUID timestampUuid )
+ throws Exception {
+
+ long timestamp = getTimestampInMicros( timestampUuid );
+
+ if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
+ logger.error( "Cant add an extended type to any collection", new Throwable() );
+ return batch;
+ }
+
+ Map<UUID, CollectionRef> membershipRefs = new LinkedHashMap<UUID, CollectionRef>();
+
+ for ( UUID ownerId : ownerIds ) {
+
+ CollectionRef membershipRef =
+ new SimpleCollectionRef( new SimpleEntityRef( ownerType, ownerId ), collectionName, entity );
+
+ membershipRefs.put( ownerId, membershipRef );
+
+ // get the bucket this entityId needs to be inserted into
+ String bucketId = indexBucketLocator
+ .getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName );
+
+ Object collections_key = key( ownerId, Schema.DICTIONARY_COLLECTIONS, collectionName, bucketId );
+
+ // Insert in main collection
+
+ addInsertToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), membershipRef.getUuid(),
+ timestamp );
+
+ addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
+ asList( ownerType, collectionName, ownerId ), membershipRef.getUuid(), timestamp );
+ }
+
+
+ Schema schema = getDefaultSchema();
+
+ // Add property indexes
+ for ( String propertyName : entity.getProperties().keySet() ) {
+ boolean indexed_property = schema.isPropertyIndexed( entity.getType(), propertyName );
+ if ( indexed_property ) {
+ boolean collection_indexes_property =
+ schema.isPropertyIndexedInCollection( ownerType, collectionName, propertyName );
+ boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
+ boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
+ if ( collection_indexes_property || !item_schema_has_property ) {
+ Object propertyValue = entity.getProperty( propertyName );
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
+ item_schema_has_property, false, false, fulltext_indexed, true );
+ for ( UUID ownerId : ownerIds ) {
+ EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
+ batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
+ }
+ }
+ }
+ }
+
+ // Add set property indexes
+
+ Set<String> dictionaryNames = em.getDictionaryNames( entity );
+
+ for ( String dictionaryName : dictionaryNames ) {
+ boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
+ boolean dictionary_indexed =
+ schema.isDictionaryIndexedInCollection( ownerType, collectionName, dictionaryName );
+
+ if ( dictionary_indexed || !has_dictionary ) {
+ Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
+ for ( Object elementValue : elementValues ) {
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
+ has_dictionary, true, false, false, true );
+ for ( UUID ownerId : ownerIds ) {
+ EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
+ batchUpdateCollectionIndex( indexUpdate, owner, collectionName );
+ }
+ }
+ }
+ }
+
+ for ( UUID ownerId : ownerIds ) {
+ EntityRef owner = new SimpleEntityRef( ownerType, ownerId );
+ batchCreateCollectionMembership( batch, owner, collectionName, entity, membershipRefs.get( ownerId ),
+ timestampUuid );
+ }
+
+ return batch;
+ }
+
+
+ /**
+ * Batch remove from collection.
+ * <p/>
+ * * Batch add to collection.
+ *
+ * @param batch the batch
+ * @param collectionName the collection name
+ * @param entity The entity to add to the batch
+ * @param timestampUuid The timestamp of this update in a time uuid
+ *
+ * @return The mutation with the delete operations added
+ *
+ * @throws Exception the exception
+ */
+ public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
+ Entity entity, UUID timestampUuid ) throws Exception {
+ return this.batchRemoveFromCollection( batch, collectionName, entity, false, timestampUuid );
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Metered(group = "core", name = "RelationManager_batchRemoveFromCollection")
+ public Mutator<ByteBuffer> batchRemoveFromCollection( Mutator<ByteBuffer> batch, String collectionName,
+ Entity entity, boolean force, UUID timestampUuid )
+ throws Exception {
+
+ long timestamp = getTimestampInMicros( timestampUuid );
+
+ if ( !force && headEntity.getUuid().equals( applicationId ) ) {
+ // Can't remove entities from root collections
+ return batch;
+ }
+
+ Object collections_key = key( headEntity.getUuid(), Schema.DICTIONARY_COLLECTIONS, collectionName,
+ indexBucketLocator.getBucket( applicationId, IndexType.COLLECTION, entity.getUuid(), collectionName ) );
+
+ // Remove property indexes
+
+ Schema schema = getDefaultSchema();
+ for ( String propertyName : entity.getProperties().keySet() ) {
+ boolean collection_indexes_property =
+ schema.isPropertyIndexedInCollection( headEntity.getType(), collectionName, propertyName );
+ boolean item_schema_has_property = schema.hasProperty( entity.getType(), propertyName );
+ boolean fulltext_indexed = schema.isPropertyFulltextIndexed( entity.getType(), propertyName );
+ if ( collection_indexes_property || !item_schema_has_property ) {
+ IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, null, timestampUuid,
+ item_schema_has_property, false, false, fulltext_indexed );
+ batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
+ }
+ }
+
+ // Remove set indexes
+
+ Set<String> dictionaryNames = em.getDictionaryNames( entity );
+
+ for ( String dictionaryName : dictionaryNames ) {
+ boolean has_dictionary = schema.hasDictionary( entity.getType(), dictionaryName );
+ boolean dictionary_indexed =
+ schema.isDictionaryIndexedInCollection( headEntity.getType(), collectionName, dictionaryName );
+
+ if ( dictionary_indexed || !has_dictionary ) {
+ Set<Object> elementValues = em.getDictionaryAsSet( entity, dictionaryName );
+ for ( Object elementValue : elementValues ) {
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, entity, dictionaryName, elementValue, timestampUuid,
+ has_dictionary, true, true, false );
+ batchUpdateCollectionIndex( indexUpdate, headEntity, collectionName );
+ }
+ }
+ }
+
+ // Delete actual property
+
+ addDeleteToMutator( batch, ENTITY_ID_SETS, collections_key, entity.getUuid(), timestamp );
+
+ addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ),
+ asList( headEntity.getType(), collectionName, headEntity.getUuid() ), timestamp );
+
+ if ( !headEntity.getType().equalsIgnoreCase( TYPE_APPLICATION ) && !Schema
+ .isAssociatedEntityType( entity.getType() ) ) {
+ em.deleteEntity( new SimpleCollectionRef( headEntity, collectionName, entity ).getUuid() );
+ }
+
+ return batch;
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchDeleteConnectionIndexEntries")
+ public Mutator<ByteBuffer> batchDeleteConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
+ ConnectionRefImpl connection, UUID[] index_keys )
+ throws Exception {
+
+ // entity_id,prop_name
+ Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
+ entry.getPath() ) );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
+ connection.getConnectedEntityType() ), indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
+ indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
+ indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ addDeleteToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId() ), indexUpdate.getTimestamp() );
+
+ return indexUpdate.getBatch();
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchAddConnectionIndexEntries")
+ public Mutator<ByteBuffer> batchAddConnectionIndexEntries( IndexUpdate indexUpdate, IndexEntry entry,
+ ConnectionRefImpl connection, UUID[] index_keys ) {
+
+ // entity_id,prop_name
+ Object property_index_key = key( index_keys[ConnectionRefImpl.ALL], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION, index_keys[ConnectionRefImpl.ALL],
+ entry.getPath() ) );
+
+ // entity_id,entity_type,prop_name
+ Object entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_ENTITY_TYPE], entry.getPath() ) );
+
+ // entity_id,connection_type,prop_name
+ Object connection_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_TYPE], entry.getPath() ) );
+
+ // entity_id,connection_type,entity_type,prop_name
+ Object connection_type_and_entity_type_prop_index_key =
+ key( index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], INDEX_CONNECTIONS, entry.getPath(),
+ indexBucketLocator.getBucket( applicationId, IndexType.CONNECTION,
+ index_keys[ConnectionRefImpl.BY_CONNECTION_AND_ENTITY_TYPE], entry.getPath() ) );
+
+ // composite(property_value,connected_entity_id,connection_type,entity_type,entry_timestamp)
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, property_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType(),
+ connection.getConnectedEntityType() ), connection.getUuid(), indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,connection_type,entry_timestamp)
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, entity_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectionType() ),
+ connection.getUuid(), indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entity_type,entry_timestamp)
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId(), connection.getConnectedEntityType() ),
+ connection.getUuid(), indexUpdate.getTimestamp() );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_INDEX, connection_type_and_entity_type_prop_index_key,
+ entry.getIndexComposite( connection.getConnectedEntityId() ), connection.getUuid(),
+ indexUpdate.getTimestamp() );
+
+ return indexUpdate.getBatch();
+ }
+
+
+ /**
+ * Batch update connection index.
+ *
+ * @param indexUpdate The update operation to perform
+ * @param connection The connection to update
+ *
+ * @return The index with the batch mutation udpated
+ *
+ * @throws Exception the exception
+ */
+ @Metered(group = "core", name = "RelationManager_batchUpdateConnectionIndex")
+ public IndexUpdate batchUpdateConnectionIndex( IndexUpdate indexUpdate, ConnectionRefImpl connection )
+ throws Exception {
+
+ // UUID connection_id = connection.getUuid();
+
+ UUID[] index_keys = connection.getIndexIds();
+
+ // Delete all matching entries from entry list
+ for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
+
+ if ( entry.getValue() != null ) {
+
+ batchDeleteConnectionIndexEntries( indexUpdate, entry, connection, index_keys );
+
+ if ( "location.coordinates".equals( entry.getPath() ) ) {
+ EntityLocationRef loc = new EntityLocationRef( indexUpdate.getEntity(), entry.getTimestampUuid(),
+ entry.getValue().toString() );
+ batchDeleteLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+ index_keys, entry.getPath(), loc );
+ }
+ }
+ else {
+ logger.error( "Unexpected condition - deserialized property value is null" );
+ }
+ }
+
+ if ( ( indexUpdate.getNewEntries().size() > 0 ) && ( !indexUpdate.isMultiValue() || ( indexUpdate.isMultiValue()
+ && !indexUpdate.isRemoveListEntry() ) ) ) {
+
+ for ( IndexEntry indexEntry : indexUpdate.getNewEntries() ) {
+
+ batchAddConnectionIndexEntries( indexUpdate, indexEntry, connection, index_keys );
+
+ if ( "location.coordinates".equals( indexEntry.getPath() ) ) {
+ EntityLocationRef loc =
+ new EntityLocationRef( indexUpdate.getEntity(), indexEntry.getTimestampUuid(),
+ indexEntry.getValue().toString() );
+ batchStoreLocationInConnectionsIndex( indexUpdate.getBatch(), indexBucketLocator, applicationId,
+ index_keys, indexEntry.getPath(), loc );
+ }
+ }
+
+ /*
+ * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
+ * Schema.INDEXES_SET), indexEntry.getKey(), null, false, timestamp); }
+ *
+ * addInsertToMutator(batch, EntityCF.SETS, key(connection_id,
+ * Schema.INDEXES_SET), entryName, null, false, timestamp);
+ */
+ }
+
+ for ( String index : indexUpdate.getIndexesSet() ) {
+ addInsertToMutator( indexUpdate.getBatch(), ENTITY_DICTIONARIES,
+ key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), index, null,
+ indexUpdate.getTimestamp() );
+ }
+
+ return indexUpdate;
+ }
+
+
+ public Set<String> getConnectionIndexes( ConnectionRefImpl connection ) throws Exception {
+ List<HColumn<String, String>> results =
+ cass.getAllColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_DICTIONARIES,
+ key( connection.getConnectingIndexId(), Schema.DICTIONARY_INDEXES ), se, se );
+ Set<String> indexes = new TreeSet<String>();
+ if ( results != null ) {
+ for ( HColumn<String, String> column : results ) {
+ String propertyName = column.getName();
+ if ( !propertyName.endsWith( ".keywords" ) ) {
+ indexes.add( column.getName() );
+ }
+ }
+ }
+ return indexes;
+ }
+
+
+ /**
+ * Batch update backward connections property indexes.
+ *
+ * @param indexUpdate The update to run for incoming connections
+ *
+ * @return The index update to run
+ *
+ * @throws Exception the exception
+ */
+ @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsPropertyIndexes")
+ public IndexUpdate batchUpdateBackwardConnectionsPropertyIndexes( IndexUpdate indexUpdate ) throws Exception {
+
+ logger.debug( "batchUpdateBackwordConnectionsPropertyIndexes" );
+
+ boolean entitySchemaHasProperty = indexUpdate.isSchemaHasProperty();
+
+ if ( entitySchemaHasProperty ) {
+ if ( !getDefaultSchema()
+ .isPropertyIndexed( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() ) ) {
+ return indexUpdate;
+ }
+ }
+
+
+ return doBackwardConnectionsUpdate( indexUpdate );
+ }
+
+
+ /**
+ * Search each reverse connection type in the graph for connections. If one is found, update the index
+ * appropriately
+ *
+ * @param indexUpdate The index update to use
+ *
+ * @return The updated index update
+ */
+ private IndexUpdate doBackwardConnectionsUpdate( IndexUpdate indexUpdate ) throws Exception {
+ final Entity targetEntity = indexUpdate.getEntity();
+
+ final ConnectionTypesIterator connectionTypes =
+ new ConnectionTypesIterator( cass, applicationId, targetEntity.getUuid(), false, 100 );
+
+ for ( String connectionType : connectionTypes ) {
+
+ PagingResultsIterator itr = getReversedConnectionsIterator( targetEntity, connectionType );
+
+ for ( Object connection : itr ) {
+
+ final ConnectedEntityRef sourceEntity = ( ConnectedEntityRef ) connection;
+
+ //we need to create a connection ref from the source entity (found via reverse edge) to the entity
+ // we're about to update. This is the index that needs updated
+ final ConnectionRefImpl connectionRef =
+ new ConnectionRefImpl( sourceEntity, connectionType, indexUpdate.getEntity() );
+
+ batchUpdateConnectionIndex( indexUpdate, connectionRef );
+ }
+ }
+
+ return indexUpdate;
+ }
+
+
+ /**
+ * Get a paging results iterator. Should return an iterator for all results
+ *
+ * @param targetEntity The target entity search connections from
+ *
+ * @return connectionType The name of the edges to search
+ */
+ private PagingResultsIterator getReversedConnectionsIterator( EntityRef targetEntity, String connectionType )
+ throws Exception {
+ return new PagingResultsIterator( getConnectingEntities( targetEntity, connectionType, null, Level.REFS ) );
+ }
+
+
+ /**
+ * Batch update backward connections set indexes.
+ *
+ * @param indexUpdate The index to update in the dictionary
+ *
+ * @return The index update
+ *
+ * @throws Exception the exception
+ */
+ @Metered(group = "core", name = "RelationManager_batchUpdateBackwardConnectionsDictionaryIndexes")
+ public IndexUpdate batchUpdateBackwardConnectionsDictionaryIndexes( IndexUpdate indexUpdate ) throws Exception {
+
+ logger.debug( "batchUpdateBackwardConnectionsListIndexes" );
+
+ boolean entityHasDictionary = getDefaultSchema()
+ .isDictionaryIndexedInConnections( indexUpdate.getEntity().getType(), indexUpdate.getEntryName() );
+
+ if ( !entityHasDictionary ) {
+ return indexUpdate;
+ }
+
+
+ return doBackwardConnectionsUpdate( indexUpdate );
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Metered(group = "core", name = "RelationManager_batchUpdateEntityConnection")
+ public Mutator<ByteBuffer> batchUpdateEntityConnection( Mutator<ByteBuffer> batch, boolean disconnect,
+ ConnectionRefImpl connection, UUID timestampUuid )
+ throws Exception {
+
+ long timestamp = getTimestampInMicros( timestampUuid );
+
+ Entity connectedEntity = em.get( connection.getConnectedEntityId() );
+
+ if ( connectedEntity == null ) {
+ return batch;
+ }
+
+ // Create connection for requested params
+
+
+ if ( disconnect ) {
+ addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp );
+
+ addDeleteToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp );
+
+ // delete the connection path if there will be no connections left
+
+ boolean delete = true;
+
+ //check out outbound edges of the given type. If we have more than the 1 specified,
+ // we shouldn't delete the connection types from our outbound index
+ PagingResultsIterator itr = new PagingResultsIterator(
+ getConnectedEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
+ Level.REFS ) );
+
+ ConnectedEntityRef c;
+
+ while ( itr.hasNext() ) {
+ c = ( ConnectedEntityRef ) itr.next();
+
+ if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
+ delete = false;
+ break;
+ }
+
+
+ // c = (ConnectionRef) itr.next();
+ // if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
+ // .getConnectionType()) &&!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity()
+ // .getUuid())) {
+ // delete = false;
+ // break;
+ // }
+
+ }
+ // for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectingEntityId())) {
+ // if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
+ // .getConnectionType())) {
+ // if (!c.getConnectedEntity().getUuid().equals(connection.getConnectedEntity().getUuid())) {
+ // delete = false;
+ // break;
+ // }
+ // }
+ // }
+ if ( delete ) {
+ addDeleteToMutator( batch, ENTITY_DICTIONARIES,
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+ connection.getConnectionType(), timestamp );
+ }
+
+ // delete the connection path if there will be no connections left
+ delete = true;
+
+
+ //check out inbound edges of the given type. If we have more than the 1 specified,
+ // we shouldn't delete the connection types from our outbound index
+ itr = new PagingResultsIterator(
+ getConnectingEntities( connection.getConnectingEntity(), connection.getConnectionType(), null,
+ Level.REFS ) );
+
+ while ( itr.hasNext() ) {
+ c = ( ConnectedEntityRef ) itr.next();
+
+ if ( !connection.getConnectedEntityId().equals( c.getUuid() ) ) {
+ delete = false;
+ break;
+ }
+ // if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
+ // .getConnectionType()) && !c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity
+ // ().getUuid())) {
+ // delete = false;
+ // break;
+ // }
+
+ }
+
+ // for (ConnectionRefImpl c : getConnectionsWithEntity(connection.getConnectedEntityId())) {
+ // if (c.getConnectedEntity().getConnectionType().equals(connection.getConnectedEntity()
+ // .getConnectionType())) {
+ // if (!c.getConnectingEntity().getUuid().equals(connection.getConnectingEntity().getUuid())) {
+ // delete = false;
+ // break;
+ // }
+ // }
+ // }
+ if ( delete ) {
+ addDeleteToMutator( batch, ENTITY_DICTIONARIES,
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
+ connection.getConnectionType(), timestamp );
+ }
+ }
+ else {
+ addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectedEntityId(), connection.getConnectedEntityType() ), timestamp,
+ timestamp );
+
+ addInsertToMutator( batch, ENTITY_COMPOSITE_DICTIONARIES,
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+ connection.getConnectionType() ),
+ asList( connection.getConnectingEntityId(), connection.getConnectingEntityType() ), timestamp,
+ timestamp );
+
+ // Add connection type to connections set
+ addInsertToMutator( batch, ENTITY_DICTIONARIES,
+ key( connection.getConnectingEntityId(), DICTIONARY_CONNECTED_TYPES ),
+ connection.getConnectionType(), null, timestamp );
+
+ // Add connection type to connections set
+ addInsertToMutator( batch, ENTITY_DICTIONARIES,
+ key( connection.getConnectedEntityId(), DICTIONARY_CONNECTING_TYPES ),
+ connection.getConnectionType(), null, timestamp );
+ }
+
+ // Add property indexes
+
+ // Iterate though all the properties of the connected entity
+
+ Schema schema = getDefaultSchema();
+ for ( String propertyName : connectedEntity.getProperties().keySet() ) {
+ Object propertyValue = connectedEntity.getProperties().get( propertyName );
+
+ boolean indexed = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
+
+ boolean connection_indexes_property = schema.isPropertyIndexed( connectedEntity.getType(), propertyName );
+ boolean item_schema_has_property = schema.hasProperty( connectedEntity.getType(), propertyName );
+ boolean fulltext_indexed = schema.isPropertyFulltextIndexed( connectedEntity.getType(), propertyName );
+ // For each property, if the schema says it's indexed, update its
+ // index
+
+ if ( indexed && ( connection_indexes_property || !item_schema_has_property ) ) {
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, connectedEntity, propertyName, disconnect ? null : propertyValue,
+ timestampUuid, item_schema_has_property, false, false, fulltext_indexed );
+ batchUpdateConnectionIndex( indexUpdate, connection );
+ }
+ }
+
+ // Add indexes for the connected entity's list properties
+
+ // Get the names of the list properties in the connected entity
+ Set<String> dictionaryNames = em.getDictionaryNames( connectedEntity );
+
+ // For each list property, get the values in the list and
+ // update the index with those values
+
+ for ( String dictionaryName : dictionaryNames ) {
+ boolean has_dictionary = schema.hasDictionary( connectedEntity.getType(), dictionaryName );
+ boolean dictionary_indexed =
+ schema.isDictionaryIndexedInConnections( connectedEntity.getType(), dictionaryName );
+
+ if ( dictionary_indexed || !has_dictionary ) {
+ Set<Object> elementValues = em.getDictionaryAsSet( connectedEntity, dictionaryName );
+ for ( Object elementValue : elementValues ) {
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, connectedEntity, dictionaryName, elementValue, timestampUuid,
+ has_dictionary, true, disconnect, false );
+ batchUpdateConnectionIndex( indexUpdate, connection );
+ }
+ }
+ }
+
+ return batch;
+ }
+
+
+ public void updateEntityConnection( boolean disconnect, ConnectionRefImpl connection ) throws Exception {
+
+ UUID timestampUuid = newTimeUUID();
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+
+ // Make or break the connection
+
+ batchUpdateEntityConnection( batch, disconnect, connection, timestampUuid );
+
+ // Make or break a connection from the connecting entity
+ // to the connection itself
+
+ ConnectionRefImpl loopback = connection.getConnectionToConnectionEntity();
+ if ( !disconnect ) {
+ em.insertEntity( CONNECTION_ENTITY_CONNECTION_TYPE, loopback.getConnectedEntityId() );
+ }
+
+ batchUpdateEntityConnection( batch, disconnect, loopback, timestampUuid );
+
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchDisconnect")
+ public void batchDisconnect( Mutator<ByteBuffer> batch, UUID timestampUuid ) throws Exception {
+
+
+ PagingResultsIterator itr =
+ new PagingResultsIterator( getConnectingEntities( headEntity, null, null, Level.REFS ) );
+
+ ConnectionRefImpl connection;
+
+ while ( itr.hasNext() ) {
+ connection = ( ConnectionRefImpl ) itr.next();
+
+ batchUpdateEntityConnection( batch, true, connection, timestampUuid );
+ }
+ //
+ // List<ConnectionRefImpl> connections = getConnectionsWithEntity(headEntity.getUuid());
+ // if (connections != null) {
+ // for (ConnectionRefImpl connection : connections) {
+ // batchUpdateEntityConnection(batch, true, connection, timestampUuid);
+ // }
+ // }
+ }
+
+
+ public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
+ Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
+ boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed )
+ throws Exception {
+ return batchStartIndexUpdate( batch, entity, entryName, entryValue, timestampUuid, schemaHasProperty,
+ isMultiValue, removeListEntry, fulltextIndexed, false );
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchStartIndexUpdate")
+ public IndexUpdate batchStartIndexUpdate( Mutator<ByteBuffer> batch, Entity entity, String entryName,
+ Object entryValue, UUID timestampUuid, boolean schemaHasProperty,
+ boolean isMultiValue, boolean removeListEntry, boolean fulltextIndexed,
+ boolean skipRead ) throws Exception {
+
+ long timestamp = getTimestampInMicros( timestampUuid );
+
+ IndexUpdate indexUpdate =
+ new IndexUpdate( batch, entity, entryName, entryValue, schemaHasProperty, isMultiValue, removeListEntry,
+ timestampUuid );
+
+ // entryName = entryName.toLowerCase();
+
+ // entity_id,connection_type,connected_entity_id,prop_name
+
+ if ( !skipRead ) {
+
+ List<HColumn<ByteBuffer, ByteBuffer>> entries = null;
+
+ if ( isMultiValue && validIndexableValue( entryValue ) ) {
+ entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
+ entity.getUuid(),
+ new DynamicComposite( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ) ),
+ setGreaterThanEqualityFlag( new DynamicComposite( entryName, indexValueCode( entryValue ),
+ toIndexableValue( entryValue ) ) ), INDEX_ENTRY_LIST_COUNT, false );
+ }
+ else {
+ entries = cass.getColumns( cass.getApplicationKeyspace( applicationId ), ENTITY_INDEX_ENTRIES,
+ entity.getUuid(), new DynamicComposite( entryName ),
+ setGreaterThanEqualityFlag( new DynamicComposite( entryName ) ), INDEX_ENTRY_LIST_COUNT,
+ false );
+ }
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Found {} previous index entries for {} of entity {}", new Object[] {
+ entries.size(), entryName, entity.getUuid()
+ } );
+ }
+
+ // Delete all matching entries from entry list
+ for ( HColumn<ByteBuffer, ByteBuffer> entry : entries ) {
+ UUID prev_timestamp = null;
+ Object prev_value = null;
+ String prev_obj_path = null;
+
+ // new format:
+ // composite(entryName,
+ // value_code,prev_value,prev_timestamp,prev_obj_path) = null
+ DynamicComposite composite = DynamicComposite.fromByteBuffer( entry.getName().duplicate() );
+ prev_value = composite.get( 2 );
+ prev_timestamp = ( UUID ) composite.get( 3 );
+ if ( composite.size() > 4 ) {
+ prev_obj_path = ( String ) composite.get( 4 );
+ }
+
+ if ( prev_value != null ) {
+
+ String entryPath = entryName;
+ if ( ( prev_obj_path != null ) && ( prev_obj_path.length() > 0 ) ) {
+ entryPath = entryName + "." + prev_obj_path;
+ }
+
+ indexUpdate.addPrevEntry( entryPath, prev_value, prev_timestamp, entry.getName().duplicate() );
+
+ // composite(property_value,connected_entity_id,entry_timestamp)
+ // addDeleteToMutator(batch, ENTITY_INDEX_ENTRIES,
+ // entity.getUuid(), entry.getName(), timestamp);
+
+ }
+ else {
+ logger.error( "Unexpected condition - deserialized property value is null" );
+ }
+ }
+ }
+
+ if ( !isMultiValue || ( isMultiValue && !removeListEntry ) ) {
+
+ List<Map.Entry<String, Object>> list = IndexUtils.getKeyValueList( entryName, entryValue, fulltextIndexed );
+
+ if ( entryName.equalsIgnoreCase( "location" ) && ( entryValue instanceof Map ) ) {
+ @SuppressWarnings("rawtypes") double latitude =
+ MapUtils.getDoubleValue( ( Map ) entryValue, "latitude" );
+ @SuppressWarnings("rawtypes") double longitude =
+ MapUtils.getDoubleValue( ( Map ) entryValue, "longitude" );
+ list.add( new AbstractMap.SimpleEntry<String, Object>( "location.coordinates",
+ latitude + "," + longitude ) );
+ }
+
+ for ( Map.Entry<String, Object> indexEntry : list ) {
+
+ if ( validIndexableValue( indexEntry.getValue() ) ) {
+ indexUpdate.addNewEntry( indexEntry.getKey(), toIndexableValue( indexEntry.getValue() ) );
+ }
+ }
+
+ if ( isMultiValue ) {
+ addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
+ asList( entryName, indexValueCode( entryValue ), toIndexableValue( entryValue ),
+ indexUpdate.getTimestampUuid() ), null, timestamp );
+ }
+ else {
+ // int i = 0;
+
+ for ( Map.Entry<String, Object> indexEntry : list ) {
+
+ String name = indexEntry.getKey();
+ if ( name.startsWith( entryName + "." ) ) {
+ name = name.substring( entryName.length() + 1 );
+ }
+ else if ( name.startsWith( entryName ) ) {
+ name = name.substring( entryName.length() );
+ }
+
+ byte code = indexValueCode( indexEntry.getValue() );
+ Object val = toIndexableValue( indexEntry.getValue() );
+ addInsertToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(),
+ asList( entryName, code, val, indexUpdate.getTimestampUuid(), name ), null, timestamp );
+
+ indexUpdate.addIndex( indexEntry.getKey() );
+ }
+ }
+
+ indexUpdate.addIndex( entryName );
+ }
+
+ return indexUpdate;
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchUpdatePropertyIndexes")
+ public void batchUpdatePropertyIndexes( Mutator<ByteBuffer> batch, String propertyName, Object propertyValue,
+ boolean entitySchemaHasProperty, boolean noRead, UUID timestampUuid )
+ throws Exception {
+
+ Entity entity = getHeadEntity();
+
+ UUID associatedId = null;
+ String associatedType = null;
+
+ if ( Schema.isAssociatedEntityType( entity.getType() ) ) {
+ Object item = entity.getProperty( PROPERTY_ITEM );
+ if ( ( item instanceof UUID ) && ( entity.getProperty( PROPERTY_COLLECTION_NAME ) instanceof String ) ) {
+ associatedId = ( UUID ) item;
+ associatedType = string( entity.getProperty( PROPERTY_ITEM_TYPE ) );
+ String entryName = TYPE_MEMBER + "." + propertyName;
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Extended property {} ( {} ).{} indexed as {} ({})." + entryName, new Object[] {
+ entity.getType(), entity.getUuid(), propertyName, associatedType, associatedId
+ } );
+ }
+ propertyName = entryName;
+ }
+ }
+
+ IndexUpdate indexUpdate = batchStartIndexUpdate( batch, entity, propertyName, propertyValue, timestampUuid,
+ entitySchemaHasProperty, false, false,
+ getDefaultSchema().isPropertyFulltextIndexed( entity.getType(), propertyName ), noRead );
+
+ // Update collections
+
+ String effectiveType = entity.getType();
+ if ( associatedType != null ) {
+ indexUpdate.setAssociatedId( associatedId );
+ effectiveType = associatedType;
+ }
+
+ Map<String, Set<CollectionInfo>> containers = getDefaultSchema().getContainers( effectiveType );
+ if ( containers != null ) {
+
+ Map<EntityRef, Set<String>> containerEntities = null;
+ if ( noRead ) {
+ containerEntities = new LinkedHashMap<EntityRef, Set<String>>();
+ EntityRef applicationRef = new SimpleEntityRef( TYPE_APPLICATION, applicationId );
+ addMapSet( containerEntities, applicationRef, defaultCollectionName( entity.getType() ) );
+ }
+ else {
+ containerEntities = getContainingCollections();
+ }
+
+ for ( EntityRef containerEntity : containerEntities.keySet() ) {
+ if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
+ .isAssociatedEntityType( entity.getType() ) ) {
+ logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
+ continue;
+ }
+ Set<String> collectionNames = containerEntities.get( containerEntity );
+ Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
+
+ if ( collections != null ) {
+ for ( CollectionInfo collection : collections ) {
+ if ( collectionNames.contains( collection.getName() ) ) {
+ batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
+ }
+ }
+ }
+ }
+ }
+
+ if ( !noRead ) {
+ batchUpdateBackwardConnectionsPropertyIndexes( indexUpdate );
+ }
+
+ /**
+ * We've updated the properties, add the deletes to the ledger
+ *
+ */
+
+ for ( IndexEntry entry : indexUpdate.getPrevEntries() ) {
+ addDeleteToMutator( batch, ENTITY_INDEX_ENTRIES, entity.getUuid(), entry.getLedgerColumn(),
+ indexUpdate.getTimestamp() );
+ }
+ }
+
+
+ public void batchUpdateSetIndexes( Mutator<ByteBuffer> batch, String setName, Object elementValue,
+ boolean removeFromSet, UUID timestampUuid ) throws Exception {
+
+ Entity entity = getHeadEntity();
+
+ elementValue = getDefaultSchema().validateEntitySetValue( entity.getType(), setName, elementValue );
+
+ IndexUpdate indexUpdate =
+ batchStartIndexUpdate( batch, entity, setName, elementValue, timestampUuid, true, true, removeFromSet,
+ false );
+
+ // Update collections
+ Map<String, Set<CollectionInfo>> containers =
+ getDefaultSchema().getContainersIndexingDictionary( entity.getType(), setName );
+
+ if ( containers != null ) {
+ Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
+ for ( EntityRef containerEntity : containerEntities.keySet() ) {
+ if ( containerEntity.getType().equals( TYPE_APPLICATION ) && Schema
+ .isAssociatedEntityType( entity.getType() ) ) {
+ logger.debug( "Extended properties for {} not indexed by application", entity.getType() );
+ continue;
+ }
+ Set<String> collectionNames = containerEntities.get( containerEntity );
+ Set<CollectionInfo> collections = containers.get( containerEntity.getType() );
+
+ if ( collections != null ) {
+
+ for ( CollectionInfo collection : collections ) {
+ if ( collectionNames.contains( collection.getName() ) ) {
+
+ batchUpdateCollectionIndex( indexUpdate, containerEntity, collection.getName() );
+ }
+ }
+ }
+ }
+ }
+
+ batchUpdateBackwardConnectionsDictionaryIndexes( indexUpdate );
+ }
+
+
+ private IndexScanner searchIndex( Object indexKey, QuerySlice slice, int pageSize ) throws Exception {
+
+ DynamicComposite[] range = slice.getRange();
+
+ Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
+ IndexScanner scanner =
+ new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.CONNECTION,
- keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.getPropertyName() );
++ keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), slice.getPropertyName() );
+
+ return scanner;
+ }
+
+
+ /**
+ * Search the collection index using all the buckets for the given collection
+ *
+ * @param indexKey The index key to read
+ * @param slice Slice set in the query
+ * @param collectionName The name of the collection to search
+ * @param pageSize The page size to load when iterating
+ */
+ private IndexScanner searchIndexBuckets( Object indexKey, QuerySlice slice, String collectionName, int pageSize )
+ throws Exception {
+
+ DynamicComposite[] range = slice.getRange();
+
+ Object keyPrefix = key( indexKey, slice.getPropertyName() );
+
- // we have a cursor, so the first record should be discarded
- if ( slice.hasCursor() ) {
- pageSize++;
- }
-
+ IndexScanner scanner =
+ new IndexBucketScanner( cass, indexBucketLocator, ENTITY_INDEX, applicationId, IndexType.COLLECTION,
- keyPrefix, range[0], range[1], slice.isReversed(), pageSize, collectionName );
++ keyPrefix, range[0], range[1], slice.isReversed(), pageSize, slice.hasCursor(), collectionName );
+
+ return scanner;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ @Metered(group = "core", name = "RelationManager_isOwner")
+ public boolean isCollectionMember( String collectionName, EntityRef entity ) throws Exception {
+
+ Keyspace ko = cass.getApplicationKeyspace( applicationId );
+
+ ByteBuffer col = DynamicComposite
+ .toByteBuffer( asList( this.headEntity.getType(), collectionName, headEntity.getUuid() ) );
+
+ HColumn<ByteBuffer, ByteBuffer> result = cass.getColumn( ko, ENTITY_COMPOSITE_DICTIONARIES,
+ key( entity.getUuid(), Schema.DICTIONARY_CONTAINER_ENTITIES ), col, be, be );
+
+ return result != null;
+ }
+
+
+ /** @param connectionName The name of hte connection */
+ public boolean isConnectionMember( String connectionName, EntityRef entity ) throws Exception {
+ Keyspace ko = cass.getApplicationKeyspace( applicationId );
+
+ Object key = key( this.headEntity.getUuid(), DICTIONARY_CONNECTED_ENTITIES, connectionName );
+
+ DynamicComposite start = new DynamicComposite( entity.getUuid() );
+
+ List<HColumn<ByteBuffer, ByteBuffer>> cols =
+ cass.getColumns( ko, ENTITY_COMPOSITE_DICTIONARIES, key, start, null, 1, false );
+
+ if ( cols == null || cols.size() == 0 ) {
+ return false;
+ }
+
+ UUID returnedUUID = ( UUID ) DynamicComposite.fromByteBuffer( cols.get( 0 ).getName() ).get( 0 );
+
+ return entity.getUuid().equals( returnedUUID );
+
+
+ // addDeleteToMutator(batch, ENTITY_COMPOSITE_DICTIONARIES,
+ // key(connection.getConnectedEntityId(), DICTIONARY_CONNECTING_ENTITIES,
+ // connection.getConnectionType()),
+ // asList(connection.getConnectingEntityId(), connection.getConnectingEntityType()), timestamp);
+ //
+ //
+ // ConnectionRefImpl ref = new ConnectionRefImpl(this.headEntity, connectionName, entity);
+ //
+ //
+ //
+ //
+ //
+ //
+ // HColumn<String, UUID> col = cass.getColumn(ko, ENTITY_CONNECTIONS, ref.getUuid(),
+ // ConnectionRefImpl.CONNECTED_ENTITY_ID, se, ue);
+ //
+ //
+ // getConnectedEntities(this.headEntity, connectionName, )
+ //
+ // return col != null && entity.getUuid().equals(col.getValue());
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_getOwners")
+ public Map<String, Map<UUID, Set<String>>> getOwners() throws Exception {
+ Map<EntityRef, Set<String>> containerEntities = getContainingCollections();
+ Map<String, Map<UUID, Set<String>>> owners = new LinkedHashMap<String, Map<UUID, Set<String>>>();
+
+ for ( EntityRef owner : containerEntities.keySet() ) {
+ Set<String> collections = containerEntities.get( owner );
+ for ( String collection : collections ) {
+ MapUtils.addMapMapSet( owners, owner.getType(), owner.getUuid(), collection );
+ }
+ }
+
+ return owners;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_getCollections")
+ public Set<String> getCollections() throws Exception {
+
+ Map<String, CollectionInfo> collections = getDefaultSchema().getCollections( headEntity.getType() );
+ if ( collections == null ) {
+ return null;
+ }
+
+ return collections.keySet();
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_getCollection_start_result")
+ public Results getCollection( String collectionName, UUID startResult, int count, Results.Level resultsLevel,
+ boolean reversed ) throws Exception {
+ // changed intentionally to delegate to search so that behavior is
+ // consistent across all index access.
+
+ // TODO T.N fix cursor parsing here so startResult can be used in this
+ // context. Needs a bit of refactor
+ // for accommodating cursor I/O USERGRID-1750. A bit hacky, but until a
+ // furthur refactor this works.
+
+ Query query = new Query().withResultsLevel( resultsLevel ).withReversed( reversed ).withLimit( count )
+ .withStartResult( startResult );
+
+ return searchCollection( collectionName, query );
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_getCollecitonForQuery")
+ public Results getCollection( String collectionName, Query query, Results.Level resultsLevel ) throws Exception {
+
+ // changed intentionally to delegate to search so that behavior is
+ // consistent across all index access.
+
+ return searchCollection( collectionName, query );
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_addToCollection")
+ public Entity addToCollection( String collectionName, EntityRef itemRef ) throws Exception {
+
+ Entity itemEntity = em.get( itemRef );
+
+ if ( itemEntity == null ) {
+ return null;
+ }
+
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
+ if ( ( collection != null ) && !collection.getType().equals( itemRef.getType() ) ) {
+ return null;
+ }
+
+ UUID timestampUuid = newTimeUUID();
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+
+ batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
+
+ if ( collection.getLinkedCollection() != null ) {
+ getRelationManager( itemEntity )
+ .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(), timestampUuid );
+ }
+
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+
+ return itemEntity;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_addToCollections")
+ public Entity addToCollections( List<EntityRef> owners, String collectionName ) throws Exception {
+
+ Entity itemEntity = getHeadEntity();
+
+ Map<String, List<UUID>> collectionsByType = new LinkedHashMap<String, List<UUID>>();
+ for ( EntityRef owner : owners ) {
+ MapUtils.addMapList( collectionsByType, owner.getType(), owner.getUuid() );
+ }
+
+ UUID timestampUuid = newTimeUUID();
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+
+ Schema schema = getDefaultSchema();
+ for ( Entry<String, List<UUID>> entry : collectionsByType.entrySet() ) {
+ CollectionInfo collection = schema.getCollection( entry.getKey(), collectionName );
+ if ( ( collection != null ) && !collection.getType().equals( headEntity.getType() ) ) {
+ continue;
+ }
+ batchAddToCollections( batch, entry.getKey(), entry.getValue(), collectionName, itemEntity, timestampUuid );
+
+ if ( collection.getLinkedCollection() != null ) {
+ logger.error(
+ "Bulk add to collections used on a linked collection, linked connection will not be updated" );
+ }
+ }
+
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+
+ return null;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_createItemInCollection")
+ public Entity createItemInCollection( String collectionName, String itemType, Map<String, Object> properties )
+ throws Exception {
+
+ if ( headEntity.getUuid().equals( applicationId ) ) {
+ if ( itemType.equals( TYPE_ENTITY ) ) {
+ itemType = singularize( collectionName );
+ }
+ if ( itemType.equals( TYPE_ROLE ) ) {
+ Long inactivity = ( Long ) properties.get( PROPERTY_INACTIVITY );
+ if ( inactivity == null ) {
+ inactivity = 0L;
+ }
+ return em.createRole( ( String ) properties.get( PROPERTY_NAME ),
+ ( String ) properties.get( PROPERTY_TITLE ), inactivity );
+ }
+ return em.create( itemType, properties );
+ }
+ else if ( headEntity.getType().equals( Group.ENTITY_TYPE ) && ( collectionName.equals( COLLECTION_ROLES ) ) ) {
+ UUID groupId = headEntity.getUuid();
+ String roleName = ( String ) properties.get( PROPERTY_NAME );
+ return em.createGroupRole( groupId, roleName, ( Long ) properties.get( PROPERTY_INACTIVITY ) );
+ }
+
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
+ if ( ( collection != null ) && !collection.getType().equals( itemType ) ) {
+ return null;
+ }
+
+ properties = getDefaultSchema().cleanUpdatedProperties( itemType, properties, true );
+
+ Entity itemEntity = em.create( itemType, properties );
+
+ if ( itemEntity != null ) {
+ UUID timestampUuid = newTimeUUID();
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+
+ batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
+
+ if ( collection.getLinkedCollection() != null ) {
+ getRelationManager( itemEntity )
+ .batchAddToCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
+ timestampUuid );
+ }
+
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+ }
+
+ return itemEntity;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_removeFromCollection")
+ public void removeFromCollection( String collectionName, EntityRef itemRef ) throws Exception {
+
+ if ( headEntity.getUuid().equals( applicationId ) ) {
+ if ( collectionName.equals( COLLECTION_ROLES ) ) {
+ Entity itemEntity = em.get( itemRef );
+ if ( itemEntity != null ) {
+ RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
+ em.deleteRole( roleRef.getApplicationRoleName() );
+ return;
+ }
+ em.delete( itemEntity );
+ return;
+ }
+ em.delete( itemRef );
+ return;
+ }
+
+ Entity itemEntity = em.get( itemRef );
+
+ if ( itemEntity == null ) {
+ return;
+ }
+
+ UUID timestampUuid = newTimeUUID();
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+
+ batchRemoveFromCollection( batch, collectionName, itemEntity, timestampUuid );
+
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
+ if ( ( collection != null ) && ( collection.getLinkedCollection() != null ) ) {
+ getRelationManager( itemEntity )
+ .batchRemoveFromCollection( batch, collection.getLinkedCollection(), getHeadEntity(),
+ timestampUuid );
+ }
+
+ batchExecute( batch, CassandraService.RETRY_COUNT );
+
+ if ( headEntity.getType().equals( Group.ENTITY_TYPE ) ) {
+ if ( collectionName.equals( COLLECTION_ROLES ) ) {
+ String path = ( String ) ( ( Entity ) itemRef ).getMetadata( "path" );
+ if ( path.startsWith( "/roles/" ) ) {
+ RoleRef roleRef = SimpleRoleRef.forRoleEntity( itemEntity );
+ em.deleteRole( roleRef.getApplicationRoleName() );
+ }
+ }
+ }
+ }
+
+
+ @Metered(group = "core", name = "RelationManager_batchRemoveFromContainers")
+ public void batchRemoveFromContainers( Mutator<ByteBuffer> m, UUID timestampUuid ) throws Exception {
+ Entity entity = getHeadEntity();
+ // find all the containing collections
+ Map<EntityRef, Set<String>> containers = getContainingCollections();
+ if ( containers != null ) {
+ for ( Entry<EntityRef, Set<String>> container : containers.entrySet() ) {
+ for ( String collectionName : container.getValue() ) {
+ getRelationManager( container.getKey() )
+ .batchRemoveFromCollection( m, collectionName, entity, true, timestampUuid );
+ }
+ }
+ }
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_copyRelationships")
+ public void copyRelationships( String srcRelationName, EntityRef dstEntityRef, String dstRelationName )
+ throws Exception {
+
+ headEntity = em.validate( headEntity );
+ dstEntityRef = em.validate( dstEntityRef );
+
+ CollectionInfo srcCollection = getDefaultSchema().getCollection( headEntity.getType(), srcRelationName );
+
+ CollectionInfo dstCollection = getDefaultSchema().getCollection( dstEntityRef.getType(), dstRelationName );
+
+ Results results = null;
+ do {
+ if ( srcCollection != null ) {
+ results = em.getCollection( headEntity, srcRelationName, null, 5000, Level.REFS, false );
+ }
+ else {
+ results = em.getConnectedEntities( headEntity.getUuid(), srcRelationName, null, Level.REFS );
+ }
+
+ if ( ( results != null ) && ( results.size() > 0 ) ) {
+ List<EntityRef> refs = results.getRefs();
+ for ( EntityRef ref : refs ) {
+ if ( dstCollection != null ) {
+ em.addToCollection( dstEntityRef, dstRelationName, ref );
+ }
+ else {
+ em.createConnection( dstEntityRef, dstRelationName, ref );
+ }
+ }
+ }
+ }
+ while ( ( results != null ) && ( results.hasMoreResults() ) );
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_searchCollection")
+ public Results searchCollection( String collectionName, Query query ) throws Exception {
+
+ if ( query == null ) {
+ query = new Query();
+ }
+
+ headEntity = em.validate( headEntity );
+
+ CollectionInfo collection = getDefaultSchema().getCollection( headEntity.getType(), collectionName );
+
+ query.setEntityType( collection.getType() );
+
+ final CollectionResultsLoaderFactory factory = new CollectionResultsLoaderFactory();
+
+ // we have something to search with, visit our tree and evaluate the
+ // results
+ QueryProcessor qp = new QueryProcessor( query, collection, em, factory );
+ SearchCollectionVisitor visitor = new SearchCollectionVisitor( qp );
+
+ return qp.getResults( visitor );
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_createConnection_connection_ref")
+ public ConnectionRef createConnection( ConnectionRef connection ) throws Exception {
+ ConnectionRefImpl connectionImpl = new ConnectionRefImpl( connection );
+
+ updateEntityConnection( false, connectionImpl );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_createConnection_connectionType")
+ public ConnectionRef createConnection( String connectionType, EntityRef connectedEntityRef ) throws Exception {
+
+ headEntity = em.validate( headEntity );
+ connectedEntityRef = em.validate( connectedEntityRef );
+
+ ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
+
+ updateEntityConnection( false, connection );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_createConnection_paired_connection_type")
+ public ConnectionRef createConnection( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+ EntityRef connectedEntityRef ) throws Exception {
+
+ ConnectionRefImpl connection =
+ new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
+ new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
+
+ updateEntityConnection( false, connection );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_createConnection_connected_entity_ref")
+ public ConnectionRef createConnection( ConnectedEntityRef... connections ) throws Exception {
+
+ ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connections );
+
+ updateEntityConnection( false, connection );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_connectionRef_type_entity")
+ public ConnectionRef connectionRef( String connectionType, EntityRef connectedEntityRef ) throws Exception {
+
+ ConnectionRef connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_connectionRef_entity_to_entity")
+ public ConnectionRef connectionRef( String pairedConnectionType, EntityRef pairedEntity, String connectionType,
+ EntityRef connectedEntityRef ) throws Exception {
+
+ ConnectionRef connection =
+ new ConnectionRefImpl( headEntity, new ConnectedEntityRefImpl( pairedConnectionType, pairedEntity ),
+ new ConnectedEntityRefImpl( connectionType, connectedEntityRef ) );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_connectionRef_connections")
+ public ConnectionRef connectionRef( ConnectedEntityRef... connections ) {
+
+ ConnectionRef connection = new ConnectionRefImpl( headEntity, connections );
+
+ return connection;
+ }
+
+
+ @Override
+ @Metered(group = "core", name = "RelationManager_deleteConnection")
+ public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
+ updateEntityConnection( true, new ConnectionRefImpl( connectionRef ) );
+ }
+
+
+ @Override
+ @Metered(group = "core",
<TRUNCATED>