You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/07/07 15:04:31 UTC
[2/2] usergrid git commit: delete collections via API,
currently uses utility queue
delete collections via API, currently uses utility queue
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7d3eb647
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7d3eb647
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7d3eb647
Branch: refs/heads/collectionDelete
Commit: 7d3eb647d86d56bc3ec1c780ee65d348ca28f0fd
Parents: a6fee78
Author: Mike Dunker <md...@google.com>
Authored: Fri Jul 7 08:03:50 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Fri Jul 7 08:03:50 2017 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 6 +
.../corepersistence/CpEntityManager.java | 46 ++++++--
.../corepersistence/CpEntityManagerFactory.java | 12 +-
.../asyncevents/AsyncEventService.java | 8 ++
.../asyncevents/AsyncEventServiceImpl.java | 113 +++++++++++++++++--
.../asyncevents/AsyncIndexProvider.java | 12 +-
.../CollectionDeleteTooSoonException.java | 39 +++++++
.../asyncevents/EventBuilder.java | 9 ++
.../asyncevents/EventBuilderImpl.java | 51 +++++++--
.../asyncevents/model/AsyncEvent.java | 3 +-
.../model/CollectionDeleteEvent.java | 58 ++++++++++
.../asyncevents/model/EntityDeleteEvent.java | 12 ++
.../index/CollectionDeleteService.java | 30 +++++
.../index/CollectionDeleteServiceImpl.java | 57 ++++++++++
.../corepersistence/index/CollectionScope.java | 29 +++++
.../index/CollectionScopeImpl.java | 92 +++++++++++++++
.../index/CollectionSettingsImpl.java | 19 +++-
.../index/CollectionVersionCache.java | 57 ++++++++++
.../index/CollectionVersionFig.java | 53 +++++++++
.../index/CollectionVersionManager.java | 36 ++++++
.../index/CollectionVersionManagerFactory.java | 65 +++++++++++
.../index/CollectionVersionManagerImpl.java | 111 ++++++++++++++++++
.../index/CollectionVersionUtil.java | 80 +++++++++++++
.../corepersistence/index/IndexServiceImpl.java | 6 +-
.../index/VersionedCollectionName.java | 29 +++++
.../index/VersionedCollectionNameImpl.java | 80 +++++++++++++
.../rx/impl/AllEntityIdsObservableImpl.java | 5 +-
.../corepersistence/util/CpNamingUtils.java | 15 ++-
.../usergrid/persistence/EntityManager.java | 4 +-
.../apache/usergrid/persistence/Results.java | 25 +++-
.../persistence/entities/Application.java | 26 +++++
.../apache/usergrid/utils/InflectionUtils.java | 42 ++++++-
.../corepersistence/AggregationServiceTest.java | 7 ++
.../index/AsyncEventServiceImplTest.java | 9 +-
.../index/CollectionVersionTest.java | 23 ++++
.../usergrid/persistence/RebuildIndexTest.java | 5 +-
.../rest/applications/CollectionResource.java | 58 ++++++++++
.../rest/applications/ServiceResource.java | 15 +++
.../CollectionDeleteTooSoonExceptionMapper.java | 44 ++++++++
.../apache/usergrid/services/ServiceInfo.java | 17 +++
.../usergrid/services/ServiceManager.java | 49 +++++++-
.../services/ServiceManagerFactory.java | 6 +-
.../usergrid/services/ActivitiesServiceIT.java | 2 +
.../usergrid/services/CollectionServiceIT.java | 6 +-
44 files changed, 1410 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ec6b775..5515abd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -104,6 +104,8 @@ public class CoreModule extends AbstractModule {
bind( ApplicationIdCacheFactory.class );
bind( CollectionSettingsFactory.class );
bind( CollectionSettingsCache.class );
+ bind( CollectionVersionManagerFactory.class );
+ bind( CollectionVersionCache.class );
/**
@@ -141,6 +143,8 @@ public class CoreModule extends AbstractModule {
bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+ bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
+
bind( ExportService.class ).to( ExportServiceImpl.class );
install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
@@ -157,6 +161,8 @@ public class CoreModule extends AbstractModule {
install( new GuicyFigModule( CollectionSettingsCacheFig.class ) );
+ install( new GuicyFigModule( CollectionVersionFig.class ) );
+
install( new GuicyFigModule( EntityManagerFig.class ) );
install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index cdb4fc7..ad5220b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -29,9 +29,7 @@ import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceCounterQuery;
import org.apache.commons.lang.NullArgumentException;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.index.CollectionSettings;
-import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
-import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
@@ -160,6 +158,8 @@ public class CpEntityManager implements EntityManager {
private EntityCollectionManager ecm;
public QueueManagerFactory queueManagerFactory;
+ private CollectionDeleteService collectionDeleteService;
+ private CollectionVersionManagerFactory collectionVersionManagerFactory;
// /** Short-term cache to keep us from reloading same Entity during single request. */
@@ -186,7 +186,9 @@ public class CpEntityManager implements EntityManager {
final ConnectionService connectionService,
final CollectionSettingsFactory collectionSettingsFactory,
final UUID applicationId,
- final QueueManagerFactory queueManagerFactory) {
+ final QueueManagerFactory queueManagerFactory,
+ final CollectionDeleteService collectionDeleteService,
+ final CollectionVersionManagerFactory collectionVersionManagerFactory) {
this.entityManagerFig = entityManagerFig;
this.actorSystemFig = actorSystemFig;
@@ -253,6 +255,8 @@ public class CpEntityManager implements EntityManager {
this.skipAggregateCounters = false;
this.queueManagerFactory = queueManagerFactory;
+ this.collectionDeleteService = collectionDeleteService;
+ this.collectionVersionManagerFactory = collectionVersionManagerFactory;
}
@@ -735,7 +739,22 @@ public class CpEntityManager implements EntityManager {
@Override
public Set<String> getApplicationCollections() throws Exception {
- Set<String> existingCollections = getRelationManager( getApplication() ).getCollections();
+ Set<String> existingCollections = new HashSet<>();
+ for (String existingCollection : getRelationManager( getApplication() ).getCollections()) {
+ if (Application.isCustomCollectionName(existingCollection)) {
+ // check for correct version
+ VersionedCollectionName v = CollectionVersionUtil.parseVersionedName(existingCollection);
+ CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance(
+ new CollectionScopeImpl(getApplication().asId(), v.getCollectionName())
+ );
+ String currentVersion = cvm.getCollectionVersion(true);
+ if (!v.getCollectionVersion().equals(currentVersion)) {
+ // not the right version, skip it
+ continue;
+ }
+ existingCollections.add(existingCollection);
+ }
+ }
Set<String> system_collections = Schema.getDefaultSchema().getCollectionNames( Application.ENTITY_TYPE );
if ( system_collections != null ) {
@@ -765,12 +784,13 @@ public class CpEntityManager implements EntityManager {
if ( !Schema.isAssociatedEntityType( collectionName ) ) {
Long count = counts.get( APPLICATION_COLLECTION + collectionName );
+ String unversionedCollectionName = CollectionVersionUtil.getBaseCollectionName(collectionName);
Map<String, Object> entry = new HashMap<String, Object>();
entry.put( "count", count != null ? count : 0 );
- entry.put( "type", singularize( collectionName ) );
- entry.put( "name", collectionName );
- entry.put( "title", capitalize( collectionName ) );
- metadata.put( collectionName, entry );
+ entry.put( "type", singularize( unversionedCollectionName ) );
+ entry.put( "name", unversionedCollectionName );
+ entry.put( "title", capitalize( unversionedCollectionName ) );
+ metadata.put( unversionedCollectionName, entry );
}
}
}
@@ -1870,6 +1890,13 @@ public class CpEntityManager implements EntityManager {
}
@Override
+ public void deleteCollection( String collectionName ){
+ // Delegates to the injected CollectionDeleteService, passing this manager's applicationId.
+ collectionDeleteService.deleteCollection(applicationId, collectionName);
+
+ }
+
+ @Override
public void grantRolePermission( String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
@@ -2471,7 +2498,6 @@ public class CpEntityManager implements EntityManager {
final Entity entity;
- //this is the fall back, why isn't this writt
if ( entityType == null ) {
return null;
// throw new EntityNotFoundException( String.format( "Counld not find type for uuid {}", uuid ) );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index cec7258..b3dac57 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -26,9 +26,7 @@ import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -119,6 +117,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private final CollectionSettingsFactory collectionSettingsFactory;
private ActorSystemManager actorSystemManager;
private final LockManager lockManager;
+ private final CollectionDeleteService collectionDeleteService;
+ private final CollectionVersionManagerFactory collectionVersionManagerFactory;
private final QueueManagerFactory queueManagerFactory;
@@ -143,6 +143,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.collectionService = injector.getInstance( CollectionService.class );
this.connectionService = injector.getInstance( ConnectionService.class );
this.collectionSettingsFactory = injector.getInstance( CollectionSettingsFactory.class );
+ this.collectionDeleteService = injector.getInstance( CollectionDeleteService.class );
+ this.collectionVersionManagerFactory = injector.getInstance( CollectionVersionManagerFactory.class );
Properties properties = cassandraService.getProperties();
this.entityManagers = createEntityManagerCache( properties );
@@ -392,7 +394,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
connectionService,
collectionSettingsFactory,
applicationId,
- queueManagerFactory);
+ queueManagerFactory,
+ collectionDeleteService,
+ collectionVersionManagerFactory);
return em;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index cab4e3e..5fe4295 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.CollectionScope;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -96,6 +97,13 @@ public interface AsyncEventService extends ReIndexAction {
void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion);
/**
+ * The version of a collection has been changed, queue cleanup of old version
+ * @param collectionScope identifies the application and the collection name to clean up
+ * @param collectionVersion the collection version whose entities should be deleted
+ */
+ void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion);
+
+ /**
* current queue depth
* @return
*/
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 530cf7d..5628a11 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -30,10 +30,8 @@ import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
@@ -70,11 +68,11 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.commons.lang.StringUtils.indexOf;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
@@ -113,12 +111,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
private final LegacyQueueManager utilityQueueDead;
private final IndexProcessorFig indexProcessorFig;
private final LegacyQueueFig queueFig;
+ private final CollectionVersionFig collectionVersionFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler;
+ private final AllEntityIdsObservable allEntityIdsObservable;
private final Timer readTimer;
private final Timer writeTimer;
@@ -153,6 +153,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
final LegacyQueueFig queueFig,
+ final CollectionVersionFig collectionVersionFig,
+ final AllEntityIdsObservable allEntityIdsObservable,
@EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
@@ -187,6 +189,8 @@ public class AsyncEventServiceImpl implements AsyncEventService {
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
+ this.collectionVersionFig = collectionVersionFig;
+ this.allEntityIdsObservable = allEntityIdsObservable;
this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read");
@@ -211,16 +215,25 @@ public class AsyncEventServiceImpl implements AsyncEventService {
* Offer the EntityIdScope to SQS
*/
private void offer(final Serializable operation) {
+ // Default path: the event is not flagged for the utility queue.
+ offer(operation, false);
+ }
+
+ /**
+ * Send an event to the local region, timing the write with writeTimer.
+ *
+ * @param operation the serializable event to enqueue
+ * @param forUtilityQueue true to enqueue on the utility queue, false to enqueue on the index queue
+ * @throws RuntimeException wrapping the IOException when the message cannot be queued
+ */
+ private void offer(final Serializable operation, boolean forUtilityQueue) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- this.indexQueue.sendMessageToLocalRegion( operation );
+ if (forUtilityQueue) {
+ // Collection-delete traffic is offered with this flag set so it stays off the index queue.
+ // NOTE(review): assumes the class's utilityQueue field (sibling of utilityQueueDead) -- confirm.
+ this.utilityQueue.sendMessageToLocalRegion(operation);
+ } else {
+ this.indexQueue.sendMessageToLocalRegion(operation);
+ }
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
timer.stop();
}
+
}
@@ -479,6 +492,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
+ } else if (event instanceof CollectionDeleteEvent) {
+
+ handleCollectionDelete(message);
+
} else {
throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
@@ -487,6 +504,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
if( !(event instanceof ElasticsearchIndexEvent)
&& !(event instanceof InitializeApplicationIndexEvent)
+ && !(event instanceof CollectionDeleteEvent)
&& single.isEmpty() ){
logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
@@ -821,7 +839,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ }
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
@@ -840,12 +860,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+ final boolean markedOnly = entityDeleteEvent.markedOnly();
- if (logger.isDebugEnabled())
+ if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+ }
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+ final EventBuilderImpl.EntityDeleteResults entityDeleteResults = markedOnly ?
+ eventBuilder.buildEntityDelete( applicationScope, entityId ) :
+ eventBuilder.buildEntityDeleteAllVersions( applicationScope, entityId );
// Delete the entities and remove from graph separately
@@ -858,6 +881,76 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
+ /**
+ * Build a CollectionDeleteEvent for the given scope and version and offer it with the
+ * utility-queue flag set.
+ */
+ @Override
+ public void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion) {
+
+ if (logger.isDebugEnabled()) {
+ final UUID appUuid = collectionScope.getApplication().getUuid();
+ logger.debug("Offering CollectionDeleteEvent for application={}, collectionName={}, collectionVersion={}",
+ appUuid, collectionScope.getCollectionName(), collectionVersion);
+ }
+
+ // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
+ final CollectionDeleteEvent deleteEvent =
+ new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion);
+ offer(deleteEvent, true);
+ }
+
+ /**
+ * Process one CollectionDeleteEvent by queueing entity deletes for a chunk of the versioned collection.
+ * <p>
+ * Reads edges via allEntityIdsObservable.getEdgesToEntities for the versioned collection's edge type,
+ * offers an EntityDeleteEvent for each edge's target node (at most collectionVersionFig.getDeletesPerEvent()
+ * per call), and re-offers the CollectionDeleteEvent when that limit was reached so a later pass can
+ * continue. Events with a missing scope or application are logged and dropped.
+ *
+ * @param message queue message whose body must be a CollectionDeleteEvent
+ */
+ private void handleCollectionDelete(final LegacyQueueMessage message) {
+
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionDelete");
+
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+ Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionDelete" );
+ Preconditions.checkArgument( event instanceof CollectionDeleteEvent,
+ String.format( "Event Type for handleCollectionDelete must be COLLECTION_DELETE, got %s", event.getClass() ) );
+
+ final CollectionDeleteEvent collectionDeleteEvent = ( CollectionDeleteEvent ) event;
+ final CollectionScope collectionScope = collectionDeleteEvent.getCollectionScope();
+ if (collectionScope == null) {
+ logger.error("CollectionDeleteEvent received with null collectionScope");
+ // ack message, nothing more to do
+ return;
+ }
+ // getApplication() itself may be null; check it before dereferencing so the event is dropped
+ // through the intended error path instead of failing with a NullPointerException
+ if (collectionScope.getApplication() == null || collectionScope.getApplication().getUuid() == null) {
+ logger.error("CollectionDeleteEvent collectionScope has null application");
+ // ack message, nothing more to do
+ return;
+ }
+ final UUID applicationID = collectionScope.getApplication().getUuid();
+ String collectionVersion = collectionDeleteEvent.getCollectionVersion();
+ if (collectionVersion == null) {
+ collectionVersion = "";
+ }
+ final ApplicationScope applicationScope = CpNamingUtils.getApplicationScope(applicationID);
+ final String versionedCollectionName =
+ CollectionVersionUtil.buildVersionedNameString(collectionScope.getCollectionName(),
+ collectionVersion, false);
+
+
+ final AtomicInteger count = new AtomicInteger();
+ final int maxDeletes = collectionVersionFig.getDeletesPerEvent();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleCollectionDelete: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes);
+ }
+ allEntityIdsObservable.getEdgesToEntities(Observable.just(applicationScope),
+ Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(versionedCollectionName.toLowerCase())), Optional.absent())
+ // cap the number of entity deletes queued by a single event
+ .take(maxDeletes)
+ .doOnNext(edgeScope-> {
+
+ // third argument is false -- presumably markedOnly=false, i.e. delete all versions; confirm
+ offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
+ new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false),
+ true);
+ count.incrementAndGet();
+ }).toBlocking().lastOrDefault(null);
+
+ logger.info("handleCollectionDelete: queued {} entity deletes for deleted collection", count.intValue());
+
+ if (count.intValue() >= maxDeletes) {
+ // requeue collection delete for next chunk of deletes
+ offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion), true);
+ }
+ }
+
private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2ba6c0b..31fcd6d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -20,8 +20,10 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.CollectionVersionFig;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -58,6 +60,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final IndexProducer indexProducer;
private final MapManagerFactory mapManagerFactory;
private final LegacyQueueFig queueFig;
+ private final CollectionVersionFig collectionVersionFig;
+ private final AllEntityIdsObservable allEntityIdsObservable;
private AsyncEventService asyncEventService;
@@ -73,7 +77,9 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final EntityIndexFactory entityIndexFactory,
final IndexProducer indexProducer,
final MapManagerFactory mapManagerFactory,
- final LegacyQueueFig queueFig) {
+ final LegacyQueueFig queueFig,
+ final CollectionVersionFig collectionVersionFig,
+ final AllEntityIdsObservable allEntityIdsObservable) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -86,6 +92,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.indexProducer = indexProducer;
this.mapManagerFactory = mapManagerFactory;
this.queueFig = queueFig;
+ this.collectionVersionFig = collectionVersionFig;
+ this.allEntityIdsObservable = allEntityIdsObservable;
}
@@ -116,6 +124,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
eventBuilder,
mapManagerFactory,
queueFig,
+ collectionVersionFig,
+ allEntityIdsObservable,
rxTaskScheduler );
if ( impl.equals( LOCAL )) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java
new file mode 100644
index 0000000..bd37d46
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+/**
+ * Signals a collection delete that was requested too soon (per the class name). Carries the two
+ * timing values supplied by the thrower and exposes them in the exception message.
+ */
+public class CollectionDeleteTooSoonException extends RuntimeException {
+
+ private final long timeLastDeleted;
+ private final long timeRequiredBeforeDeleteMsec;
+
+ /**
+ * @param timeLastDeleted value reported by {@link #getTimeLastDeleted()}
+ * @param timeRequiredBeforeDeleteMsec value reported by {@link #getTimeRequiredBeforeDeleteMsec()}
+ */
+ public CollectionDeleteTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) {
+ // give the exception a message so logs and stack traces do not show a null message
+ super("Collection delete requested too soon: timeLastDeleted=" + timeLastDeleted
+ + ", timeRequiredBeforeDeleteMsec=" + timeRequiredBeforeDeleteMsec);
+ this.timeLastDeleted = timeLastDeleted;
+ this.timeRequiredBeforeDeleteMsec = timeRequiredBeforeDeleteMsec;
+ }
+
+ /** @return the timeLastDeleted value passed to the constructor */
+ public long getTimeLastDeleted() {
+ return timeLastDeleted;
+ }
+
+ /** @return the timeRequiredBeforeDeleteMsec value passed to the constructor */
+ public long getTimeRequiredBeforeDeleteMsec() {
+ return timeRequiredBeforeDeleteMsec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 4db9f4b..8618c73 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -64,6 +64,15 @@ public interface EventBuilder {
*/
EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+ /**
+ * Return a bin with 2 observable streams for entity delete. This deletes all versions -- used only for an old
+ * collection version. Does not require versions to be marked for deletion.
+ * @param applicationScope the application scope the entity belongs to
+ * @param entityId id of the entity whose versions are to be deleted
+ * @return results holding the two observable streams described above
+ */
+ EntityDeleteResults buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId );
+
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index bbdce5a..33d384e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -105,10 +105,10 @@ public class EventBuilderImpl implements EventBuilder {
//Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
- @Override
- public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+ private EntityDeleteResults buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId, boolean markedOnly) {
if (logger.isDebugEnabled()) {
- logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+ logger.debug("Deleting entity id ({} versions) from index in app scope {} with entityId {}",
+ markedOnly ? "marked" : "all", applicationScope, entityId);
}
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
@@ -116,25 +116,30 @@ public class EventBuilderImpl implements EventBuilder {
//TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
- MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
- .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+ MvccLogEntry mostRecentToDelete = markedOnly ?
+ ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
+ .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ) :
+ ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
+ .firstOrDefault( null );
// De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then
// return an empty observable as our no-op.
Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
- if(mostRecentlyMarked != null){
+ if(mostRecentToDelete != null || !markedOnly){
// fetch entity versions to be de-index by looking in cassandra
- deIndexObservable =
- indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
- getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
+ deIndexObservable = markedOnly ?
+ indexService.deIndexEntity(applicationScope, entityId, mostRecentToDelete.getVersion(),
+ getVersionsOlderThanMarked(ecm, entityId, mostRecentToDelete.getVersion())) :
+ indexService.deIndexEntity(applicationScope, entityId, UUIDUtils.newTimeUUID(),
+ getAllVersions(ecm, entityId));
ecmDeleteObservable =
- ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
+ ecm.getVersionsFromMaxToMin( entityId, mostRecentToDelete.getVersion() )
.filter( mvccLogEntry->
- mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
+ mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
.buffer( serializationFig.getBufferSize() )
.doOnNext( buffer -> ecm.delete( buffer ) );
}
@@ -146,6 +151,17 @@ public class EventBuilderImpl implements EventBuilder {
}
@Override
+ public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+ // markedOnly = true: only versions up to the most recent DELETED log entry are de-indexed and deleted
+ return buildEntityDeleteCommon(applicationScope, entityId, true);
+ }
+
+ // Deletes all versions of an entity without requiring a DELETED log entry (markedOnly = false);
+ // only used for collection delete.
+ @Override
+ public EntityDeleteResults buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) {
+ return buildEntityDeleteCommon(applicationScope, entityId, false);
+ }
+
+ @Override
public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
@@ -210,4 +226,17 @@ public class EventBuilderImpl implements EventBuilder {
return versions;
}
+    /**
+     * Collect the version UUID of every log entry of the entity, in the order emitted by
+     * {@code getVersionsFromMaxToMin} starting from a freshly generated time UUID.
+     *
+     * @param ecm the entity collection manager used to read the entity's log entries
+     * @param entityId the id of the entity whose versions are collected
+     * @return the collected version UUIDs
+     */
+    private List<UUID> getAllVersions( final EntityCollectionManager ecm,
+                                       final Id entityId ) {
+
+        final List<UUID> allVersions = new ArrayList<>();
+
+        // NOTE(review): forEach subscribes rather than blocks, so the list is only complete on return if the
+        // observable emits synchronously -- confirm.
+        ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
+            .map( MvccLogEntry::getVersion )
+            .forEach( allVersions::add );
+
+        return allVersions;
+    }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index bd581ad..0ea0fdc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -44,7 +44,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
@JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ),
- @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" )
+ @JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ),
+ @JsonSubTypes.Type( value = CollectionDeleteEvent.class, name = "collectionDeleteEvent" )
} )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java
new file mode 100644
index 0000000..9fc978c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.usergrid.corepersistence.index.CollectionScope;
+
+/**
+ * Async event signalling that the entity deletes for a collection delete should be queued up.
+ */
+public final class CollectionDeleteEvent extends AsyncEvent {
+
+    /** Scope (application plus collection name) of the collection being deleted. */
+    @JsonProperty
+    protected CollectionScope collectionScope;
+
+    /** Version of the collection that this event refers to. */
+    @JsonProperty
+    protected String collectionVersion;
+
+    /**
+     * No-argument constructor. Do not delete! Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public CollectionDeleteEvent() {
+        super();
+    }
+
+    /**
+     * Create an event for one version of one collection.
+     *
+     * @param sourceRegion region passed through to the base event
+     * @param collectionScope scope of the collection being deleted
+     * @param collectionVersion version of the collection that this event refers to
+     */
+    public CollectionDeleteEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) {
+        super(sourceRegion);
+        this.collectionVersion = collectionVersion;
+        this.collectionScope = collectionScope;
+    }
+
+    /**
+     * @return the scope of the collection being deleted
+     */
+    public CollectionScope getCollectionScope() {
+        return this.collectionScope;
+    }
+
+    /**
+     * @return the collection version that this event refers to
+     */
+    public String getCollectionVersion() {
+        return this.collectionVersion;
+    }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 01d2ba8..aa6a15b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -31,17 +31,29 @@ public final class EntityDeleteEvent extends AsyncEvent {
@JsonProperty
protected EntityIdScope entityIdScope;
+ // Defaults to true so that an event deserialized through the no-arg constructor without this property
+ // (e.g. one serialized before the field existed) keeps the marked-versions-only behavior, matching the
+ // two-arg constructor. NOTE(review): assumes consumers treat false as "delete all versions" -- confirm.
+ @JsonProperty
+ protected boolean markedOnly = true;
+
public EntityDeleteEvent() {
super();
}
public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
+ this(sourceRegion, entityIdScope, true);
+ }
+
+ public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope, boolean markedOnly) {
super(sourceRegion);
this.entityIdScope = entityIdScope;
+ this.markedOnly = markedOnly;
}
public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
+
+ public boolean markedOnly() {
+ return markedOnly;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
new file mode 100644
index 0000000..85b8fed
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import java.util.UUID;
+
+/**
+ * Service for deleting the contents of a collection.
+ */
+public interface CollectionDeleteService {
+
+    /**
+     * Delete the current version of a collection by changing the collection version and queueing up a delete of
+     * the old entities.
+     *
+     * @param applicationID UUID of the application that owns the collection
+     * @param baseCollectionName name of the collection (presumably without any version suffix -- confirm)
+     */
+    void deleteCollection(UUID applicationID, String baseCollectionName);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
new file mode 100644
index 0000000..5c64079
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * {@link CollectionDeleteService} implementation that switches the collection to a new version and then queues an
+ * async delete for the version that was replaced.
+ */
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+    private static final Logger logger = LoggerFactory.getLogger(CollectionDeleteServiceImpl.class );
+
+    private final CollectionVersionManagerFactory collectionVersionManagerFactory;
+    private final AsyncEventService asyncEventService;
+
+    /**
+     * @param collectionVersionManagerFactory source of the per-collection version managers
+     * @param asyncEventService service used to queue the collection delete
+     */
+    @Inject
+    public CollectionDeleteServiceImpl(
+        final CollectionVersionManagerFactory collectionVersionManagerFactory,
+        final AsyncEventService asyncEventService
+    )
+    {
+        this.asyncEventService = asyncEventService;
+        this.collectionVersionManagerFactory = collectionVersionManagerFactory;
+    }
+
+    /**
+     * Change the collection's version, then queue a delete for the version returned by the version manager.
+     *
+     * @param applicationID UUID of the application that owns the collection
+     * @param baseCollectionName name of the collection to delete
+     */
+    @Override
+    public void deleteCollection(final UUID applicationID, final String baseCollectionName) {
+        final CollectionScope collectionScope = new CollectionScopeImpl(applicationID, baseCollectionName);
+
+        // change version; the manager hands back the version that was replaced
+        final String previousVersion =
+            collectionVersionManagerFactory.getInstance(collectionScope).updateCollectionVersion();
+
+        // queue up delete of old version entities
+        asyncEventService.queueCollectionDelete(collectionScope, previousVersion);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java
new file mode 100644
index 0000000..9ec3ad9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScope.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+/**
+ * An application scope narrowed to a single named collection. Jackson deserializes this interface as
+ * {@link CollectionScopeImpl}.
+ */
+@JsonDeserialize(as = CollectionScopeImpl.class)
+public interface CollectionScope extends ApplicationScope {
+
+ /**
+ * @return the name of the collection this scope refers to
+ */
+ String getCollectionName();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java
new file mode 100644
index 0000000..6c29ee5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionScopeImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+
+
+/**
+ * Default {@link CollectionScope}: an application id paired with a collection name. Instances are compared and
+ * hashed by both values.
+ */
+public class CollectionScopeImpl implements CollectionScope {
+
+    protected Id application;
+    protected String collectionName;
+
+
+    /**
+     * Do not delete! Needed for Jackson
+     */
+    @SuppressWarnings( "unused" )
+    public CollectionScopeImpl(){
+
+    }
+
+    /**
+     * @param application id of the owning application
+     * @param collectionName name of the collection
+     */
+    public CollectionScopeImpl(final Id application, final String collectionName ) {
+        this.application = application;
+        this.collectionName = collectionName;
+    }
+
+    /**
+     * Convenience constructor that wraps the UUID in a {@link SimpleId} of type {@code TYPE_APPLICATION}.
+     *
+     * @param applicationID UUID of the owning application
+     * @param collectionName name of the collection
+     */
+    public CollectionScopeImpl(final UUID applicationID, final String collectionName) {
+        this(new SimpleId(applicationID, TYPE_APPLICATION), collectionName);
+    }
+
+    @Override
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    @Override
+    public Id getApplication() {
+        return application;
+    }
+
+    /**
+     * Two scopes are equal when both the application id and the collection name are equal.
+     */
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( !( o instanceof CollectionScopeImpl) ) {
+            return false;
+        }
+
+        final CollectionScopeImpl other = (CollectionScopeImpl) o;
+
+        // Null-safe comparison: the no-arg constructor leaves both fields null, and hashCode() already
+        // tolerates null fields, so equals() must not throw on them either.
+        return Objects.equals( application, other.application )
+            && Objects.equals( collectionName, other.collectionName );
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+            .append(application)
+            .append(collectionName)
+            .toHashCode();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
index 921777a..74acd09 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.corepersistence.index;
import com.google.common.base.Optional;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.utils.JsonUtils;
import org.slf4j.Logger;
@@ -51,13 +50,15 @@ public class CollectionSettingsImpl implements CollectionSettings {
@Override
public Optional<Map<String, Object>> getCollectionSettings(final String collectionName ) {
+ // collectionName may be a versioned collection name -- get the base name
+ String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+
String settings;
settings = cache.get(scope);
if( settings == null ) {
- settings = mapManager.getString(collectionName);
-
+ settings = mapManager.getString(baseCollectionName);
}
if (settings != null) {
@@ -77,14 +78,22 @@ public class CollectionSettingsImpl implements CollectionSettings {
@Override
public void putCollectionSettings(final String collectionName, final String collectionSchema ){
- mapManager.putString( collectionName, collectionSchema );
+
+ // collectionName may be a versioned collection name -- get the base name
+ String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+
+ mapManager.putString( baseCollectionName, collectionSchema );
cache.put(scope, collectionSchema);
}
@Override
public void deleteCollectionSettings(final String collectionName){
- mapManager.delete( collectionName );
+
+ // collectionName may be a versioned collection name -- get the base name
+ String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+
+ mapManager.delete( baseCollectionName );
cache.invalidate( scope );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java
new file mode 100644
index 0000000..e4e8e93
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionCache.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Singleton cache of collection version strings keyed by {@link CollectionScope}. Size and expiry come from
+ * {@link CollectionVersionFig}.
+ */
+@Singleton
+public class CollectionVersionCache {
+
+    private final Cache<CollectionScope,String> cache;
+
+
+    /**
+     * @param fig configuration supplying the maximum cache size and the entry timeout
+     */
+    @Inject
+    public CollectionVersionCache(CollectionVersionFig fig ) {
+        // The timeout is configured in milliseconds (key "usergrid.collection_version_cache_timeout_ms",
+        // default 2000), so it must be applied as MILLISECONDS; SECONDS kept entries for 2000 seconds.
+        this.cache = CacheBuilder.newBuilder()
+            .maximumSize(fig.getCacheSize())
+            .expireAfterWrite(fig.getCacheTimeout(), TimeUnit.MILLISECONDS).build();
+    }
+
+
+    /**
+     * Store the version for the given collection scope.
+     */
+    public void put(CollectionScope key, String value){
+        cache.put(key, value);
+    }
+
+    /**
+     * @return the cached version for the scope, or null when no entry is present
+     */
+    public String get(CollectionScope key){
+        return cache.getIfPresent(key);
+    }
+
+    /**
+     * Drop the cached version for the given collection scope.
+     */
+    public void invalidate(CollectionScope key){
+        cache.invalidate(key);
+    }
+
+    /**
+     * Drop every cached version.
+     */
+    public void invalidateAll(){
+        cache.invalidateAll();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
new file mode 100644
index 0000000..3bb75c7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Collection version cache config
+ */
+@FigSingleton
+public interface CollectionVersionFig extends GuicyFig {
+
+ String CACHE_SIZE = "usergrid.collection_version_cache_size";
+ String CACHE_TIMEOUT_MS = "usergrid.collection_version_cache_timeout_ms";
+ String TIME_BETWEEN_DELETES_MS = "usergrid.collection_version_time_between_deletes_ms";
+ String DELETES_PER_EVENT = "usergrid.collection_deletes_per_event";
+
+ /**
+ * Maximum number of entries held by the collection version caches (default 500).
+ */
+ @Key(CACHE_SIZE)
+ @Default("500")
+ int getCacheSize();
+
+ /**
+ * Cache entry timeout in milliseconds, as the key name indicates (default 2000).
+ */
+ @Key(CACHE_TIMEOUT_MS)
+ @Default("2000")
+ int getCacheTimeout();
+
+ /**
+ * Time between deletes in milliseconds, as the key name indicates (default 60000).
+ * NOTE(review): presumably the minimum interval between deletes of the same collection -- confirm.
+ */
+ @Key(TIME_BETWEEN_DELETES_MS)
+ @Default("60000")
+ long getTimeBetweenDeletes();
+
+ /**
+ * Number of deletes per event (default 10000).
+ * NOTE(review): presumably the number of entity deletes handled per collection delete event -- confirm.
+ */
+ @Key(DELETES_PER_EVENT)
+ @Default("10000")
+ int getDeletesPerEvent();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
new file mode 100644
index 0000000..9768a55
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException;
+
+public interface CollectionVersionManager {
+
+ /**
+ * Get the collection version.
+ *
+ * @param bypassCache presumably true to skip the cache and read the stored value -- confirm against the
+ * implementation
+ * @return the collection version
+ */
+ String getCollectionVersion(final boolean bypassCache);
+
+ /**
+ * Get the versioned name of the collection.
+ *
+ * @param bypassCache presumably true to skip the cache when looking up the version -- confirm against the
+ * implementation
+ * @return the versioned collection name
+ */
+ String getVersionedCollectionName(final boolean bypassCache);
+
+ /**
+ * Change the collection to a new version.
+ *
+ * @return the version being replaced (CollectionDeleteServiceImpl treats the result as the old version)
+ * @throws CollectionDeleteTooSoonException presumably when the previous change was too recent -- confirm
+ */
+ String updateCollectionVersion() throws CollectionDeleteTooSoonException;
+
+ /**
+ * @return the time the collection version last changed. NOTE(review): units and whether this can be null are
+ * not visible here -- confirm.
+ */
+ Long getTimeLastChanged();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java
new file mode 100644
index 0000000..7e7a2a7
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Singleton factory handing out one {@link CollectionVersionManager} per {@link CollectionScope}. Managers are
+ * held in a loading cache sized and expired according to {@link CollectionVersionFig}.
+ */
+@Singleton
+public class CollectionVersionManagerFactory {
+
+    private final LoadingCache<CollectionScope,CollectionVersionManager> managersByScope;
+
+    /**
+     * @param fig configuration for the cache size and timeout, also passed on to each manager
+     * @param mapManagerFactory creates the map manager backing each version manager
+     * @param collectionVersionCache version cache shared by all managers
+     */
+    @Inject
+    public CollectionVersionManagerFactory(final CollectionVersionFig fig,
+                                           final MapManagerFactory mapManagerFactory,
+                                           final CollectionVersionCache collectionVersionCache ){
+
+        // builds the manager for a scope on a cache miss
+        final CacheLoader<CollectionScope, CollectionVersionManager> managerLoader =
+            new CacheLoader<CollectionScope, CollectionVersionManager>() {
+                @Override
+                public CollectionVersionManager load( final CollectionScope collectionScope ) throws Exception {
+                    final MapManager versionMap = mapManagerFactory.createMapManager(
+                        CpNamingUtils.getCollectionVersionMapScope( collectionScope.getApplication() ) );
+                    return new CollectionVersionManagerImpl(
+                        collectionScope, versionMap, collectionVersionCache, fig );
+                }
+            };
+
+        this.managersByScope = CacheBuilder.newBuilder()
+            .maximumSize( fig.getCacheSize() )
+            .expireAfterWrite( fig.getCacheTimeout(), TimeUnit.MILLISECONDS )
+            .build( managerLoader );
+    }
+
+
+    /**
+     * Get the version manager for a collection scope, creating it when it is not cached.
+     *
+     * @param scope the collection scope to look up
+     * @return the manager for that scope
+     * @throws RuntimeException wrapping the ExecutionException when loading the manager fails
+     */
+    public CollectionVersionManager getInstance(CollectionScope scope ) {
+        final CollectionVersionManager manager;
+        try {
+            manager = managersByScope.get( scope );
+        }
+        catch ( ExecutionException loadFailure ) {
+            throw new RuntimeException( loadFailure );
+        }
+        return manager;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
new file mode 100644
index 0000000..7ed557c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Reads and replaces the version of a single collection.
+ *
+ * The version and the time it was last changed are persisted through the {@link MapManager}; version
+ * reads consult the {@link CollectionVersionCache} first to reduce load on Cassandra.
+ */
+public class CollectionVersionManagerImpl implements CollectionVersionManager {
+    private static final Logger logger = LoggerFactory.getLogger(CollectionVersionManagerImpl.class );
+
+    private final MapManager mapManager;
+    private final CollectionVersionCache cache;
+    private final CollectionScope scope;
+    private final CollectionVersionFig collectionVersionFig;
+    private final String collectionName;
+
+    // key prefixes used in the map; the collection name is appended to each
+    private static final String MAP_PREFIX_VERSION = "VERSION:";
+    private static final String MAP_PREFIX_LAST_CHANGED = "LASTCHANGED:";
+
+    /**
+     * @param scope identifies the application and collection this manager is responsible for
+     * @param mapManager persistent store for the version and last-changed values
+     * @param cache cache of collection versions, keyed by scope
+     * @param collectionVersionFig supplies the minimum time between version changes
+     */
+    @Inject
+    public CollectionVersionManagerImpl(CollectionScope scope, MapManager mapManager, CollectionVersionCache cache, CollectionVersionFig collectionVersionFig) {
+        this.scope = scope;
+        this.mapManager = mapManager;
+        this.cache = cache;
+        this.collectionVersionFig = collectionVersionFig;
+        this.collectionName = scope.getCollectionName();
+    }
+
+    /**
+     * Returns the current version of the collection.
+     *
+     * @param bypassCache when true the cache is not consulted and the value is read from the map
+     * @return the stored version, or the empty string when the collection has no version
+     */
+    @Override
+    public String getCollectionVersion(final boolean bypassCache) {
+
+        if (!bypassCache) {
+            final String cachedVersion = cache.get(scope);
+            if (cachedVersion != null) {
+                // cache hit: return as-is without re-putting, so the entry's age is not reset
+                return cachedVersion;
+            }
+        }
+
+        String version = mapManager.getString(MAP_PREFIX_VERSION+collectionName);
+        if (version == null) {
+            version = ""; // empty string marks "no version" so that it can be cached as well
+        }
+
+        // remember whatever was just read (versioned or not) so later reads can skip the map lookup
+        cache.put(scope, version);
+        return version;
+    }
+
+    /**
+     * @return the timestamp (milliseconds) written by the last {@link #updateCollectionVersion()},
+     *         as returned by the map; callers in this class treat null as "never changed"
+     */
+    @Override
+    public Long getTimeLastChanged() {
+        return mapManager.getLong(MAP_PREFIX_LAST_CHANGED+collectionName);
+    }
+
+    /**
+     * @param bypassCache passed through to {@link #getCollectionVersion(boolean)}
+     * @return the collection name combined with its current version by
+     *         {@link CollectionVersionUtil#buildVersionedNameString}
+     */
+    @Override
+    public String getVersionedCollectionName(final boolean bypassCache) {
+        String collectionVersion = getCollectionVersion(bypassCache);
+        return CollectionVersionUtil.buildVersionedNameString(collectionName, collectionVersion, false);
+    }
+
+    /**
+     * Replaces the collection version with a newly generated one and records the time of the change.
+     *
+     * NOTE(review): the too-soon check and the writes below are separate map operations with no
+     * locking visible here, so concurrent callers may both pass the check -- confirm this is acceptable.
+     *
+     * @return the version that was in effect before the change (empty string if there was none)
+     * @throws CollectionDeleteTooSoonException if the previous change happened less than
+     *         {@code collectionVersionFig.getTimeBetweenDeletes()} milliseconds ago
+     */
+    @Override
+    public String updateCollectionVersion() throws CollectionDeleteTooSoonException {
+        // check for time last changed
+        Long timeLastChanged = getTimeLastChanged();
+        long timeBetweenDeletes = collectionVersionFig.getTimeBetweenDeletes();
+        if (timeLastChanged != null && System.currentTimeMillis() - timeLastChanged < timeBetweenDeletes) {
+            // too soon
+            throw new CollectionDeleteTooSoonException(timeLastChanged, timeBetweenDeletes);
+        }
+
+        // read the old version straight from the map so a stale cache entry is never reported
+        String oldCollectionVersion = getCollectionVersion(true);
+        String newCollectionVersion = getNewCollectionVersion();
+        mapManager.putLong(MAP_PREFIX_LAST_CHANGED+collectionName, System.currentTimeMillis());
+        mapManager.putString(MAP_PREFIX_VERSION+collectionName, newCollectionVersion);
+        cache.put(scope, newCollectionVersion);
+        logger.info("Replacing collection version for collection {}, application {}: oldVersion={} newVersion={}",
+            collectionName, scope.getApplication().getUuid(), oldCollectionVersion, newCollectionVersion);
+        return oldCollectionVersion;
+    }
+
+    // a new version is the string form of a freshly generated time UUID
+    private static String getNewCollectionVersion() {
+        return UUIDGenerator.newTimeUUID().toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java
new file mode 100644
index 0000000..46e4e09
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.amazonaws.util.StringUtils;
+import com.google.common.base.Preconditions;
+
+import java.util.regex.Pattern;
+
+/**
+ * Static helpers for combining a collection name and a collection version into a single
+ * "versioned name" string, and for splitting such a string back into its parts.
+ *
+ * A versioned name is {@code <name><separator><version>}; an unversioned collection is just {@code <name>}.
+ */
+public class CollectionVersionUtil {
+    private static final String VERSIONED_NAME_SEPARATOR = "%~!~%";
+
+    // the separator is matched literally; compiled once instead of on every parse
+    private static final Pattern VERSIONED_NAME_SEPARATOR_PATTERN =
+        Pattern.compile(Pattern.quote(VERSIONED_NAME_SEPARATOR));
+
+    /**
+     * Splits a versioned name string into collection name and collection version.
+     *
+     * @param versionedCollectionNameString the string to parse, must not be null
+     * @return the parsed name; its version is the empty string when the input holds no version part
+     * @throws NullPointerException if the argument is null
+     * @throws IllegalArgumentException if splitting on the separator yields anything other than
+     *         one or two parts
+     */
+    public static VersionedCollectionName parseVersionedName(String versionedCollectionNameString) throws IllegalArgumentException {
+        Preconditions.checkNotNull(versionedCollectionNameString, "collection name string is required");
+        String collectionName;
+        String collectionVersion;
+        try {
+            String[] parts = VERSIONED_NAME_SEPARATOR_PATTERN.split(versionedCollectionNameString);
+            if (parts.length == 2) {
+                collectionName = parts[0];
+                collectionVersion = parts[1];
+            } else if (parts.length == 1) {
+                collectionName = parts[0];
+                collectionVersion = "";
+            } else {
+                throw new IllegalArgumentException("Invalid format for versioned collection, versionedCollectionNameString=" + versionedCollectionNameString);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Unable to parse versioned collection, versionedCollectionNameString=" + versionedCollectionNameString, e);
+        }
+        return new VersionedCollectionNameImpl(collectionName, collectionVersion);
+    }
+
+    /**
+     * @param versionedCollectionNameString a versioned (or plain) collection name
+     * @return the collection name with any version part removed
+     * @throws IllegalArgumentException if the string cannot be parsed
+     */
+    public static String getBaseCollectionName(String versionedCollectionNameString) throws IllegalArgumentException {
+        return parseVersionedName(versionedCollectionNameString).getCollectionName();
+    }
+
+    /**
+     * @param collectionNameString a versioned (or plain) collection name
+     * @return true if the string parses and carries a non-empty version; false otherwise,
+     *         including when parsing fails for any reason
+     */
+    public static boolean collectionNameHasVersion(String collectionNameString) {
+        try {
+            VersionedCollectionName parsedName = parseVersionedName(collectionNameString);
+            return !StringUtils.isNullOrEmpty(parsedName.getCollectionVersion());
+        }
+        catch (Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Builds the versioned name string for a collection.
+     *
+     * @param baseName the collection name, must not be null
+     * @param collectionVersion the version; null or empty means the collection is unversioned
+     * @param validateBaseName when true, reject a base name that already contains the separator
+     * @return {@code baseName} alone when there is no version, otherwise name, separator and version
+     * @throws NullPointerException if {@code baseName} is null
+     * @throws IllegalArgumentException if validation is requested and {@code baseName} contains the separator
+     */
+    public static String buildVersionedNameString(final String baseName, final String collectionVersion,
+                                                  final boolean validateBaseName) throws IllegalArgumentException {
+        Preconditions.checkNotNull(baseName, "base name is required");
+        if (validateBaseName && baseName.contains(VERSIONED_NAME_SEPARATOR)) {
+            throw new IllegalArgumentException("Cannot build versioned name using a base name that already includes the version separator");
+        }
+        // compare by content: an empty version created at runtime is not the same object as the "" literal
+        if (collectionVersion == null || collectionVersion.isEmpty()) {
+            return baseName;
+        }
+        return baseName + VERSIONED_NAME_SEPARATOR + collectionVersion;
+    }
+
+    /**
+     * @param baseName the collection name, must not be null
+     * @param collectionVersion the version, may be null or empty for an unversioned collection
+     * @return a {@link VersionedCollectionName} holding the two values
+     */
+    public static VersionedCollectionName createVersionedName(String baseName, String collectionVersion) {
+        return new VersionedCollectionNameImpl(baseName, collectionVersion);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 1b8614f..8473b2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -36,9 +36,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -264,6 +262,10 @@ public class IndexServiceImpl implements IndexService {
final EntityIndex ei = entityIndexFactory.
createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+ if (logger.isDebugEnabled()) {
+ logger.debug("deIndexEntity: entityId={}:{}, markedVersion={}, otherVersionsSize={}",
+ entityId.getUuid().toString(), entityId.getType(), markedVersion.toString(), allVersionsBeforeMarked.size());
+ }
// use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
// previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java
new file mode 100644
index 0000000..87f4c05
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionName.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+/**
+ * A collection name paired with a collection version.
+ */
+public interface VersionedCollectionName {
+
+    /**
+     * @return the collection name, without any version information
+     */
+    String getCollectionName();
+
+    /**
+     * @return the collection version
+     */
+    String getCollectionVersion();
+
+    /**
+     * @return whether this name carries a version
+     */
+    boolean hasVersion();
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java
new file mode 100644
index 0000000..5f84e54
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/VersionedCollectionNameImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+
+/**
+ * Immutable {@link VersionedCollectionName}; a missing version is stored as the empty string.
+ */
+public class VersionedCollectionNameImpl implements VersionedCollectionName {
+
+    private final String collectionName;
+    private final String collectionVersion;
+
+    /**
+     * @param collectionName the collection name, must not be null
+     * @param collectionVersion the collection version; null is normalized to the empty string
+     */
+    public VersionedCollectionNameImpl(final String collectionName, final String collectionVersion ) {
+        this.collectionName = Preconditions.checkNotNull(collectionName, "collection name is required");
+        if ( collectionVersion == null ) {
+            this.collectionVersion = "";
+        } else {
+            this.collectionVersion = collectionVersion;
+        }
+    }
+
+    @Override
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    @Override
+    public String getCollectionVersion() {
+        return collectionVersion;
+    }
+
+    /** @return true unless the version is the empty string */
+    @Override
+    public boolean hasVersion() {
+        return !collectionVersion.isEmpty();
+    }
+
+    /** Two instances are equal when both the name and the version match. */
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( !( o instanceof VersionedCollectionNameImpl) ) {
+            return false;
+        }
+
+        final VersionedCollectionNameImpl other = (VersionedCollectionNameImpl) o;
+        return collectionName.equals( other.collectionName )
+            && collectionVersion.equals( other.collectionVersion );
+    }
+
+
+    /** Hash over the same two fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        final HashCodeBuilder hash = new HashCodeBuilder();
+        hash.append(collectionName);
+        hash.append(collectionVersion);
+        return hash.toHashCode();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 0420a32..13a85c4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -19,10 +19,7 @@
package org.apache.usergrid.corepersistence.rx.impl;
-
-
-
-import com.google.common.base.Optional;
+import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.Singleton;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 9c6e318..e9867b6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -1,4 +1,3 @@
-package org.apache.usergrid.corepersistence.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,6 +17,7 @@ package org.apache.usergrid.corepersistence.util;
* under the License.
*/
+package org.apache.usergrid.corepersistence.util;
import java.util.UUID;
@@ -86,6 +86,11 @@ public class CpNamingUtils {
*/
public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
+ /**
+ * The name of the map that holds our collection->version mapping
+ */
+ public static String VERSION_FOR_COLLECTION_MAP = "zzz_versionforcollection_zzz";
+
/**
* Generate a standard edge name for our graph using the connection name. To be used only for searching. DO NOT use
@@ -317,6 +322,14 @@ public class CpNamingUtils {
/**
+ * Get the map scope for the applicationId to store collection name to collection version mapping
+ */
+ public static MapScope getCollectionVersionMapScope( final Id applicationId ) {
+     // every application uses the same map name; the application id is what separates the scopes
+     final String versionMapName = VERSION_FOR_COLLECTION_MAP;
+     return new MapScopeImpl( applicationId, versionMapName );
+ }
+
+
+ /**
* Generate either the collection name or connection name from the edgeName
*/
public static String getNameFromEdgeType( final String edgeName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index ae4623d..a977f31 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -530,6 +530,8 @@ public interface EntityManager {
Object getCollectionSettings( String collectionName );
+ void deleteCollection( String collectionName );
+
public void grantRolePermission( String roleName, String permission ) throws Exception;
public void grantRolePermissions( String roleName, Collection<String> permissions ) throws Exception;
@@ -743,7 +745,7 @@ public interface EntityManager {
/**
* Add a new index to the application for scale
- * @param suffix unique indentifier for additional index
+ * @param newIndexName unique identifier for additional index
* @param shards number of shards
* @param replicas number of replicas
* @param writeConsistency only "one, quorum, or all"
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7d3eb647/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 3502581..5917949 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -522,8 +522,31 @@ public class Results implements Iterable<Entity> {
level = Level.CORE_PROPERTIES;
}
+ /**
+  * Looks up the entity at the given position.
+  *
+  * When there is no entity list, only position 0 can be answered, using the single entity.
+  *
+  * @param index zero-based position
+  * @return the entity at {@code index}, or null if the index is out of range
+  */
+ public Entity getEntity( final int index ) {
+     if (index < 0) {
+         return null;
+     }
+     if (entities != null) {
+         return index < entities.size() ? entities.get(index) : null;
+     }
+     // single entity (which may itself be null)
+     return index == 0 ? entity : null;
+ }
+
public void setEntity( final int index, final Entity entity){
- this.entities.set( index, entity );
+ if (entities == null) {
+ this.entity = entity;
+ } else {
+ this.entities.set(index, entity);
+ }
}