You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2017/08/28 23:24:00 UTC
[14/15] usergrid git commit: Add ability to walk through a collection
and delete all the entities,
optionally up to a certain timestamp. Modeled after reindex services.
Add ability to walk through a collection and delete all the entities, optionally up to a certain timestamp. Modeled after reindex services.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11823f29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11823f29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11823f29
Branch: refs/heads/master
Commit: 11823f294dfae762754ef1c4da8a5ee573107968
Parents: 3f7afcd
Author: Mike Dunker <md...@google.com>
Authored: Mon Aug 28 14:46:17 2017 -0700
Committer: Mike Dunker <md...@google.com>
Committed: Mon Aug 28 14:46:17 2017 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 1 +
.../asyncevents/AsyncEventService.java | 3 +-
.../asyncevents/AsyncEventServiceImpl.java | 67 +++--
.../asyncevents/EventBuilder.java | 13 +-
.../asyncevents/EventBuilderImpl.java | 45 ++-
.../asyncevents/model/EntityDeleteEvent.java | 27 +-
.../index/CollectionDeleteAction.java | 43 +++
.../index/CollectionDeleteRequestBuilder.java | 92 ++++++
.../CollectionDeleteRequestBuilderImpl.java | 146 +++++++++
.../index/CollectionDeleteService.java | 108 +++++++
.../index/CollectionDeleteServiceImpl.java | 299 +++++++++++++++++++
.../index/IndexProcessorFig.java | 9 +
.../index/ReIndexServiceImpl.java | 2 +-
.../persistence/CollectionDeleteTest.java | 266 +++++++++++++++++
.../resources/usergrid-custom-test.properties | 2 +
.../rest/applications/CollectionResource.java | 130 +++++++-
16 files changed, 1217 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ec6b775..a0748e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -140,6 +140,7 @@ public class CoreModule extends AbstractModule {
bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+ bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
bind( ExportService.class ).to( ExportServiceImpl.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9e346cf..04eaf4c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -33,7 +34,7 @@ import java.util.UUID;
/**
* Low level queue service for events in the entity. These events are fire and forget, and will always be asynchronous
*/
-public interface AsyncEventService extends ReIndexAction {
+public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction {
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 428772f..3d06cae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -75,9 +75,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.commons.lang.StringUtils.indexOf;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
/**
* TODO, this whole class is becoming a nightmare.
@@ -106,7 +103,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_DELETE = "delete";
- public static final String DEAD_LETTER_SUFFIX = "_dead";
private final LegacyQueueManager indexQueue;
@@ -522,8 +518,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
applicationScope);
- logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
- applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
+ applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+ }
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
@@ -535,8 +533,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final Entity entity, long updatedAfter) {
- logger.trace("Offering EntityIndexEvent for {}:{}",
- entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EntityIndexEvent for {}:{}",
+ entity.getId().getUuid(), entity.getId().getType());
+ }
offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
@@ -577,8 +577,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final Entity entity,
final Edge newEdge) {
- logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
- newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+ newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ }
offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
@@ -612,8 +614,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
- logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
- edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
+ edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+ }
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
@@ -675,7 +679,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
//send to the topic so all regions index the batch
- logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId);
+ }
offerTopic( elasticsearchIndexEvent, queueType );
}
@@ -749,8 +755,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// queue the de-index of old versions to the topic so cleanup happens in all regions
- logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
- applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
+ applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+ }
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
@@ -810,7 +818,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ }
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
@@ -830,12 +840,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+ final boolean isCollectionDelete = entityDeleteEvent.isCollectionDelete();
+ final long updatedBefore = entityDeleteEvent.getUpdatedBefore();
if (logger.isDebugEnabled()) {
- logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+ logger.debug("Deleting entity id from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore {}",
+ applicationScope, entityId, isCollectionDelete, updatedBefore);
}
- return eventBuilder.buildEntityDelete( applicationScope, entityId );
+ return eventBuilder.buildEntityDelete( applicationScope, entityId, isCollectionDelete, updatedBefore );
}
@@ -1192,11 +1205,27 @@ public class AsyncEventServiceImpl implements AsyncEventService {
});
- logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+ }
offerBatch( batch, queueType );
}
+ public void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType) {
+
+ final List<EntityDeleteEvent> batch = new ArrayList<>();
+ edges.forEach(e -> {
+
+ //change to id scope to avoid serialization issues
+ batch.add(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
+ new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), true, updatedBefore));
+
+ });
+
+ offerBatch(batch, queueType);
+ }
+
public class IndexEventResult{
private final Optional<IndexOperationMessage> indexOperationMessage;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index ebb9190..4bb6312 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -63,7 +63,18 @@ public interface EventBuilder {
* @param entityId
* @return
*/
- IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+ IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId);
+
+ /**
+ * Build the index operation message for an entity delete, optionally as part of a collection delete.
+ * @param applicationScope
+ * @param entityId
+ * @param isCollectionDelete
+ * @param updatedBefore
+ * @return
+ */
+ IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId,
+ boolean isCollectionDelete, long updatedBefore);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 5051598..7c72b72 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,7 +24,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import org.antlr.misc.Graph;
import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,15 +130,48 @@ public class EventBuilderImpl implements EventBuilder {
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
@Override
- public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+ public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+ return buildEntityDelete(applicationScope, entityId, false, Long.MAX_VALUE);
+ }
+
+ @Override
+ public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId,
+ final boolean isCollectionDelete, final long updatedBefore) {
if (logger.isDebugEnabled()) {
- logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}",
- applicationScope, entityId);
+ logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore={}",
+ applicationScope, entityId, isCollectionDelete, updatedBefore);
+ }
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+
+ boolean deleteEntity = ecm.load(entityId).
+ map(entity -> {
+ final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+ boolean willDelete = false;
+ if ( modified == null ) {
+ // We don't have a modified field, so we can't check, so delete it
+ willDelete = true;
+ } else if (modified.getValue() <= updatedBefore) {
+ willDelete = true;
+ }
+
+ if (isCollectionDelete && willDelete) {
+ // need to mark for deletion
+ ecm.mark(entityId, null)
+ .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
+ .toBlocking().last();
+ }
+
+ return willDelete;
+ }).toBlocking().firstOrDefault(true);
+
+ if (!deleteEntity) {
+ return new IndexOperationMessage();
}
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
MvccLogEntry mostRecentToDelete =
ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 01d2ba8..1589632 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
/**
* Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ * It also records whether the delete is part of a collection delete, along with the updatedBefore cutoff.
*/
public final class EntityDeleteEvent extends AsyncEvent {
@@ -31,17 +32,41 @@ public final class EntityDeleteEvent extends AsyncEvent {
@JsonProperty
protected EntityIdScope entityIdScope;
+ @JsonProperty
+ private long updatedBefore;
+
+ @JsonProperty
+ private boolean isCollectionDelete;
+
public EntityDeleteEvent() {
super();
}
public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
super(sourceRegion);
- this.entityIdScope = entityIdScope;
+ this.entityIdScope = entityIdScope;
+ this.updatedBefore = Long.MAX_VALUE;
+ this.isCollectionDelete = false;
+ }
+
+ public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope,
+ boolean isCollectionDelete, long updatedBefore) {
+ super(sourceRegion);
+ this.entityIdScope = entityIdScope;
+ this.updatedBefore = updatedBefore;
+ this.isCollectionDelete = isCollectionDelete;
}
public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
+
+ public long getUpdatedBefore() {
+ return updatedBefore;
+ }
+
+ public boolean isCollectionDelete() {
+ return isCollectionDelete;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
new file mode 100644
index 0000000..7bad06b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+
+/**
+ * Callback to perform a collection delete operation based on a scope during bulk collection delete operations
+ */
+public interface CollectionDeleteAction {
+
+ /**
+ * Delete a batch list of entities.
+ * @param edges
+ * @param updatedBefore
+ * @param queueType
+ */
+ void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType);
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
new file mode 100644
index 0000000..4abdfea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our collection delete request
+ */
+public interface CollectionDeleteRequestBuilder {
+
+ /**
+ * Set the application id
+ */
+ CollectionDeleteRequestBuilder withApplicationId(final UUID applicationId);
+
+ /**
+ * Set the collection name.
+ * @param collectionName
+ * @return
+ */
+ CollectionDeleteRequestBuilder withCollection(final String collectionName);
+
+ /**
+ * Set our cursor to resume processing
+ * @param cursor
+ * @return
+ */
+ CollectionDeleteRequestBuilder withCursor(final String cursor);
+
+
+ CollectionDeleteRequestBuilder withDelay(int delayTimer, TimeUnit timeUnit);
+
+ /**
+ * Set the timestamp to delete entities updated <= this timestamp
+ * @param timestamp
+ * @return
+ */
+ CollectionDeleteRequestBuilder withEndTimestamp(final Long timestamp);
+
+
+ Optional<Integer> getDelayTimer();
+
+ Optional<TimeUnit> getTimeUnitOptional();
+
+ /**
+ * Get the application scope
+ * @return
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+ /**
+ * Get the collection name
+ * @return
+ */
+ Optional<String> getCollectionName();
+
+ /**
+ * Get the cursor
+ * @return
+ */
+ Optional<String> getCursor();
+
+ /**
+ * Get the latest timestamp to delete
+ * @return
+ */
+ Optional<Long> getEndTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
new file mode 100644
index 0000000..890b770
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * collection delete service request builder
+ */
+public class CollectionDeleteRequestBuilderImpl implements CollectionDeleteRequestBuilder {
+
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> endTimestamp = Optional.absent();
+ private Optional<Integer> delayTimer = Optional.absent();
+ private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+ /***
+ *
+ * @param applicationId The application id
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withApplicationId( final UUID applicationId ) {
+ this.withApplicationId = Optional.fromNullable( applicationId );
+ return this;
+ }
+
+
+ /**
+ * the collection name
+ * @param collectionName
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withCollection( final String collectionName ) {
+ this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() ) );
+ return this;
+ }
+
+
+ /**
+ * The cursor
+ * @param cursor
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withCursor( final String cursor ) {
+ this.cursor = Optional.fromNullable( cursor );
+ return this;
+ }
+
+
+ /**
+ * Determines whether we should tack on a delay for collection delete and for how long if we do. Also
+ * allowed to specify how throttled back it should be.
+ * @param delayTimer
+ * @param timeUnit
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+ this.delayTimer = Optional.fromNullable( delayTimer );
+ this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+ return this;
+ }
+
+
+ /**
+ * Set end timestamp in epoch time. Only entities last modified at or before this time will be processed for deletion
+ * @param timestamp
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withEndTimestamp( final Long timestamp ) {
+ this.endTimestamp = Optional.fromNullable( timestamp );
+ return this;
+ }
+
+
+ @Override
+ public Optional<Integer> getDelayTimer() {
+ return delayTimer;
+ }
+
+ @Override
+ public Optional<TimeUnit> getTimeUnitOptional() {
+ return timeUnitOptional;
+ }
+
+
+ @Override
+ public Optional<ApplicationScope> getApplicationScope() {
+
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+ }
+
+ return Optional.absent();
+ }
+
+
+ @Override
+ public Optional<String> getCollectionName() {
+ return withCollectionName;
+ }
+
+
+ @Override
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ @Override
+ public Optional<Long> getEndTimestamp() {
+ return endTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
new file mode 100644
index 0000000..c939dd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+/**
+ * An interface for deleting all entities in a collection
+ */
+public interface CollectionDeleteService {
+
+
+ /**
+ * Perform a collection delete via service
+ *
+ * @param collectionDeleteRequestBuilder The builder to build the request
+ */
+ CollectionDeleteStatus deleteCollection(final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder);
+
+
+ /**
+ * Generate a builder for the collection delete
+ */
+ CollectionDeleteRequestBuilder getBuilder();
+
+
+ /**
+ * Get the status of a job
+ * @param jobId The jobId returned during the collection delete
+ * @return
+ */
+ CollectionDeleteStatus getStatus(final String jobId);
+
+
+ /**
+ * The response when requesting a collection delete operation
+ */
+ public class CollectionDeleteStatus {
+ final String jobId;
+ final Status status;
+ final long numberProcessed;
+ final long lastUpdated;
+
+
+ public CollectionDeleteStatus(final String jobId, final Status status, final long numberProcessed,
+ final long lastUpdated ) {
+ this.jobId = jobId;
+ this.status = status;
+ this.numberProcessed = numberProcessed;
+ this.lastUpdated = lastUpdated;
+ }
+
+
+ /**
+ * Get the jobId used to resume this operation
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+
+ /**
+ * Get the last updated time, as a long
+ * @return
+ */
+ public long getLastUpdated() {
+ return lastUpdated;
+ }
+
+
+ /**
+ * Get the number of records processed
+ * @return
+ */
+ public long getNumberProcessed() {
+ return numberProcessed;
+ }
+
+
+ /**
+ * Get the status
+ * @return
+ */
+ public Status getStatus() {
+ return status;
+ }
+ }
+
+ enum Status{
+ STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
new file mode 100644
index 0000000..7b3e324
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static com.google.common.base.Optional.fromNullable;
+
+
+@Singleton
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+
+ private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteServiceImpl.class );
+
+ // Map scope, under the management application, that holds cursor and progress state for delete jobs.
+ private static final MapScope RESUME_MAP_SCOPE =
+ new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "collectiondeleteresume" );
+
+ //Keep cursors to resume collection delete for 10 days.
+ // The product below is 864000, i.e. 10 days expressed in seconds.
+ private static final int CURSOR_TTL = 60 * 60 * 24 * 10;
+
+ // Suffixes appended to the jobId to form the map keys for each piece of job state.
+ private static final String MAP_CURSOR_KEY = "cursor";
+ private static final String MAP_COUNT_KEY = "count";
+ private static final String MAP_STATUS_KEY = "status";
+ private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+ // NOTE(review): allApplicationsObservable, indexLocationStrategyFactory, mapManagerFactory and
+ // entityIndexFactory are assigned in the constructor but not read anywhere else in this class.
+ private final AllApplicationsObservable allApplicationsObservable;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final AllEntityIdsObservable allEntityIdsObservable;
+ private final IndexProcessorFig indexProcessorFig;
+ // Map manager bound to RESUME_MAP_SCOPE; all job state is read and written through it.
+ private final MapManager mapManager;
+ private final MapManagerFactory mapManagerFactory;
+ // Despite the name, this is the async event service that delete batches are handed to.
+ private final AsyncEventService indexService;
+ private final EntityIndexFactory entityIndexFactory;
+ private final CollectionSettingsFactory collectionSettingsFactory;
+
+
+ /**
+  * Injection constructor: stores the collaborators and creates the map manager
+  * (scoped to RESUME_MAP_SCOPE) that holds job state.
+  */
+ @Inject
+ public CollectionDeleteServiceImpl( final EntityIndexFactory entityIndexFactory,
+                                     final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                     final AllEntityIdsObservable allEntityIdsObservable,
+                                     final MapManagerFactory mapManagerFactory,
+                                     final AllApplicationsObservable allApplicationsObservable,
+                                     final IndexProcessorFig indexProcessorFig,
+                                     final CollectionSettingsFactory collectionSettingsFactory,
+                                     final AsyncEventService indexService ) {
+     // collaborators, in parameter order
+     this.entityIndexFactory = entityIndexFactory;
+     this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+     this.allEntityIdsObservable = allEntityIdsObservable;
+     this.mapManagerFactory = mapManagerFactory;
+     this.allApplicationsObservable = allApplicationsObservable;
+     this.indexProcessorFig = indexProcessorFig;
+     this.collectionSettingsFactory = collectionSettingsFactory;
+     this.indexService = indexService;
+
+     // derived state
+     this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+ }
+
+
+ //TODO: optional delay, param.
+ @Override
+ public CollectionDeleteStatus deleteCollection( final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder) {
+
+ final AtomicInteger count = new AtomicInteger();
+
+ final Optional<EdgeScope> cursor = parseCursor( collectionDeleteRequestBuilder.getCursor() );
+
+ final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
+
+ final Optional<Integer> delayTimer = collectionDeleteRequestBuilder.getDelayTimer();
+
+ final Optional<TimeUnit> timeUnitOptional = collectionDeleteRequestBuilder.getTimeUnitOptional();
+
+ Optional<ApplicationScope> appId = collectionDeleteRequestBuilder.getApplicationScope();
+
+ Preconditions.checkArgument(collectionDeleteRequestBuilder.getCollectionName().isPresent(),
+ "You must specify a collection name");
+ String collectionName = collectionDeleteRequestBuilder.getCollectionName().get();
+
+ Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
+ "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid." );
+ Preconditions.checkArgument( cursor.isPresent() || appId.isPresent(),
+ "Either application ID or cursor is required.");
+
+ ApplicationScope applicationScope;
+ if (appId.isPresent()) {
+ applicationScope = appId.get();
+ } else { // cursor is present
+ applicationScope = cursor.get().getApplicationScope();
+ }
+
+
+ final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+ // default to current time
+ final long endTimestamp = collectionDeleteRequestBuilder.getEndTimestamp().or( System.currentTimeMillis() );
+
+ String pluralizedCollectionName = InflectionUtils.pluralize(CpNamingUtils.getNameFromEdgeType(collectionName));
+
+ CollectionSettings collectionSettings =
+ collectionSettingsFactory.getInstance(new CollectionSettingsScopeImpl(applicationScope.getApplication(), pluralizedCollectionName));
+
+ Optional<Map<String, Object>> existingSettings =
+ collectionSettings.getCollectionSettings( pluralizedCollectionName );
+
+ if ( existingSettings.isPresent() ) {
+
+ Map jsonMapData = existingSettings.get();
+
+ jsonMapData.put( "lastCollectionClear", Instant.now().toEpochMilli() );
+
+ collectionSettings.putCollectionSettings(
+ pluralizedCollectionName, JsonUtils.mapToJsonString(jsonMapData ) );
+ }
+
+ allEntityIdsObservable.getEdgesToEntities( Observable.just(applicationScope),
+ fromNullable(collectionName), cursorSeek.getSeekValue() )
+ .buffer( indexProcessorFig.getCollectionDeleteBufferSize())
+ .doOnNext( edgeScopes -> {
+ logger.info("Sending batch of {} to be deleted.", edgeScopes.size());
+ indexService.deleteBatch(edgeScopes, endTimestamp, AsyncEventQueueType.DELETE);
+ count.addAndGet(edgeScopes.size() );
+ if( edgeScopes.size() > 0 ) {
+ writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+ }
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+ .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ .subscribeOn( Schedulers.io() ).subscribe();
+
+
+ return new CollectionDeleteStatus( jobId, Status.STARTED, 0, 0 );
+ }
+
+
+ /**
+ * Create a new CollectionDeleteRequestBuilderImpl for the caller to configure.
+ */
+ @Override
+ public CollectionDeleteRequestBuilder getBuilder() {
+ return new CollectionDeleteRequestBuilderImpl();
+ }
+
+
+ /**
+ * Look up the stored state of a delete job; delegates to getCollectionDeleteResponse.
+ *
+ * @param jobId the id returned when the delete was started; must not be null
+ * @return the stored status, or one with Status.UNKNOWN and zero counters when nothing is stored
+ */
+ @Override
+ public CollectionDeleteStatus getStatus( final String jobId ) {
+ Preconditions.checkNotNull( jobId, "jobId must not be null" );
+ return getCollectionDeleteResponse( jobId );
+ }
+
+
+ /**
+ * Get the resume edge scope
+ *
+ * @param edgeScope The optional edge scope from the cursor
+ */
+ private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+ if ( edgeScope.isPresent() ) {
+ return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+ }
+
+ return new CursorSeek<>( Optional.absent() );
+ }
+
+
+ /**
+  * Swap a caller-supplied cursor for the edge scope persisted under it.
+  *
+  * NOTE(review): the lookup uses the cursor string verbatim as the map key, whereas
+  * writeCursorState stores under jobId + MAP_CURSOR_KEY -- confirm what callers are expected to pass.
+  *
+  * @param cursor the optional cursor from the request
+  * @return the deserialized edge scope, or absent when no cursor was given or nothing is stored for it
+  */
+ private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+     // only touch the map when there is a cursor to look up
+     final String storedCursor = cursor.isPresent() ? mapManager.getString( cursor.get() ) : null;
+
+     if ( storedCursor == null ) {
+         return Optional.absent();
+     }
+
+     final JsonNode cursorNode = CursorSerializerUtil.fromString( storedCursor );
+
+     final EdgeScope resumeScope =
+         EdgeScopeSerializer.INSTANCE.fromJsonNode( cursorNode, CursorSerializerUtil.getMapper() );
+
+     return Optional.of( resumeScope );
+ }
+
+
+ /**
+  * Serialize the given edge scope and store it under jobId + MAP_CURSOR_KEY with the CURSOR_TTL.
+  *
+  * @param jobId the job the cursor belongs to
+  * @param edge the edge scope to remember as the job's cursor
+  */
+ private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+     final String cursorKey = jobId + MAP_CURSOR_KEY;
+
+     final String cursorJson = CursorSerializerUtil.asString(
+         EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge ) );
+
+     mapManager.putString( cursorKey, cursorJson, CURSOR_TTL );
+ }
+
+
+ /**
+ * Write our state meta data into cassandra so everyone can see it
+ * @param jobId
+ * @param status
+ * @param processedCount
+ * @param lastUpdated
+ */
+ private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+ final long lastUpdated ) {
+
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+ jobId, status, processedCount, lastUpdated);
+ }
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+ mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ /**
+  * Read a job's stored state back from the map.
+  *
+  * @param jobId the job to look up
+  * @return the stored status, count and last-updated time; when no status is stored, a response
+  *         with Status.UNKNOWN and zero counters
+  */
+ private CollectionDeleteStatus getCollectionDeleteResponse( final String jobId ) {
+
+     final String storedStatus = mapManager.getString( jobId + MAP_STATUS_KEY );
+
+     if ( storedStatus != null ) {
+         final Status jobStatus = Status.valueOf( storedStatus );
+
+         final long processed = mapManager.getLong( jobId + MAP_COUNT_KEY );
+         final long updatedAt = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+         return new CollectionDeleteStatus( jobId, jobStatus, processed, updatedAt );
+     }
+
+     // nothing recorded for this job id
+     return new CollectionDeleteStatus( jobId, Status.UNKNOWN, 0, 0 );
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index eb63056..948e106 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -56,6 +56,8 @@ public interface IndexProcessorFig extends GuicyFig {
String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
+ String COLLECTION_DELETE_BUFFER_SIZE = "elasticsearch.collection_delete.buffer_size";
+
String REINDEX_CONCURRENCY_FACTOR = "elasticsearch.reindex.concurrency.factor";
@@ -157,6 +159,13 @@ public interface IndexProcessorFig extends GuicyFig {
int getReindexConcurrencyFactor();
/**
+ * Number of edges buffered into each batch that is sent for deletion during a collection delete
+ */
+ @Default("500")
+ @Key(COLLECTION_DELETE_BUFFER_SIZE)
+ int getCollectionDeleteBufferSize();
+
+ /**
* Flag to resolve the LOCAL queue implementation service synchronously.
*/
@Default("false")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index c7371b3..05602fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -352,7 +352,7 @@ public class ReIndexServiceImpl implements ReIndexService {
final Status status = Status.valueOf( stringStatus );
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
- final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+ final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
new file mode 100644
index 0000000..ddf2c68
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+
+@NotThreadSafe
+public class CollectionDeleteTest extends AbstractCoreIT {
+ private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
+
+ // NOTE(review): not referenced anywhere else in this class.
+ private static final MetricRegistry registry = new MetricRegistry();
+
+
+ // Entities created before the cut-off timestamp is taken; the test expects these to be deleted.
+ private static final int ENTITIES_TO_DELETE = 1000;
+ // Entities created after the cut-off timestamp; the test expects these to remain readable.
+ private static final int ENTITIES_TO_ADD_AFTER_TIME = 3;
+
+
+ /**
+  * Logs, at debug level, that metrics reporting is starting.
+  */
+ @Before
+ public void startReporting() {
+     logger.debug( "Starting metrics reporting" );
+ }
+
+
+ /**
+ * Logs, at debug level, that the metrics report is being printed.
+ */
+ @After
+ public void printReport() {
+ logger.debug( "Printing metrics report" );
+ }
+
+
+ /**
+  * Creates ENTITIES_TO_DELETE items, records a cut-off timestamp, creates
+  * ENTITIES_TO_ADD_AFTER_TIME more, then runs a collection delete bounded by the cut-off and
+  * checks that exactly the later items can still be read.
+  */
+ @Test( timeout = 240000 )
+ public void clearOneCollection() throws Exception {
+
+     logger.info( "Started clearOneCollection()" );
+
+     String rand = RandomStringUtils.randomAlphanumeric( 5 );
+     final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+     final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+     final CollectionDeleteService collectionDeleteService = setup.getInjector().getInstance( CollectionDeleteService.class );
+
+     // ----------------- create a bunch of entities
+
+     final Map<String, Object> entityMap = new HashMap<>();
+     entityMap.put( "key1", 1000 );
+     entityMap.put( "key2", 2000 );
+     entityMap.put( "key3", "Some value" );
+
+     String collectionName = "items";
+     String itemType = "item";
+
+
+     for ( int i = 0; i < ENTITIES_TO_DELETE; i++ ) {
+
+         try {
+             entityMap.put( "key", i );
+             em.create(itemType, entityMap);
+         }
+         catch ( Exception ex ) {
+             throw new RuntimeException( "Error creating entity", ex );
+         }
+
+         if ( i % 10 == 0 ) {
+             logger.info( "Created {} entities", i );
+         }
+     }
+
+     logger.info("Created {} entities", ENTITIES_TO_DELETE);
+
+     // everything created so far is expected to fall at or before this cut-off
+     long timeFirstPutDone = System.currentTimeMillis();
+     logger.info("timeFirstPutDone={}", timeFirstPutDone);
+
+     for (int i = 0; i < ENTITIES_TO_ADD_AFTER_TIME; i++) {
+
+         try {
+             entityMap.put( "key", ENTITIES_TO_DELETE + i );
+             em.create(itemType, entityMap);
+         }
+         catch ( Exception ex ) {
+             throw new RuntimeException( "Error creating entity", ex );
+         }
+
+         if ( i % 10 == 0 ) {
+             logger.info( "Created {} entities after delete time", i );
+         }
+
+     }
+     logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
+
+
+     app.waitForQueueDrainAndRefreshIndex(5000);
+
+     final CollectionDeleteRequestBuilder builder =
+         collectionDeleteService.getBuilder()
+             .withApplicationId( em.getApplicationId() )
+             .withCollection(collectionName)
+             .withEndTimestamp(timeFirstPutDone);
+
+     CollectionDeleteService.CollectionDeleteStatus status = collectionDeleteService.deleteCollection(builder);
+
+     // JUnit's signature is assertNotNull(message, object); with the arguments the other way
+     // round the string literal is the checked object and the assertion can never fail
+     assertNotNull( "JobId is present", status.getJobId() );
+
+     logger.info( "Delete collection" );
+
+
+     waitForDelete( status, collectionDeleteService );
+
+     app.waitForQueueDrainAndRefreshIndex(15000);
+
+     // ----------------- test that we can read the entries after the timestamp
+
+     readData( em, collectionName,ENTITIES_TO_ADD_AFTER_TIME);
+ }
+
+ /**
+  * Poll the delete job once a second until it reports COMPLETE.
+  *
+  * @param status the status returned when the delete was started; must not be null
+  * @param collectionDeleteService the service to poll
+  * @throws IllegalArgumentException if status is null
+  * @throws InterruptedException if interrupted while sleeping between polls
+  */
+ private void waitForDelete( final CollectionDeleteService.CollectionDeleteStatus status, final CollectionDeleteService collectionDeleteService )
+     throws InterruptedException, IllegalArgumentException {
+
+     if ( status == null ) {
+         logger.info("waitForDelete: error, status = null");
+         throw new IllegalArgumentException("collectionDeleteStatus = null");
+     }
+     logger.info("waitForDelete: jobID={}", status.getJobId());
+
+     boolean complete = false;
+     while ( !complete ) {
+
+         try {
+             final CollectionDeleteService.CollectionDeleteStatus latest =
+                 collectionDeleteService.getStatus( status.getJobId() );
+
+             if ( latest != null ) {
+                 logger.info("waitForDelete: status={} numberProcessed={}",
+                     latest.getStatus().toString(), latest.getNumberProcessed());
+
+                 complete = latest.getStatus() == CollectionDeleteService.Status.COMPLETE;
+             } else {
+                 logger.info("waitForDelete: updated status is null");
+             }
+         }
+         catch ( IllegalArgumentException ignored ) {
+             //swallow. Thrown if our job can't be found. I.E hasn't updated yet
+         }
+
+         // no pause once the job has finished
+         if ( !complete ) {
+             Thread.sleep( 1000 );
+         }
+     }
+ }
+
+
+ /**
+  * Page through the collection, check "key2" on every entity and assert the total count.
+  *
+  * @param em the entity manager to read with
+  * @param collectionName the collection to read
+  * @param expectedEntities the expected number of entities; also used as the page size
+  * @return the number of entities read
+  */
+ private int readData(EntityManager em, String collectionName, int expectedEntities)
+     throws Exception {
+
+     app.waitForQueueDrainAndRefreshIndex();
+
+     int seen = 0;
+
+     Results page = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
+         Query.Level.ALL_PROPERTIES, false);
+
+     // keep paging until an empty page comes back
+     while ( page.getEntities().size() != 0 ) {
+
+         UUID lastSeenUuid = null;
+         for ( Entity e : page.getEntities() ) {
+
+             assertEquals(2000, e.getProperty("key2"));
+
+             if (seen % 100 == 0) {
+                 logger.info("read {} entities", seen);
+             }
+             lastSeenUuid = e.getUuid();
+             seen++;
+         }
+
+         // next page starts from the last entity of this one
+         page = em.getCollection(em.getApplicationRef(), collectionName, lastSeenUuid, expectedEntities,
+             Query.Level.ALL_PROPERTIES, false);
+     }
+     logger.info("read {} total entities", seen);
+
+     assertEquals( "Did not get expected entities", expectedEntities, seen );
+     return seen;
+ }
+
+ /**
+  * Count the entities matching "key1=1000" by following query cursors, and assert the total.
+  *
+  * @param em the entity manager to query with
+  * @param collectionName the collection to search
+  * @param expectedEntities the expected number of matches
+  * @return the number of entities counted
+  */
+ private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+     throws Exception {
+
+     app.waitForQueueDrainAndRefreshIndex();
+
+     Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+     Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+     int total = results.size();
+
+     // follow cursors until the last page
+     while ( results.hasCursor() ) {
+         logger.info( "Counted {} : query again with cursor", total );
+         q.setCursor( results.getCursor() );
+         results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+         total += results.size();
+     }
+
+     assertEquals( "Did not get expected entities", expectedEntities, total );
+     return total;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/query-validator/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/query-validator/src/test/resources/usergrid-custom-test.properties b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
index bc1ba56..c8e3eee 100644
--- a/stack/query-validator/src/test/resources/usergrid-custom-test.properties
+++ b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
@@ -30,3 +30,5 @@ usergrid.sysadmin.login.allowed=true
# This property is required to be set and cannot be defaulted anywhere
usergrid.cluster_name=usergrid
+
+elasticsearch.queue_impl=LOCAL
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..c9174c1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -18,19 +18,19 @@
package org.apache.usergrid.rest.applications;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.UriInfo;
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
+import org.apache.usergrid.persistence.index.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@@ -48,6 +48,9 @@ import org.apache.usergrid.services.ServicePayload;
import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A collection resource that stands before the Service Resource. If it cannot find
@@ -61,6 +64,9 @@ import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
})
public class CollectionResource extends ServiceResource {
+ private static final Logger logger = LoggerFactory.getLogger( CollectionResource.class );
+ private static final String UPDATED_BEFORE_FIELD = "updatedBefore";
+
public CollectionResource() {
}
@@ -190,6 +196,61 @@ public class CollectionResource extends ServiceResource {
}
+ /**
+  * Start clearing a collection of the current application.
+  *
+  * @param payload optional JSON body; may carry an "updatedBefore" timestamp
+  * @param collectionName the collection to clear, taken from the path
+  * @param callback the JSONP callback name
+  * @return the API response describing the started job
+  */
+ @PUT
+ @Path("{itemName}/_clear")
+ @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse clearCollectionPut(
+     final Map<String, Object> payload,
+     @PathParam("itemName") final String collectionName,
+     @QueryParam("callback") @DefaultValue("callback") String callback
+ ) throws Exception {
+
+     logger.info("Clearing collection {} for application {}", collectionName, getApplicationId().toString());
+
+     return executeResumeAndCreateResponse(
+         payload,
+         createRequest()
+             .withApplicationId(getApplicationId())
+             .withCollection(collectionName),
+         callback);
+ }
+
+
+ /**
+  * Report the status of a previously started clear-collection job.
+  *
+  * @param ui the request URI info (not used by this method)
+  * @param itemName the collection path segment (not used by this method)
+  * @param jobId the job id returned when the clear was started
+  * @param callback the JSONP callback name
+  * @return the API response carrying the job id, status, last update time and number checked
+  */
+ @GET
+ @Path( "{itemName}/_clear/{jobId}")
+ @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse clearCollectionJobGet(
+     @Context UriInfo ui,
+     @PathParam("itemName") PathSegment itemName,
+     @PathParam("jobId") String jobId,
+     @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+     if ( logger.isTraceEnabled() ) {
+         logger.trace( "CollectionResource.clearCollectionJobGet" );
+     }
+
+     Preconditions.checkNotNull( jobId, "path param jobId must not be null" );
+
+     final CollectionDeleteService deleteService = getCollectionDeleteService();
+     final CollectionDeleteService.CollectionDeleteStatus jobStatus = deleteService.getStatus( jobId );
+
+     final ApiResponse apiResponse = createApiResponse();
+
+     apiResponse.setAction( "clear collection" );
+     apiResponse.setProperty( "jobId", jobStatus.getJobId() );
+     apiResponse.setProperty( "status", jobStatus.getStatus() );
+     apiResponse.setProperty( "lastUpdatedEpoch", jobStatus.getLastUpdated() );
+     apiResponse.setProperty( "numberCheckedForDeletion", jobStatus.getNumberProcessed() );
+     apiResponse.setSuccess();
+
+     return apiResponse;
+ }
+
+
// TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
// So system access only.
// TODO: use scheduler here to get around people sending a reindex call 30 times.
@@ -210,4 +271,57 @@ public class CollectionResource extends ServiceResource {
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}
+
+ /**
+ * Fetch the CollectionDeleteService instance from the injector.
+ */
+ private CollectionDeleteService getCollectionDeleteService() {
+ return injector.getInstance( CollectionDeleteService.class );
+ }
+
+
+ /**
+ * Create a new, unconfigured collection delete request builder.
+ */
+ private CollectionDeleteRequestBuilder createRequest() {
+ return new CollectionDeleteRequestBuilderImpl();
+ }
+
+
+ private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> payload,
+ final CollectionDeleteRequestBuilder request,
+ final String callback ) {
+
+ Map<String,Object> newPayload = payload;
+ if(newPayload == null || !payload.containsKey( UPDATED_BEFORE_FIELD )){
+ newPayload = new HashMap<>(1);
+ newPayload.put(UPDATED_BEFORE_FIELD,Long.MAX_VALUE);
+ }
+
+ Preconditions.checkArgument(newPayload.get(UPDATED_BEFORE_FIELD) instanceof Number,
+ "The field \"updatedBefore\" in the payload must be a timestamp" );
+
+ //add our updated timestamp to the request
+ if ( newPayload.containsKey( UPDATED_BEFORE_FIELD ) ) {
+ final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_BEFORE_FIELD));
+ request.withEndTimestamp( timestamp );
+ }
+
+ return executeAndCreateResponse( request, callback );
+ }
+
+ /**
+  * Execute the delete request and wrap the resulting job status in an API response.
+  *
+  * @param request the fully assembled delete request
+  * @param callback the JSONP callback name (not used by this method)
+  * @return the API response carrying the job id, status, last update time and number queued
+  */
+ private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
+
+     final CollectionDeleteService deleteService = getCollectionDeleteService();
+     final CollectionDeleteService.CollectionDeleteStatus jobStatus = deleteService.deleteCollection( request );
+
+     final ApiResponse apiResponse = createApiResponse();
+
+     apiResponse.setAction( "clear collection" );
+     apiResponse.setProperty( "jobId", jobStatus.getJobId() );
+     apiResponse.setProperty( "status", jobStatus.getStatus() );
+     apiResponse.setProperty( "lastUpdatedEpoch", jobStatus.getLastUpdated() );
+     apiResponse.setProperty( "numberQueued", jobStatus.getNumberProcessed() );
+     apiResponse.setSuccess();
+
+     return apiResponse;
+ }
+
}